引入 DTM 以支持 ABP 的多租户多数据库场景

这篇文章分享了使用 DTM 二阶段消息模式解决 issue #10036 的方法。

今天我们要使用 EasyAbp 的 Abp.EventBus.Boxes.Dtm 模块。

DTM 事件箱的介绍

这个模块使用了 DTM 的 二阶段消息 使得 ABP 的事件箱得以支持 多租户多数据库场景

你需要先阅读 DTM 文档,它将帮助你理解这个模块。

与 ABP 默认事件箱的差异

DTM 二阶段消息事件箱 ABP 5.0+ 默认事件箱
收发速度 :heavy_check_mark: :x:
更少的数据传输 :x: :heavy_check_mark:
保证事件发出
(事务工作单元)
:heavy_check_mark: :heavy_check_mark:
保证事件发出
(非事务工作单元)
:x: :heavy_check_mark:
(要求消费端解决幂等)
避免重复处理(仅限纯DB操作) :heavy_check_mark: :heavy_check_mark:
支持多租户多数据库 :heavy_check_mark: :x:
没有增加外部设施 :x: :heavy_check_mark:
管理面板和报警 :heavy_check_mark: :x:

DTM 发件箱是如何工作的?

假设你正在使用发件箱发布新的事件:

await _distributedEventBus.PublishAsync(eto1, useOutbox: true);
await _distributedEventBus.PublishAsync(eto2, useOutbox: true);  // useOutbox 的默认值即 true

DTM 发件箱会临时存储这些事件。接下来我们看看,在你完成当前工作单元时它会怎么做:

// UnitOfWork.cs 的代码片段
protected override async Task CommitTransactionsAsync()
{
    // 第 1 步:在事务内插入一条记录到 DTM 屏障表,接着发送一条“prepare”请求到 DTM 服务器
    await DtmMessageManager.InsertBarriersAndPrepareAsync(EventBag);

    // 第 2 步: 提交当前 DB 事务
    await base.CommitTransactionsAsync();

    // 第 3 步: 发送一条"submit"请求到 DTM 服务器
    OnCompleted(async () => await DtmMessageManager.SubmitAsync(EventBag));
}

至此,DTM 服务器已经收到了一条“submit”请求。它会调用 app 的 PublishEvents 服务并附上所有事件的数据,后者被调用后会立即发布这些事件到 MQ。

 

查看更详细的时序图

 

 

如果你依然对这个模式“如何确保发送”困惑,请查看 DTM 的 二阶段消息文档 以了解更多。

DTM 收件箱是如何工作的?

与 ABP 的默认实现不同,DTM 收件箱从 MQ 收到一个事件后会立即处理(handle)。在所有的 handler 完成他们的工作后,收件箱会沿用当前事务,向 DTM 屏障表插入一条记录。最后它提交了事务并向 MQ 返回 ACK。

所有入箱的事件都拥有一条唯一的 MessageId。拥有相同 MessageId 的事件只会被处理一次,因为我们不能插入 gid (MessageId) 重复的记录到 DTM 屏障表。

 

查看更详细的时序图

 

正如你注意到的,DTM 服务器没有参与收件箱的工作。🤭

安装和使用

请阅读 https://github.com/EasyAbp/Abp.EventBus.Boxes.Dtm/tree/main#installation.

后记

这样的一个 DTM 事件箱实现并不是完美的解决方案。这里最大的成本是,你需要额外关心 DTM 服务器的可用性。但是如果你遇到了多租户多数据库的场景,它就是现在最佳的选择。