码恋 码恋

ALL YOUR SMILES, ALL MY LIFE.

目录
RocketMQ 分布式事务消息
/  

RocketMQ 分布式事务消息

一、什么是事务

事务是将一次执行过程中所涉及的所有操作纳入到一个不可分割的执行单元,组成事务的所有操作只有在所有操作均能正常执行的情况下才能提交,只要其中任一操作执行失败,都将导致整个事务的回滚。一句话来说,就是保证多个操作要么都做,要么都不做。同时一旦事务提交,则其所做的修改会永久保存到数据库。

二、事务的四个特性(ACID)

  • A:原子性(Atomicity)
    一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被回滚(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。
  • C:一致性(Consistency)
    事务的一致性指的是在一个事务执行之前和执行之后数据库都必须处于一致性状态。如果事务成功地完成,那么系统中所有变化将正确地应用,系统处于有效状态。如果在事务中出现错误,那么系统中的所有变化将自动地回滚,系统返回到原始状态。
  • I:隔离性(Isolation)
    指的是在并发环境中,当不同的事务同时操纵相同的数据时,每个事务都有各自的完整数据空间。由并发事务所做的修改必须与任何其他并发事务所做的修改隔离。事务查看数据更新时,数据所处的状态要么是另一事务修改它之前的状态,要么是另一事务修改它之后的状态,事务不会查看到中间状态的数据。
  • D:持久性(Durability)
    指的是只要事务成功结束,它对数据库所做的更新就必须永久保存下来。即使发生系统崩溃,重新启动数据库系统后,数据库还能恢复到事务成功结束时的状态。

三、InnoDB 事务实现

基于衡量事务的四个特性,InnoDB 实现事务实际上就是 4 个特性的实现。

  • 原子性

    • 在 MySQL 中有很多类型的日志,二进制日志、查询日志、错误日志、慢查询日志等等。除了这些日志,还提供了两种事务日志,redo log 用来保证持久性, undo log 是原子性和隔离性实现的基础。
    • 数据库每执行一条更新数据的 sql 就会生成一条 undo log,比如 insert 一条数据,就会生出一条 delete 的 undo log。如果事务执行失败或者调用 rollback 就可以根据 undo log 做数据回滚。
  • 隔离性

    • 隔离性是指,事务内部的操作与其他事务是隔离的,并发执行的各个事务之间不能互相干扰。严格的隔离性,对应了事务隔离级别中的Serializable (可串行化),但实际应用中出于性能方面的考虑很少会使用可串行化。
    • InnoDB 采用可重复读隔离级别,使用 MVCC 和行锁、间隙锁实现隔离性。
  • 持久性

    • InnoDB作为MySQL的存储引擎,数据是存放在磁盘中的,但如果每次读写数据都需要磁盘IO,效率会很低。为此,InnoDB提供了缓存(Buffer Pool),Buffer Pool中包含了磁盘中部分数据页的映射,作为访问数据库的缓冲:当从数据库读取数据时,会首先从Buffer Pool中读取,如果Buffer Pool中没有,则从磁盘读取后放入Buffer Pool;当向数据库写入数据时,会首先写入Buffer Pool,Buffer Pool中修改的数据会定期刷新到磁盘中(这一过程称为刷脏)。
    • Buffer Pool的使用大大提高了读写数据的效率,但是也带了新的问题:如果MySQL宕机,而此时Buffer Pool中修改的数据还没有刷新到磁盘,就会导致数据的丢失,事务的持久性无法保证。
    • 于是,redo log被引入来解决这个问题:当数据修改时,除了修改Buffer Pool中的数据,还会在redo log记录这次操作;当事务提交时,会调用fsync接口对redo log进行刷盘。如果MySQL宕机,重启时可以读取redo log中的数据,对数据库进行恢复。redo log采用的是WAL(Write-ahead logging,预写式日志),所有修改先写入日志,再更新到Buffer Pool,保证了数据不会因MySQL宕机而丢失,从而满足了持久性要求。

    既然redo log也需要在事务提交时将日志写入磁盘,为什么它比直接将Buffer Pool中修改的数据写入磁盘(即刷脏)要快呢?主要有以下两方面的原因:
    (1)刷脏是随机IO,因为每次修改的数据位置随机,但写redo log是追加操作,属于顺序IO。
    (2)刷脏是以数据页(Page)为单位的,MySQL默认页大小是16KB,一个Page上一个小修改都要整页写入;而redo log中只包含真正需要写入的部分,无效IO大大减少。

  • 一致性

    • 一致性是指事务执行结束后,数据库的完整性约束没有被破坏,事务执行的前后都是合法的数据状态。
    • 一致性不仅由数据库本身来保证,同时业务系统也保证数据的一致性。

四、分布式事务的由来

现代软件架构随着业务领域划分为多个微服务,共同组成了复杂的软件系统。而从数据库层面来看,随着数据量的爆发,不得不采用分库分表的方式,降低数据库的压力。这样,就造成多个服务依赖不同的数据库,那么在同时操作的时候,如何保证事务?这就是分布式事务。

简而言之,分布式事务就是一个大的事务由不同的子事务组成,这些小的事务操作分布在不同的服务器节点上面,属于不同的微服务,分布式事务需要保证同一事务下的子事务要么全部成功,要么全部失败,即保证数据的最终一致性。

五、分布式事务解决方案

在这篇不想用太大的篇幅说一些概念上的东西,但是要说 RocketMQ 的分布式事务实现,所以在这里顺便提一下当前分布式事务的集中解决方案:

  • 两阶段提交(2PC)

    两阶段提交(2PC) 是 Oracle Tuxedo 系统提出的 XA 分布式事务协议的其中一种实现方式,参考 《分布式事务之两阶段提交(2PC)》

  • Try-Confirm-Cancle (TCC)

    TCC 是基于尝试、确认、取消来实现分布式事务的,想了解更多,参考 《分布式事务之补偿事务( TCC )》

  • 本地消息表

    本地消息表 方案最初是ebay提出的,核心是将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或消息队列,再通过业务规则自动或人工发起重试。人工重试更多的是应用于支付场景,通过对账系统对事后问题的处理。

image.png

除了上述外,还有一些解决方案,比如阿里 SEATA ,SAGA方案和最大努力通知...感兴趣同学们可以自行了解,当然还有我们这篇要说的 MQ 事务。

六、MQ 事务

RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件,基于消息异步方式提供了对分布式事务的支持,实现事务最终一致性。

下面是 RocketMQ 事务消息的基本流程交互图:

image.png

如图其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送 half 消息。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行 Commit 或者 Rollback( Commit 操作生成消息索引,消息对消费者可见)

流程图如下:

image.png

2.补偿流程:

(1) 对没有 Commit/Rollback 的事务消息( pending 状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段使用定时器回查方式用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

七、RocektMQ 事务消息的使用

如上,小伙伴们应该对 RocketMQ 的事务消息有了一定的了解,下面看下如何在开发场景下如何使用。

发送事务消息时和普通的消息区别是,自己要新建一个 TransactionMQProducer 和对应的一个 TransactionListener的实现。

  • TransactionMQProducer
    具体的配置有 group、 nameServer 地址、执行本地事务的线程池和事务监听器的实现。
this.producer = new TransactionMQProducer(config.getGroup());
    this.producer.setNamesrvAddr(config.getNameServer());
    this.producer.setExecutorService(config.getExecutorService());
    this.producer.setTransactionListener(config.getTransactionListener());
  • TransactionListener
    实现 TransactionListener 接口的两个方法:
    • executeLocalTransaction(Message message, Object o)
      用于执行本地事务的方法。
    • checkLocalTransaction(MessageExt messageExt)
      RocketMQ 回查本地事务状态调用的方法。

代码详见 👀 : https://github.com/wangning1018/rocketmq-transaction-message-demo



❤ 转载请注明本文地址或来源,谢谢合作 ❤


center