Seata Saga 模式()

  本篇文章为你整理了Seata Saga 模式()的详细内容,包含有 Seata Saga 模式,希望能帮助你了解 Seata Saga 模式。

  可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能

  示例状态图:

  Demo简介

  基于dubbo构建的微服务下,使用Saga模式演示分布式事务的提交和回滚;

  业务流程图如下图所示:

  先下载seata-samples工程:https://github.com/seata/seata-samples.git

  注意SEATA版本需要0.9.0以上

  在dubbo-saga-sample中一个分布式事务内会有2个Saga事务参与者,分别是: InventoryAction 和 BalanceAction ;分布式事务提交则两者均提交,分布式事务回滚则两者均回滚;

  这2个Saga参与者均是 dubbo 服务,两个参与都有一个reduce方法,表示库存扣减或余额扣减,还有一个compensateReduce方法,表示补偿扣减操作。

  InventoryAction 接口定义如下:

  

public interface InventoryAction {

 

   * reduce

   * @param businessKey

   * @param amount

   * @param params

   * @return

   boolean reduce(String businessKey, BigDecimal amount, Map String, Object params);

   * compensateReduce

   * @param businessKey

   * @param params

   * @return

   boolean compensateReduce(String businessKey, Map String, Object params);

  

 

  这个场景用状态语言定义就是下面的json:src/main/resources/statelang/reduce_inventory_and_balance.json

  

{

 

   "Name": "reduceInventoryAndBalance",

   "Comment": "reduce inventory then reduce balance in a transaction",

   "StartState": "ReduceInventory",

   "Version": "0.0.1",

   "States": {

   "ReduceInventory": {

   "Type": "ServiceTask",

   "ServiceName": "inventoryAction",

   "ServiceMethod": "reduce",

   "CompensateState": "CompensateReduceInventory",

   "Next": "ChoiceState",

   "Input": [

   "$.[businessKey]",

   "$.[count]"

   "Output": {

   "reduceInventoryResult": "$.#root"

   "Status": {

   "#root == true": "SU",

   "#root == false": "FA",

   "$Exception{java.lang.Throwable}": "UN"

   "ChoiceState":{

   "Type": "Choice",

   "Choices":[

   "Expression":"[reduceInventoryResult] == true",

   "Next":"ReduceBalance"

   "Default":"Fail"

   "ReduceBalance": {

   "Type": "ServiceTask",

   "ServiceName": "balanceAction",

   "ServiceMethod": "reduce",

   "CompensateState": "CompensateReduceBalance",

   "Input": [

   "$.[businessKey]",

   "$.[amount]",

   "throwException" : "$.[mockReduceBalanceFail]"

   "Output": {

   "compensateReduceBalanceResult": "$.#root"

   "Status": {

   "#root == true": "SU",

   "#root == false": "FA",

   "$Exception{java.lang.Throwable}": "UN"

   "Catch": [

   "Exceptions": [

   "java.lang.Throwable"

   "Next": "CompensationTrigger"

   "Next": "Succeed"

   "CompensateReduceInventory": {

   "Type": "ServiceTask",

   "ServiceName": "inventoryAction",

   "ServiceMethod": "compensateReduce",

   "Input": [

   "$.[businessKey]"

   "CompensateReduceBalance": {

   "Type": "ServiceTask",

   "ServiceName": "balanceAction",

   "ServiceMethod": "compensateReduce",

   "Input": [

   "$.[businessKey]"

   "CompensationTrigger": {

   "Type": "CompensationTrigger",

   "Next": "Fail"

   "Succeed": {

   "Type":"Succeed"

   "Fail": {

   "Type":"Fail",

   "ErrorCode": "PURCHASE_FAILED",

   "Message": "purchase failed"

  

 

  该json表示的状态图:

  状态语言在一定程度上参考了AWS Step Functions

   状态机 属性简介:

  Name: 表示状态机的名称,必须唯一

  Comment: 状态机的描述

  Version: 状态机定义版本

  StartState: 启动时运行的第一个 状态

  States: 状态列表,是一个map结构,key是 状态 的名称,在状态机内必须唯一

  IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新

  IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新

   状态 属性简介:

  Type: 状态 的类型,比如有:

  ServiceTask: 执行调用服务任务

  Choice: 单条件选择路由

  CompensationTrigger: 触发补偿流程

  Succeed: 状态机正常结束

  Fail: 状态机异常结束

  SubStateMachine: 调用子状态机

  CompensateSubMachine: 用于补偿一个子状态机

  
CompensateState: 该 状态 的补偿 状态

  Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行

  Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可

  Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数

  Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值

  Catch: 捕获到异常后的路由

  Next: 服务执行完成后下一个执行的 状态

  Choices: Choice类型的 状态 里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个 状态

  ErrorCode: Fail类型 状态 的错误码

  Message: Fail类型 状态 的错误信息

  更多详细的状态语言解释请看State language referance章节

  更多详细的状态语言使用示例见https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

  Demo 运行指南

  step 1 启动 SEATA Server

  运行 SeataServerStarter ,启动 Seata Server;

  step 2 启动 dubbo provider Demo

  运行 DubboSagaProviderStarter ,启动 dubbo provider;

  step 3 启动 Saga Demo

  运行 DubboSagaTransactionStarter , 启动 demo工程;

  Demo中的数据库使用的是H2内存数据库, 生产上建议使用与业务相同的库, 目前支持Oracle, Mysql, DB2. 建表语句在 https://github.com/seata/seata/tree/develop/saga/seata-saga-engine-store/src/main/resources/sql

  
所以服务设计时需要允许空补偿, 即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来

  防悬挂控制

  悬挂:补偿服务 比 原服务 先执行

  出现原因:

  原服务 超时(拥堵)

  Saga 事务回滚,触发 回滚

  拥堵的 原服务 到达

  
所以要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行

  原服务与补偿服务都需要保证幂等性, 由于网络可能超时, 可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新

  缺乏隔离性的应对

  由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 然后给用户B扣减余额, 如果在给A用户充值成功, 在事务提交以前, A用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:

  业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。

  有些业务场景可以允许让业务最终成功, 在回滚不了的情况下可以继续重试完成后面的流程, 所以状态机引擎除了提供“回滚”能力还需要提供“向前”恢复上下文继续执行的能力, 让业务最终执行成功, 达到最终一致性的目的。

  
配置客户端参数client.rm.report.success.enable=false,可以在当分支事务执行成功时不上报分支状态到server,从而提升性能。

  
当上一个分支事务的状态还没有上报的时候,下一个分支事务已注册,可以认为上一个实际已成功

  API referance

  StateMachineEngine API

  

public interface StateMachineEngine {

 

   * start a state machine instance

   * @param stateMachineName

   * @param tenantId

   * @param startParams

   * @return

   * @throws EngineExecutionException

   StateMachineInstance start(String stateMachineName, String tenantId, Map String, Object startParams) throws EngineExecutionException;

   * start a state machine instance with businessKey

   * @param stateMachineName

   * @param tenantId

   * @param businessKey

   * @param startParams

   * @return

   * @throws EngineExecutionException

   StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map String, Object startParams) throws EngineExecutionException;

   * start a state machine instance asynchronously

   * @param stateMachineName

   * @param tenantId

   * @param startParams

   * @param callback

   * @return

   * @throws EngineExecutionException

   StateMachineInstance startAsync(String stateMachineName, String tenantId, Map String, Object startParams, AsyncCallback callback) throws EngineExecutionException;

   * start a state machine instance asynchronously with businessKey

   * @param stateMachineName

   * @param tenantId

   * @param businessKey

   * @param startParams

   * @param callback

   * @return

   * @throws EngineExecutionException

   StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map String, Object startParams, AsyncCallback callback) throws EngineExecutionException;

   * forward restart a failed state machine instance

   * @param stateMachineInstId

   * @param replaceParams

   * @return

   * @throws ForwardInvalidException

   StateMachineInstance forward(String stateMachineInstId, Map String, Object replaceParams) throws ForwardInvalidException;

   * forward restart a failed state machine instance asynchronously

   * @param stateMachineInstId

   * @param replaceParams

   * @param callback

   * @return

   * @throws ForwardInvalidException

   StateMachineInstance forwardAsync(String stateMachineInstId, Map String, Object replaceParams, AsyncCallback callback) throws ForwardInvalidException;

   * compensate a state machine instance

   * @param stateMachineInstId

   * @param replaceParams

   * @return

   * @throws EngineExecutionException

   StateMachineInstance compensate(String stateMachineInstId, Map String, Object replaceParams) throws EngineExecutionException;

   * compensate a state machine instance asynchronously

   * @param stateMachineInstId

   * @param replaceParams

   * @param callback

   * @return

   * @throws EngineExecutionException

   StateMachineInstance compensateAsync(String stateMachineInstId, Map String, Object replaceParams, AsyncCallback callback) throws EngineExecutionException;

   * skip current failed state instance and forward restart state machine instance

   * @param stateMachineInstId

   * @return

   * @throws EngineExecutionException

   StateMachineInstance skipAndForward(String stateMachineInstId) throws EngineExecutionException;

   * skip current failed state instance and forward restart state machine instance asynchronously

   * @param stateMachineInstId

   * @param callback

   * @return

   * @throws EngineExecutionException

   StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallback callback) throws EngineExecutionException;

   * get state machine configurations

   * @return

   StateMachineConfig getStateMachineConfig();

  

 

  StateMachine Execution Instance API:

  

StateLogRepository stateLogRepository = stateMachineEngine.getStateMachineConfig().getStateLogRepository();

 

  StateMachineInstance stateMachineInstance = stateLogRepository.getStateMachineInstanceByBusinessKey(businessKey, tenantId);

   * State Log Repository

   * @author lorne.cl

  public interface StateLogRepository {

   * Get state machine instance

   * @param stateMachineInstanceId

   * @return

   StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);

   * Get state machine instance by businessKey

   * @param businessKey

   * @param tenantId

   * @return

   StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);

   * Query the list of state machine instances by parent id

   * @param parentId

   * @return

   List StateMachineInstance queryStateMachineInstanceByParentId(String parentId);

   * Get state instance

   * @param stateInstanceId

   * @param machineInstId

   * @return

   StateInstance getStateInstance(String stateInstanceId, String machineInstId);

   * Get a list of state instances by state machine instance id

   * @param stateMachineInstanceId

   * @return

   List StateInstance queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);

  

 

  StateMachine Definition API:

  

StateMachineRepository stateMachineRepository = stateMachineEngine.getStateMachineConfig().getStateMachineRepository();

 

  StateMachine stateMachine = stateMachineRepository.getStateMachine(stateMachineName, tenantId);

   * StateMachineRepository

   * @author lorne.cl

  public interface StateMachineRepository {

   * Gets get state machine by id.

   * @param stateMachineId the state machine id

   * @return the get state machine by id

   StateMachine getStateMachineById(String stateMachineId);

   * Gets get state machine.

   * @param stateMachineName the state machine name

   * @param tenantId the tenant id

   * @return the get state machine

   StateMachine getStateMachine(String stateMachineName, String tenantId);

   * Gets get state machine.

   * @param stateMachineName the state machine name

   * @param tenantId the tenant id

   * @param version the version

   * @return the get state machine

   StateMachine getStateMachine(String stateMachineName, String tenantId, String version);

   * Register the state machine to the repository (if the same version already exists, return the existing version)

   * @param stateMachine

   StateMachine registryStateMachine(StateMachine stateMachine);

   * registry by resources

   * @param resources

   * @param tenantId

   void registryByResources(Resource[] resources, String tenantId) throws IOException;

  

 

  Config referance

  在Spring Bean配置文件中配置一个StateMachineEngine

  

 bean id="dataSource" 

 

   bean

   bean id="stateMachineEngine"

   property name="stateMachineConfig" ref="dbStateMachineConfig" /property

   /bean

   bean id="dbStateMachineConfig"

   property name="dataSource" ref="dataSource" /

   property name="resources" value="statelang/*.json" /

   property name="enableAsync" value="true" /

   !-- 事件驱动执行时使用的线程池, 如果所有状态机都同步执行且不存在循环任务可以不需要 --

   property name="threadPoolExecutor" ref="threadExecutor" /

   property name="applicationId" value="saga_sample" /

   property name="txServiceGroup" value="my_test_tx_group" /

   property name="sagaBranchRegisterEnable" value="false" /

   property name="sagaJsonParser" value="fastjson" /

   property name="sagaRetryPersistModeUpdate" value="false" /

   property name="sagaCompensatePersistModeUpdate" value="false" /

   /bean

   bean id="threadExecutor"

   property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" /

   property name="corePoolSize" value="1" /

   property name="maxPoolSize" value="20" /

   /bean

   !-- Seata Server进行事务恢复时需要通过这个Holder拿到stateMachineEngine实例 --

   bean

   property name="stateMachineEngine" ref="stateMachineEngine"/

   /bean

  

 

  State language referance

   状态机 的属性列表

  

{

 

   "Name": "reduceInventoryAndBalance",

   "Comment": "reduce inventory then reduce balance in a transaction",

   "StartState": "ReduceInventory",

   "Version": "0.0.1",

   "States": {

   "IsRetryPersistModeUpdate": false,

   "IsCompensatePersistModeUpdate": false

  

 

  Name: 表示状态机的名称,必须唯一

  Comment: 状态机的描述

  Version: 状态机定义版本

  StartState: 启动时运行的第一个 状态

  States: 状态列表,是一个map结构,key是 状态 的名称,在状态机内必须唯一, value是一个map结构表示 状态 的属性列表

  IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新, 默认是false, 即新增一条重试日志 (优先级高于全局 stateMachineConfig 配置属性)

  IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新, 默认是false, 即新增一条补偿日志 (优先级高于全局 stateMachineConfig 配置属性)

  各种 状态 的属性列表

  ServiceTask:

  

"States": {

 

   "ReduceBalance": {

   "Type": "ServiceTask",

   "ServiceName": "balanceAction",

   "ServiceMethod": "reduce",

   "CompensateState": "CompensateReduceBalance",

   "IsForUpdate": true,

   "IsPersist": true,

   "IsAsync": false,

   "IsRetryPersistModeUpdate": false,

   "IsCompensatePersistModeUpdate": false,

   "Loop": {

   "Parallel": 3,

   "Collection": "$.[collection]",

   "ElementVariableName": "element",

   "ElementIndexName": "loopCounter",

   "CompletionCondition": "[nrOfCompletedInstances] / [nrOfInstances] = 0.6"

   "Input": [

   "$.[businessKey]",

   "$.[amount]",

   "loopCounter": "$.[loopCounter]",

   "element": "$.[element]",

   "throwException" : "$.[mockReduceBalanceFail]"

   "Output": {

   "compensateReduceBalanceResult": "$.#root"

   "Status": {

   "#root == true": "SU",

   "#root == false": "FA",

   "$Exception{java.lang.Throwable}": "UN"

   "Retry": [

   "Exceptions": ["io.seata.saga.engine.mock.DemoException"],

   "IntervalSeconds": 1.5,

   "MaxAttempts": 3,

   "BackoffRate": 1.5

   "IntervalSeconds": 1,

   "MaxAttempts": 3,

   "BackoffRate": 1.5

   "Catch": [

   "Exceptions": [

   "java.lang.Throwable"

   "Next": "CompensationTrigger"

   "Next": "Succeed"

  

 

  ServiceName: 服务名称,通常是服务的beanId

  ServiceMethod: 服务方法名称

  CompensateState: 该 状态 的补偿 状态

  IsForUpdate: 标识该服务会更新数据, 默认是false, 如果配置了CompensateState则默认是true, 有补偿服务的服务肯定是数据更新类服务

  IsPersist: 执行日志是否进行存储, 默认是true, 有一些查询类的服务可以配置为false, 执行日志不进行存储提高性能, 因为当异常恢复时可以重复执行

  IsAsync: 异步调用服务, 注意: 因为异步调用服务会忽略服务的返回结果, 所以用户定义的服务执行状态映射(下面的Status属性)将被忽略, 默认为服务调用成功, 如果提交异步调用就失败(比如线程池已满)则为服务执行状态为失败

  IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新, 默认是false, 即新增一条重试日志 (优先级高于状态机属性配置)

  IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新, 默认是false, 即新增一条补偿日志 (优先级高于状态机属性配置)

  Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行. 具体使用见: Loop 循环事务使用

  Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可。复杂的参数如何传入见:复杂参数的Input定义

  Output: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数

  Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值

  Catch: 捕获到异常后的路由

  Retry: 捕获异常后的重试策略, 是个数组可以配置多个规则, Exceptions 为匹配的的异常列表, IntervalSeconds 为重试间隔, MaxAttempts 为最大重试次数, BackoffRate 下一次重试间隔相对于上一次重试间隔的倍数,比如说上次一重试间隔是2秒, BackoffRate=1.5 则下一次重试间隔是3秒。Exceptions 属性可以不配置, 不配置时表示框架自动匹配网络超时异常。当在重试过程中发生了别的异常,框架会重新匹配规则,并按新规则进行重试,同一种规则的总重试次数不会超过该规则的MaxAttempts

  Next: 服务执行完成后下一个执行的 状态

  
如果有异常, 则判断异常是不是网路连接超时, 如果是则认为是FA

  如果是其它异常, 服务IsForUpdate=true则状态为UN, 否则为FA

  
整个状态机的执行状态如何判断?是由框架自己判断的, 状态机有两个状态: status(正向执行状态), compensateStatus(补偿状态):

  如果所有服务执行成功(事务提交成功)则status=SU, compensateStatus=null

  如果有服务执行失败且存在更新类服务执行成功且没有进行补偿(事务提交失败) 则status=UN, compensateStatus=null

  如果有服务执行失败且不存在更新类服务执行成功且没有进行补偿(事务提交失败) 则status=FA, compensateStatus=null

  如果补偿成功(事务回滚成功)则status=FA/UN, compensateStatus=SU

  发生补偿且有未补偿成功的服务(回滚失败)则status=FA/UN, compensateStatus=UN

  存在事务提交或回滚失败的情况Seata Sever都会不断发起重试

  
Choice类型的 状态 是单项选择路由

  Choices: 可选的分支列表, 只会选择第一个条件成立的分支

  Expression: SpringEL表达式

  Next: 当Expression表达式成立时执行的下一个 状态

  Succeed:

  

"Succeed": {

 

   "Type":"Succeed"

  

 

  运行到 Succeed状态 表示状态机正常结束, 正常结束不代表成功结束, 是否成功要看每个 状态 是否都成功

  Fail:

  

"Fail": {

 

   "Type":"Fail",

   "ErrorCode": "PURCHASE_FAILED",

   "Message": "purchase failed"

  

 

  运行到 Fail状态 状态机异常结束, 异常结束时可以配置ErrorCode和Message, 表示错误码和错误信息, 可以用于给调用方返回错误码和消息

  CompensationTrigger:

  

"CompensationTrigger": {

 

   "Type": "CompensationTrigger",

   "Next": "Fail"

  

 

  CompensationTrigger类型的state是用于触发补偿事件, 回滚分布式事务

  Next: 补偿成功后路由到的state

  SubStateMachine:

  

"CallSubStateMachine": {

 

   "Type": "SubStateMachine",

   "StateMachineName": "simpleCompensationStateMachine",

   "CompensateState": "CompensateSubMachine",

   "IsRetryPersistModeUpdate": false,

   "IsCompensatePersistModeUpdate": false,

   "Input": [

   "a": "$.1",

   "barThrowException": "$.[barThrowException]",

   "fooThrowException": "$.[fooThrowException]",

   "compensateFooThrowException": "$.[compensateFooThrowException]"

   "Output": {

   "fooResult": "$.#root"

   "Next": "Succeed"

  

 

  SubStateMachine类型的 状态 是调用子状态机

  StateMachineName: 要调用的子状态机名称

  CompensateState: 子状态机的补偿state, 可以不配置, 系统会自动创建它的补偿state, 子状态机的补偿实际就是调用子状态机的compensate方法, 所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时, 可以里利用Input属性自定义传入一些变量, 见下面的CompensateSubMachine

  CompensateSubMachine:

  

"CompensateSubMachine": {

 

   "Type": "CompensateSubMachine",

   "Input": [

   "compensateFooThrowException": "$.[compensateFooThrowException]"

  

 

  CompensateSubMachine类型的state是专门用于补偿一个子状态机的state,它会调用子状态机的compensate方法,可以利用Input属性传入一些自定义的变量, Status属性自定判断补偿是否成功

  复杂参数的Input定义

  

"FirstState": {

 

   "Type": "ServiceTask",

   "ServiceName": "demoService",

   "ServiceMethod": "complexParameterMethod",

   "Next": "ChoiceState",

   "ParameterTypes" : ["java.lang.String", "int", "io.seata.saga.engine.mock.DemoService$People", "[Lio.seata.saga.engine.mock.DemoService$People;", "java.util.List", "java.util.Map"],

   "Input": [

   "$.[people].name",

   "$.[people].age",

   "name": "$.[people].name",

   "age": "$.[people].age",

   "childrenArray": [

   "name": "$.[people].name",

   "age": "$.[people].age"

   "name": "$.[people].name",

   "age": "$.[people].age"

   "childrenList": [

   "name": "$.[people].name",

   "age": "$.[people].age"

   "name": "$.[people].name",

   "age": "$.[people].age"

   "childrenMap": {

   "lilei": {

   "name": "$.[people].name",

   "age": "$.[people].age"

   "name": "$.[people].name",

   "age": "$.[people].age"

   "name": "$.[people].name",

   "age": "$.[people].age"

   "@type": "io.seata.saga.engine.mock.DemoService$People",

   "name": "$.[people].name",

   "age": "$.[people].age"

   "lilei": {

   "@type": "io.seata.saga.engine.mock.DemoService$People",

   "name": "$.[people].name",

   "age": "$.[people].age"

   "Output": {

   "complexParameterMethodResult": "$.#root"

  

 

  上面的complexParameterMethod方法定义如下:

  

People complexParameterMethod(String name, int age, People people, People[] peopleArrya, List People peopleList, Map String, People peopleMap)

 

  class People {

   private String name;

   private int age;

   private People[] childrenArray;

   private List People childrenList;

   private Map String, People childrenMap;

  

 

  启动状态机时传入参数:

  

Map String, Object paramMap = new HashMap (1);

 

  People people = new People();

  people.setName("lilei");

  people.setAge(18);

  paramMap.put("people", people);

  String stateMachineName = "simpleStateMachineWithComplexParams";

  StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

  

 

  注意ParameterTypes属性是可以不用传的,调用的方法的参数列表中有Map, List这种可以带泛型的集合类型, 因为java编译会丢失泛型, 所以需要用这个属性, 同时在Input的json中对应的对这个json加 @type 来申明泛型(集合的元素类型)

  Loop 循环事务使用

  

"States": {

 

   "ReduceBalance": {

   "Type": "ServiceTask",

   "ServiceName": "balanceAction",

   "ServiceMethod": "reduce",

   "CompensateState": "CompensateReduceBalance",

   "Loop": {

   "Parallel": 3,

   "Collection": "$.[collection]",

   "ElementVariableName": "loopElement",

   "ElementIndexName": "loopCounter",

   "CompletionCondition": "[nrOfCompletedInstances] / [nrOfInstances] = 0.6"

   "Input": [

   "loopCounter": "$.[loopCounter]",

   "element": "$.[element]",

   "throwException": "$.[fooThrowException]"

   "Output": {

   "fooResult": "$.#root"

   "Status": {

   "#root == true": "SU",

   "#root == false": "FA",

   "$Exception{java.lang.Throwable}": "UN"

   "Next": "ChoiceState"

   "ChoiceState": {

   "Type": "Choice",

   "Choices": [

   "Expression": "[loopResult].?[#this[fooResult] == null].size() == 0",

   "Next": "SecondState"

   "Default":"Fail"

  

 

  Loop: Loop 属性配置

  Parallel: 并发执行事务的线程数, 支持并发执行循环任务, 默认: 1

  Collection: 集合变量名, 在状态机启动时的入参, 用于框架获取需要循环遍历的集合对象

  ElementVariableName: 集合单个元素名称, 用于在分支事务中获取元素值, 默认: loopElement

  CompletionCondition: 自定义循环结束条件, 不写默认全部执行完成, 即: [nrOfInstances] == [nrOfCompletedInstances]

  ElementIndexName: 集合下标名称, 用于在分支事务中获取元素下标, 默认: loopCounter

  
在循环任务中, 其每次事务出参会存放于一个 List: loopResult中, 在事务上下文中可以通过 loopResult获取事务的运行结果集, 并遍历获取单次执行结果

  Loop 上下文参数

  nrOfInstances: 循环实例总数

  nrOfActiveInstances: 当前活动的实例总数

  nrOfCompletedInstances: 当前完成的实例总数

  loopResult: 循环实例执行的结果集

  
示例状态图:

  问: saga服务流程可以不配置吗,使用全局事务id串起来,这样省去配置的工作量,再加上人工配置难免会配置错误?

  答: saga一般有两种实现,一种是基于状态机定义,比如apache camel saga、eventuate,一种是基于注解+拦截器实现,比如serviceComb saga,后者是不需要配置状态图的。由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 然后给用户B扣减余额, 如果在给A用户充值成功, 在事务提交以前, A用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了,有些业务场景可以允许让业务最终成功, 在回滚不了的情况下可以继续重试完成后面的流程, 基于状态机引擎除可以提供“回滚”能力外, 还可以提供“向前”恢复上下文继续执行的能力, 让业务最终执行成功, 达到最终一致性的目的,所以在实际生产中基于状态机的实现应用更多。后续也会提供基于注解+拦截器实现。

  问: 比如有 服务A在系统1里面,服务B在系统2里面。全局事务由A开启,流程调用B开启子事务,那系统2也需要维护Saga状态机的三个表吗,也需要在Spring Bean配置文件中配置一个StateMachineEngine吗?

  答: 不需要, 只在发起方记录日志. 由于只在发起方记录日志同时对参与者服务没有接口参数的要求,使得Saga可以方便集成其它机构或遗留系统的服务

  问: 如果 系统1和系统2里面的服务,可以相互调用。系统12都可以开启全局事务,可以这样使用吗。那1和2 都需要维护Saga状态机的三个表,也需要在Spring Bean配置文件中配置一个StateMachineEngine?

  答: 可以这样使用,如果两个系统都开启saga事务,那就要记录那三个表配置StateMachineEngine

  问: 使用 Seata 的时候,现在是 AT 模式 如果改成 saga 模式的话,改造会大吗?

  答: AT 模式完全是透明的,Saga 是有侵入性的,要配置状态机 json,如果服务多改造会比较大

  问: Saga 模式是不是基于 AT 来加强的长事务处理呢?

  答: 没有基于 AT,客户端完全是两套,Server 端是复用的。你也可以看 Saga 的单元测试,那里有很多示例:https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

  问: 开发者文档中状态机引擎原理图里的EventQueue只是开启分布式事务的系统来进行事件驱动,调用其它系统服务像调用本地一样。系统之间还是RPC调用是吧。而不是系统之前也是纯事件驱动的?( 系统之间也是纯事件驱动的 指 RPC 也是非阻塞的)

  答: 节点与节点间是事件驱动的, RPC 的非阻塞需要 rpc client 支持,理论上也是可以的。rpc client 如果也是非阻塞 IO,那么所有环节都是异步了。

  问: 考虑一个业务流程, 它后续的子流程, 不管谁先运行都不会相互影响,可以异步调用。子流程是其它系统服务。Seata Saga 是不是实现了这点,其实我没看明白 ,Seata Saga异步调用具体是不是各个节点异步了?

  答: Saga的异步启动一个状态机(stateMachineEngine.startAsync)是指状态机内所有的状态都是事件驱动来执行的, 整个流程实际是同步的, 上一个状态结束才产生下一个状态的事件. 而异步调用一个服务是配置该ServiceTask为 IsAsync :true, 这个服务将会异步调用, 不会阻塞状态机的推进, 状态机不关心它的执行结果.

  问: Saga源码中事件驱动层同步bus和异步bus是什么作用?

  答: 同步BUS是线程阻塞的,等整个状态机执行完毕才返回,异步BUS是非线程阻塞的,调用后立即返回,状态机执行完毕后回调你的Callback。

  问: IsPersist: 执行日志是否进行存储,默认是true,有一些查询类的服务可以配置在false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行?

  答: 是的可以配置成false, 不过建议先保持默认,这样在查询执行日志比较全,真的要做性能调优再配,一般不会有性能问题

  问: seata saga 开启事务的客户端或者seata server服务端宕机或者重启,未完成的状态机实例是怎么保证继续执行下去的?谁去触发这个操作?

  答: 状态机实例在本地数据库有记录日志,通过日志恢复。seata server 会触触发事务恢复。

  问: saga 的json文件 支持热部署吗?

  答: 支持, stateMachineEngine.getStateMachineConfig().getStateMachineRepository().registryByResources()。不过java代码和服务需要自己实现支持热部署

  问: 出参入参都放在saga的上下文中,如果参数内容较多较大,业务量又大的话,对内存有限制吗?

  答: 没有做限制,建议无关的参数不要放到上下文。下一个服务需要用的参数、或用于分支判断的参数可以放入上下文。

  问: 确认个事情:每个节点,要么自己方法内部 Catch 异常处理,使最终有返回信息。要么自己内部可以不处理,交由状态机引擎捕获异常,在 json 中定义 Catch 属性。 而不是补偿节点能够自动触发补偿,需要补偿必须手动在 json,由 Catch 或者 Choices 属性路由到 CompensationTrigger?

  答: 对的,这个是为了提高灵活性。用户可以自己控制是否进行回滚,因为并不是所有异常都要回滚,可能有一些自定义处理手段。

  问: 所以 Catch 和 Choices 可以随便路由到想要的 state 对吧?

  答: 是的。这种自定义出发补偿的设计是参考了 bpmn2.0 的。

  问: 还有关于 json 文件,我打算一条流程,就定义一个 json,虽然有的流程很像,用 Choices,可以解决。但是感觉 json 还是要尽量简单。这样考虑对吗?

  答: 你可以考虑用子状态机来复用,子状态机会多生成一行 stateMachineInstance 记录,但对性能影响应该不大。

  website powered by docsite

  愿景Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

  文档Seata 是什么?快速开始报告文档问题在Github上编辑此文档资源博客社区中文版英文版Copyright © 2022 Seata

  以上就是Seata Saga 模式()的详细内容,想要了解更多 Seata Saga 模式的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: