第08课:分布式事务实践

前言

在分布式计算领域中,一个重要的场景就是共识问题。前文也简单提及过,共识即是让分布式系统中所有节点对于一些值达成共识。共识问题的处理一般用于 Leader 选举(比如单主复制的集群,只有主节点接受数据写入,如果这个 Leader 节点挂了,需要其他节点能选举出新的 Leader,使用选举算法需保证选举出一个新 Leader)以及原子提交(跟事务 ACID 的 A 概念是一样的,分布式的原子提交需要保证所有节点都最终获得一样的数据结果,事件的操作都执行成功或者都放弃)。我们常说的分布式事务一般是指解决分布式系统的原子提交(Atomic Commit)。而服务发现以及数据存储集群的数据复制的 Master 选举,一般是基于某种 Leader 选举的算法。

本文会结合前面几篇的内容,包括 第 04 课:数据复制,第 06 课:分布式存储系统的事务及第 07 课:分布式一致性,来介绍关于分布式事务的实践。关于共识算法的实现以及在微服务中的服务发现方面的应用,会在第09课中详细介绍。

分布式事务理论

分布式事务的问题

第06课:分布式存储系统的事务中介绍了数据库系统的事务特性。其中原子性定义了事务中的操作或者都执行成功或者都执行失败,并且未执行完成的,中间态数据需要对其他事务不可见。如果在单个数据库实例中,原子性可以很好的得到保证。但在分布式环境下,原子性很容易被破坏,比如一些节点在处理事务中 Crash 了(数据还没完全 Commit,并且 Recovery 之后进行了回滚),但是其余的节点已经提交成功;或者网络故障导致一部分请求数据没有到达某些节点,而另一部分的节点接收到了请求却处理成功了。

原子提交在很多场景下都是一种基础要求,如果没有原子性的保证,很多时候会导致分布式系统数据的不一致。假设一部分节点看到的数据是 A=1,然而因为另外一部分节点执行失败了,失败的节点的数据依然是 A=0。而看到 A=1 的客户端如果继续根据 A=1 进行其他数据变更,比如程序在判断 A>0 时变更了 b,将 b 设置成 100,则随着时间的推移会产生更多不一致的数据。

在分布式环境下,有各种场景会导致分布式事务很难维持原子性的语义。但在实践中,也是有一些方法可以获取原子性的保证的。我们通过下文来看一些实现方式。

两阶段提交(2PC Two-Phase Commit)

2PC 是一种可以支持分布式集群的保证原子性的协议。实现 2PC,需要有一个重要的组件叫协调者(Coordinator),有的实现中也叫事务管理器(Transaction Manager)。目前 Coordinator 的实现可以是一个单独的 lib 的形式绑定在应用进程里(比如 JavaEE 容器),或者是一个单独的进程或服务,如 JOTM (JavaTM Open Transaction Manager),BTM(Bitronix JTA Transaction Manager)。其他的数据存储节点,也即运行着 DB 实例节点也叫参与者(Participants)。

2PC 中的两阶段主要是指:

  • 投票(准备)阶段:协调者发送一个“prepare”请求给所有的参与者,询问是否可以提交。参与者判断事务是否会产生一些冲突或者是否有资源执行该事务,如果参与者判断可以提交,则会将事务提交到本地,并回复“yes”,否则回复“no”。
  • 提交(执行)阶段:如果所有参与者都回复“yes”,则协调者发送“commit”请求给所有参与者,则所有参与者的 commit 执行生效。如果准备阶段有一个参与者回复“no”,则协调者发送“abort”请求给所有参与者,所有的参与者对本地事务进行回滚。有的系统实现中,也会在参与者第二阶段执行完毕后发送一个“Commit ACK”或“Abort ACK”消息,让协调者更好地判断所有参与者的执行结果。

如下图示意了 2PC 的过程,可以看到两个阶段的执行过程。

1-35

2PC 看上去简单,但为了保证复杂的分布式环境下的原子性,在实现上还有许多要处理的细节:

  • 为了保证事务的原子性以及可回滚,需要存储引擎(DB)基于预写日志(WAL,Write-Ahead Log,在第03课:数据复制中有介绍)。
  • 每个分布式事务在开始时,应用系统需要向协调者请求一个全局唯一的事务 ID(Global unique Transaction ID)。
  • 为了保证每个参与者的一致性,协调者给所有参与者发送“prepare”请求时带上全局事务 ID,所以有任一个节点请求失败或者超时,则协调者会发送“abort”(带上全局事务 ID)请求给所有参与者。
  • 当协调者收到“prepare”请求时,需要确认其可以执行这个事务(检查约束、冲突、资源情况等),并且参与者在 prepare 阶段写入 undo log 或者 redo log,以及将数据写入到磁盘(写入的数据作为本地事务,只有事务内是可见的),然后参与者回复“yes”给协调者,这个回复就代表着这个参与者承诺了一定可以 commit 这个事务。
  • 协调者收到所有的参与者的回复之后,决定是否要提交(只有在所有参与者都回复“yes”的情况才可以提交,如果有参与者超时没有回复,则协调者默认其返回的是“NO”,并执行 Abort)或者放弃(Abort)这个事务。然后,协调者将这个决定写入到本地的一个事务 Log 中,以防止在第二阶段开始前协调者自己 Crash。
  • 写入到本地事务 Log 之后,协调者向所有参与者发送“commit”或者“abort”请求。如果其中某个请求失败或者超时,则协调者必须永久的不断地重试,直到成功被参与者处理。如果是因为某个参与者自身 Crash 了,那也会等待其恢复后提交事务。所以这就是第一阶段,所有参与者的“yes”回复的重要性,这个“yes”是一个承诺,承诺了一定要 Commit。
    2PC 可以保证分布式事务的原子提交,其最重要的一点就是第一阶段,所有参与者的回复“yes”,所有参与者在第一阶段回复“yes”之后将再也没有反悔的机会。以上只介绍了参与者 Crash 或者请求超时的情况,下面来看看协调者 Crash 的情况:如果协调者在第一阶段 Crash 了,那么参与者的事务并没有真正 Commit,数据的原子性没有被破坏;如果在第二阶段,发送 Commit 请求时,假设发送给 DB1 的请求成功并且被 Commit 了,但是在发送请求给 DB2 时忽然协调者 Crash 了。这种情况只能等协调者恢复,并且需要根据协调者本地存储的事务 Log 进行恢复。所以协调者在发送所有事务请求之前都需要往本地事务 Log 写入,以备在故障后进行事务恢复。

2PC 和 2PL

把这两个概念放到一起是为了帮助读者再次区分下,其实他们两个是完全不同的概念。2PL(Two-phase Locking)是数据库事务中为了提供“串行隔离”的一种方案(加锁、解锁,详细可以回顾第 06 课的 Serializable),而 2PC 是数据库为了在分布式事务中提供原子提交的一种算法协议。

三阶段提交(3PC Three-Phase Commit)

2PC 可以保证分布式事务的原子提交,但是也存在一些问题:在一个参与者 Crash 时,协调者会一直阻塞,不能继续处理后续的事务;在网络高延时的情况,协调者也会一直阻塞等待所有参与者的响应。在极端的情况下,如果协调者发出 Commit 命令给一部分参与者之后,协调者 Crash 了,则一部分参与者执行了 Commit,而另外一部分没收到消息的参与者则会一直阻塞等待协调者,此时还会产生部分参与者节点的数据不一致。所以,2PC 也叫基于阻塞的原子提交协议,并且存在协调者的单点问题,即使可以重新选举协调者,但参与者的阻塞是无法解决的。

为了改进 2PC 的阻塞性能,有另外一种非阻塞的分布式事务协议——3PC。3PC 基于“有限的网络延迟,以及有限的节点响应时间”的假设,将两阶段的第一阶段再多划分出一个阶段,并且 3PC 是非阻塞的。在分布式系统的实践中,往往不存在绝对可靠的网络和不会 Crash 的节点。所以 3PC 还需要有优秀的检测机制(检测延迟,是因为网络原因,还是节点 Crash 了)。

3PC 的三个阶段主要是:

  • CanCommit:协调者向参与者发送询问“是否可以提交事务”的请求,参与者如果可以提交就返回“Yes”响应,否则返回“NO”。
  • PreCommit:协调者根据所有的参与者的响应,决定是否“commit”或者“abort”事务。只有所有参与者都在有限的延时之内,返回“Yes”响应,才继续向所有参与者发送“PreCommit”请求,否则发送“abort”请求。所有参与者根据请求执行事务操作(写入 undo log,写入事务数据到磁盘),或者进行“abort”处理。
  • DoCommit:协调者根据参与者 PreCommit 阶段的响应,如果参与者返回的响应超时或者返回的不是“Yes”,则协调者发送“abort”给所有参与者,所有参与者根据 Undo log 进行事务回滚,否则发送“DoCommit”请求,所有参与者提交事务,释放资源。参与者处理完“rollback”或者“commit”之后会发送 ACK 响应给协调者。但如果这个响应超时,则协调者不会阻塞,会继续处理下一个事务。
    3PC 中的协调者和参与者都引入了超时机制(2PC 只在协调者中有超时机制),所以是非阻塞的,比如,在 DoCommit 阶段,如果协调者 Crash 了,或者超时导致参与者没有按时接收到命令,参与者会执行事务的提交(因为经过 CanCommit 阶段所有参与者都同意才进如到 PreCommit,所以到了 DoCommit 阶段,是一种基于乐观估计的概率,所有参与者觉得 commit 的概率大于 abort),然后,参与者可以继续处理后续的事务。而 2PC 中的 Commit 阶段,如果协调者没有发出 Commit 命令,则没有超时机制的参与者会一直阻塞。

但是 3PC 无法保证严格的分布式事务的原子性语义。比如在第三个阶段“DoCommit”,参与者返回的 ACK 消息超时了,3PC 中的协调者会乐观地认为事务执行成功了。然而在实际情况下,有可能部分节点执行成功,而部分节点确实没有 Commit。所以 3PC 不能提供严格的数据一致性保证,所以目前 2PC 还是被广泛得应用在一些数据库集群中处理事务的原子提交。

分布式事务实践

分布式系统,包含 SOA、微服务,很难避开要处理分布式事务。从上文可以知道,2PC 满足了分布式事务的原子性,但是抛弃了性能;3PC 提高了可用性,但是不保证原子提交。很多读者可能会问,在分布式环境下,除了原子性,那其他的,一致性、持久化、隔离性怎么考虑。其实除了原子性,其他三者的理解还是在单个数据库实例的本身思考即可。一致性需要应用自己去考虑数据的一致性约束,持久化、隔离性方面,底层存储引擎在单节点怎么处理,集群环境还是同样的处理。下面从两方面归类实践的思路,一个是存储集群内置的分布式事务支持,另外一种是需要结合多种分布式技术来实现。

存储集群内置实现

传统关系型数据库的集群,比如 MySQL、PostgreSQL、Oracle 等,都提供了内置的分布式事务支持。根据 MySQL 的数据报告,实现分布式事务要比在单个节点运行时间超过 10 倍。这是因为传统数据库的集群一般是通过 2PC 来实现分布式事务的,比如 MySQL 的 NDB 集群,如果网络延时高,阻塞的性能会更差。

如果使用存储集群提供的分布式事务,那么需要统一底层存储引擎,对于需要使用多样存储服务的 SOA、微服务架构来说,不是一个通用的解决方案。

基于跨存储、服务的技术实现

在微服务架构下,存储的技术不一定是单一的,可能有的微服务使用 MySQL,有的微服务使用 MongoDB,有的使用 Redis。在这种情况下,很难通过一个通用底层来解决。一般是需要应用层面通过一些分布式技术(比如分布式消息队列)以及一些事务模型(TCC)来实现。

X/Open XA 标准

X/Open XA 是一种基于 2PC 的,用来实现分布式事务的技术标准。XA 定义了一组接口,这组接口是定义如何实现 2PC 中的协调者的。也即由具体的协调者(事务管理器)来实现的。XA 不局限于底层的存储,可以是传统关系型数据库,可以是缓存,可以是消息队列也可以是应用服务。

目前很多传统的关系型数据库都支持 XA,比如 MySQL、PostgreSQL、Oracle等,而也有一些消息队列技术如 ActiveMQ,MSMQ 也实现了 XA。XA 也不局限实现语言。对于一个 Java 应用来说,JTA 就是一个实现了 XA 接口标准的事务管理器。其底层对于很多数据库的连接都基于 JDBC(Java Database Connectivity)driver,对于 Message Broker 可以使用 JMS(Java Message Service)。JTA 关于 XA 接口的实现,感兴趣的同学可以去看一下关于 javax.sql.XADataSource、javax.sql.XAConnection 的代码。

下面以 MySQL 的 JDBC Driver 为例,看其中一个存储的连接提供的基于 XA 的 Commit 方法(from com.mysql.jdbc.jdbc2.optional.MysqlXAConnection)。

package com.mysql.jdbc.jdbc2.optional;

import ...;

public class MysqlXAConnection extends MysqlPooledConnection implements XAConnection, XAResource{

... 

// xid 即是前文提到的全局的事务id。 
// onePhase 如果是true 则资源管理器需要使用 第1阶段 commit 协议。   
public void commit(Xid xid, boolean onePhase) throws XAException {
        StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
        commandBuf.append("XA COMMIT ");
        appendXid(commandBuf, xid);

        if (onePhase) {
            commandBuf.append(" ONE PHASE");
        }

        try {
            // log执行的语句: “log(Executing XA statement: " + command);” ,执行语句:调用 stmt.execute(String sql) 
            dispatchCommand(commandBuf.toString());
        } finally { // 标记全局事务该阶段不是正在处理一个全局事务  
            this.underlyingConnection.setInGlobalTx(false);
        }
    }


 ...

}

因为 XA 的实现一般是以一个 Library(比如 jar 包)加载在应用系统中,如果应用系统 crash,需要在重新运行起来之后,事务管理器(协调者)读取之前记录在本地磁盘中的事务日志来进行事务恢复,所以也可以看到一些实现 XA 的 driver 中定义了一些 callback,用来进程恢复后,请求参与者接口,进行事务“commit”或者“abort”。但也有一些情况,因为使用不同的数据存储服务,而有的存储服务因为没有提交事务,而一直持有一些锁(如行锁,或者间隙锁等)。对于一些复杂的情况,有时可能需要人工介入处理来决定事务是提交或者回滚。

XA 的好处就是目前已经有很多标准的实现,对于应用开发来说,分布式事务写起来跟本地事务没有什么差别。但是 XA 基于 2PC,所以性能也很差,不适合于在目前的需要尽快响应用户请求的 Web 服务。并且,XA 标准中没有办法识别跨系统的死锁,很多时候还需要系统自己额外进行死锁检测。

Try Confirm/Cancel(TCC)协议

Try Confirm/Cancel(TCC)是一种解决分布式事务的协议,但是其基于 BASE 模型,一致性方面只能保证最终一致性。分布式事务的原子提交方面,TCC 提供的是轻量级的原子性实现,也即存在中间态的数据可能被访问到。TCC 需要服务层(Microservice 的 Rest API 或者 SOA 的 service 层)提供三个(try、confirm、concel)事务处理接口,然后由统一的分布式事务管理器(TCC 协调者)来进行调度。下面先来看一下三个接口需要处理的逻辑。

  • Try:做领域服务的业务检查,事务资源检查,预留请求需要的资源。若预留资源成功,则返回 OK,否则返回 NO(这里的返回只是个返回模拟,如果是 Rest 接口,对应相关的 Http 状态码)。
  • Confirm:各个服务执行本服务的业务逻辑。会需要用到 Try 阶段的资源,并且如果 TCC 实现基于消息队列需要服务做好幂等处理。如果成功则返回 OK,否则返回 NO。
  • Cancel:如果 Confirm 接口有的服务执行失败或者超时,则会调用到 Cancel 接口。之前执行的本地事务需要执行回滚操作,执行后会释放事务过程占用的业务资源。
    这三个接口都是在应用层,也即每个应用开发者需要实现的,并且这三个接口是会分阶段被 TCC 协调器调用。TCC 协议适合 SOA、微服务架构处理分布式事务,可以基于 Dubbo 实现,也可以基于 Rest API 实现。

关于 Rest API 的实现方面,目前 Atomikos 公司提供了基于 TCC 的商业版本的实现。基于 TCC 协议的思想,我们也可以实现自己的分布式事务管理器。

下面来看一个简单的例子(microservice-trade 包含订单模型,microservice-item 包含商品模型,这里简单模拟一个下单减库存,在实际业务逻辑中会有更多的约束和关联),通过示例先了解下 TCC 的整体结构和实现思路,如下图所示:

2-25

如图所示,try 阶段,trade、item 微服务分别提供业务检查接口,并且在这个阶段会提前对必要的业务资源进行处理(比如 item 中,先减库存资源,防止超卖)。在 trade 微服务确认可以下单,以及 item 减库存成功后,两个服务分别返回“OK”,图中的“API-CompositeService”负责对于服务的 try 接口进行调用。这个服务是一个专门负责调度分布式事务的服务,并且负责开启以及关闭 TCC 协调者服务。在 try 阶段返回后,CompositeService 根据 Try 的返回决定是“Confirm”还是“Cancel”,然后调用 TCC 协调者,开启一个分布式事务,将事务的“confirm or cancel”信息传递给 TCC 协调者。另外,TCC 协调者需要将事务信息存储到本地的事务 log,用来做故障恢复的信息。最后,TCC 协调者调用各个微服务提供的“confirm”接口进行事务提交,或者“cancel”接口释放资源(本例中需要增加回库存)。

TCC 协议是一种轻量级的分布式事务实现,虽然不保证严格的原子性,但是在一些场景下很适合 SOA 架构,微服务架构处理分布式事务。目前 GitHub 上也有很多开源的实现。实现一套企业的 TCC,最核心的组件就是 TCC 协调者,在写自己的中间件时,也可以使用分布式消息队列。

虽然 TCC 相较 XA 有更好的性能,但是其开发成本也更高,需要服务额外提供多套事务处理的接口。并且因为不是严格的原子性,所以也只适合粒度比较小,执行时间短的事务。

基于分布式消息队列技术

对于微服务体系来说,一般都会引入一个可靠的分布式消息队列技术。微服务提倡服务间的松耦合,减少同步调用,更多时候在设计不同服务中领域模型的逻辑关联时,通过分布式消息队列可以很好地进行解耦。每个微服务的边界也会更加清晰,服务只要关注领域模型内部的逻辑即可。

通过可以支持 At-Least-Once 类型消息,以及高可用的分布式消息队列,可以很方便地实现分布式事务。其主要的实现思想一般是数据库事务和消息 delivery(postMessage)是在同一个事务中运行。只要其中一个执行失败则整个事务被放弃(比如消息发送失败了,则本地事务回滚;事务执行失败了,则消息不发送)。如果是 Java 工程实现的服务,可以使用 Spring 的事务管理机制,基于注解的方式可以使用 org.springframework.transaction.annotation.Transactional。如果消息投递出去但是接收消息的节点 crash 了,则可以通过消息重试机制,间隔一定时间,不断重试消息请求。但是这种情况,需要消费者(Subscriber)做好幂等处理。整体的实现思路如下图:

3-20

有的 MQ 还支持分布式事务,如 RocketMQ 中可以处理“事务消息”。具体实现,大家可以去下载源码工程,参考 com.alibaba.rocketmq.example.transaction.TransactionProducer 的实现即可。基于消息队列的情况,对于事务原子性的保证会更弱一点。尤其在消费者节点事务处理中 crash 了的情况。部分特殊场景可能还需要人工接入来处理,保证数据的一致性。

小结

本文介绍了分布式事务的基于阻塞的 2PC 以及非阻塞的 3PC 方案。3PC 是非严格的原子提交协议,在网络延迟高的环境下,如果出现超时,3PC 没法判定是网络问题还是节点 Crash 了。在分布式事务的实践中,目前被很多数据库集群应用的还是基于 2PC 实现分布式事务(如 MySQL、Oracle)。

如果是跨存储集群的系统还可以使用 XA。XA 是基于 2PC 协议提供的一套接口实现,其基于存储层或者作为一个三方包加载在每个服务中,所以服务如果 crash 或者下线了,则 XA 的事务管理器也随着挂了,并且 XA 也是阻塞的,不是很适合需要快速响应用户的 SOA、微服务架构。

随后我们介绍了基于非阻塞的两种实现分布式事务的方案。TCC 是可以建立在应用层或者业务层的解决方案,其实现可以和应用解耦。服务只需要提供包含本地事务处理的 service 接口,交给 TCC 中的协调者进行事务调度。TCC 的 Cancel 机制提供了一种基于“事务补偿型”的方案。另外一种高性能的解耦方案是基于可靠的分布式消息队列,消息队列中最难控制的就是消费者(Consumer)的处理,为了保证数据一致性,需要消费者做幂等处理,并且消息队列要支持重试,所以基于消息队列的方案也叫“最大努力型”。

本文内容主要介绍了分布式系统的一致性核心问题之一——原子提交以及分布式事务的实践方式。2PC 除了阻塞的特点,并且也是低容错的。第09课将会介绍带有分区容错特点的,解决分布式一致性的另外一个知识范畴——共识算法及应用。

资料
《Designing Data-Intensive Application》
Configuring Write­Scalable PostgreSQL Cluster
Mysql HA
Towards Distributed Atomic Transactions over RESTful Services

(全文完)

(转载本站文章请注明作者和出处 第08课:分布式事务实践