伙伴云客服论坛»论坛 S区 S软件开发 查看内容

0 评论

0 收藏

分享

详解领域驱动设计之事件驱动与CQRS

目录

    一、前言:从物流详情开端二、领域事件
      2.1、建模领域事件2.2、领域事件代码解读2.3、领域事件的存储
        2.3.1、单独的EventStore2.3.2、与业务数据一起存储
      2.4、领域事件如何发布
        2.4.1、由领域聚合发送领域事件2.4.2、事件总线VS消息中间件

    三、Saga分布式事务
      3.1、Saga概要3.2、Saga实现
        3.2.1、协同式(choreography)3.2.2、编排式(orchestration)3.2.3、补偿战略

    四、CQRS五、自治效劳和系统六、结语


一、前言:从物流详情开端

大家对物流跟踪都不陌生,它详细记录了在什么时间发生了什么,并且数据作为重要凭证是不可变的。我理解其背后的价值有这么几个方面:业务方可以管控每个子过程、晓得目前所处的环节;另一方面,当需要追溯时候仅仅通过每一步的记录就可以回放整个历史过程。
我在之前的文章中提出过“软件项目也是人类社会消费关系的范畴,只不过我们所发明的劳动成果看不见摸不着而已”。所以我们可以借鉴物流跟踪的思路来开发软件项目,把复杂过程拆解为一个个步骤、子过程、状态,这和我们事件划分是一致的,这就是事件驱动的典型案例。
详解领域驱动设计之事件驱动与CQRS-1.jpg


二、领域事件

领域事件(Domain Events)是领域驱动设计(Domain Driven Design,DDD)中的一个概念,用于捕获我们所建模的领域中所发生过的事情。
领域事件自身也作为通用语言(Ubiquitous Language)的一部分成为包括领域专家在内的所有项目成员的交流用语。
比如在前述的跨境物流例子中,货品到达保税仓以后需要分派工作人员停止分拣分包,那么“货品已到达保税仓”便是一个领域事件。
首先,从业务逻辑来说该事件关系到整个流程的胜利或者失败;同时又将触发后续子流程;而对于业务方来说,该事件也是一个标志性的里程碑,代表自己的货品就快配送到自己手中。
所以通常来说,一个领域事件具有以下几个特征:较高的业务价值,有助于形成完好的业务闭环,将导致进一步的业务操作。这里还要强调一点,领域事件具有明确的边境。
比如:假设你建模的是餐厅的结账系统,那么此时的“客户已到达”便不是你关心的重点,因为你不可能在客户到达时就立即向对方要钱,而“客户已下单”才是对结账系统有用的事件。

2.1、建模领域事件

在建模领域事件时,我们应该根据限界上下文中的通用语言来命名事件及属性。假设事件由聚合上的命令操作产生,那么我们通常根据该操作方法的名字来命名领域事件。
对于上面的例子“货品已到达保税仓”,我们将发布与之对应的领域事件
GoodsArrivedBondedWarehouseEvent(当然在明确的界线上下文中也可以去掉聚合的名字,直接建模为ArrivedBondedWarehouseEvent,这都是命名方面的习惯)。
事件的名字标明了聚合上的命令方法在执行胜利之后所发生的事情,换句话说待定项以及不确定的状态是不能作为领域事件的。
一个行之有效的方法是画出当前业务的状态流转图,包含前置操作以及引起的状态变卦,这里表达的是已经变卦完成的状态所以我们不用过去时态表示,比如删除或者取消,即代表已经删除或者已经取消。
然后对于其中的节点停止事件建模。如下图是文件云端存储的业务,我们分别对预上传、上传完成确认、删除等环节建模“过去时”事件,PreUploadedEvent、ConfirmUploadedEvent、RemovedEvent。
详解领域驱动设计之事件驱动与CQRS-2.jpg



2.2、领域事件代码解读
  1. package domain.event;
  2. import java.util.Date;
  3. import java.util.UUID;
  4. public class DomainEvent {
  5.     /**
  6.      * 领域事件还包含了唯一ID,
  7.      * 但是该ID并不是实体(Entity)层面的ID概念,
  8.      * 而是主要用于事件追溯和日志。
  9.      * 假设是数据库存储,该字段通常为唯一索引。
  10.      */
  11.     private final String id;
  12.     /**
  13.      * 创建时间用于追溯,另一方面不论使用了
  14.      * 哪种事件存储都有可能遇到事件延迟,
  15.      * 我们通过创建时间可以确保其发生顺序。
  16.      */
  17.     private final Date occurredOn;
  18.     public DomainEvent() {
  19.         this.id = String.valueOf(UUID.randomUUID());
  20.         this.occurredOn = new Date();
  21.     }
  22. }
复制代码
在创建领域事件时,需要注意2点:
    领域事件自身应该是不变的(Immutable);领域事件应该携带与事件发生时相关的上下文数据信息,但是并不是整个聚合根的状态数据。例如,在创建订单时可以携带订单的根本信息,而对于用户更新订单收货地址事件AddressUpdatedEvent事件,只需要包含订单、用户以及新的地址等信息即可。
  1. public class AddressUpdatedEvent extends DomainEvent {
  2.     //通过userId+orderId来校验订单的合法性;
  3.     private String userId;
  4.     private String orderId;
  5.     //新的地址
  6.     private Address address;
  7.     //略去详细业务逻辑
  8. }
复制代码
2.3、领域事件的存储

事件的不可变性与可追溯性都决定了其必需要耐久化的原则,我们来看看常见的几种方案。

2.3.1、单独的EventStore

有的业务场景中会创建一个单独的事件存储中心,可能是Mysql、Redis、Mongo、甚至文件存储等。这里以Mysql举例,business_code、event_code用来区分不同业务的不同事件,详细的命名规则可以根据实际需要。
这里需要注意该数据源与业务数据源不一致的场景,我们要确保当业务数据更新以后事件可以准确无误的记录下来,理论中尽量防止使用分布式事务,或者尽量防止其跨库的场景,否则你就得想想如何补偿了。千万要防止,用户更新了收货地址,但是AddressUpdatedEvent事件保管失败。
总的原则就是对分布式事务Say No,无论如何,我相信方法总比问题多,在理论中我们总可以想到处置方案,区别在于该方案是否简洁、是否做到理解耦。
  1. # 考虑是否需要分表,事件存储建议逻辑简单
  2. CREATE TABLE `event_store` (
  3.   `event_id` int(11) NOT NULL auto increment,
  4.   `event_code` varchar(32) NOT NULL,
  5.   `event_name` varchar(64) NOT NULL,
  6.   `event_body` varchar(4096) NOT NULL,
  7.   `occurred_on` datetime NOT NULL,
  8.   `business_code` varchar(128) NOT NULL,
  9.   UNIQUE KEY (`event id`)
  10. ) ENGINE=InnoDB COMMENT '事件存储表';
复制代码
2.3.2、与业务数据一起存储

在分布式架构中,每个模块都做的相对比较小,准确的说是“自治”。假设当前业务数据量较小,可以将事件与业务数据一起存储,用相关标识区分是真实的业务数据还是事件记录;或者在当前业务数据库中建立该业务自己的事件存储,但是要考虑到事件存储的量级必然大于真实的业务数据,考虑是否需要分表。
这种方案的优势:数据自治;防止分布式事务;不需要额外的事件存储中心。当然其优势就是不能复用。

2.4、领域事件如何发布


2.4.1、由领域聚合发送领域事件
  1. /*
  2. * 一个关于竞赛的充血模型例子
  3. * 贫血模型会构造一个MatchService,我们这里通过模型来触发相应的事件
  4. * 本例中略去了详细的业务细节
  5. */
  6. public class Match {
  7.     public void start() {
  8.         //构造Event....
  9.         MatchEvent matchStartedEvent = new MatchStartedEvent();
  10.         //略去详细业务逻辑
  11.         DefaultDomainEventBus.publish(matchStartedEvent);
  12.     }
  13.     public void finish() {
  14.         //构造Event....
  15.         MatchEvent matchFinishedEvent = new MatchFinishedEvent();
  16.         //略去详细业务逻辑
  17.         DefaultDomainEventBus.publish(matchFinishedEvent);
  18.     }
  19.     //略去Match对象根本属性
  20. }
复制代码
2.4.2、事件总线VS消息中间件

微效劳内的领域事件可以通过事件总线或利用应用效劳实现不同聚合之间的业务协同。即微效劳内发生领域事件时,由于大部分事件的集成发生在同一个线程内,不一定需要引入消息中间件。但一个事件假仿佛时更新多个聚合数据,依照 DDD“一个事务只更新一个聚合根”的原则,可以考虑引入消息中间件,通过异步化的方式,对微效劳内不同的聚合根采用不同的事务

详解领域驱动设计之事件驱动与CQRS-5.jpg


三、Saga分布式事务


3.1、Saga概要

我们看看如何使用 Saga 形式维护数据一致性?
Saga 是一种在微效劳架构中维护数据一致性的机制,它可以防止分布式事务所带来的问题。
一个 Saga 表示需要更新的多个效劳中的一个,即Saga由一连串的本地事务组成。每一个本地事务负责更新它所在效劳的私有数据库,这些操作仍旧依赖于我们所熟悉的ACID事务框架和函数库。
形式:Saga
通过使用异步消息来协调一系列本地事务,从而维护多个效劳之间的数据一致性。
请参阅(强烈建议):https://microservices.io/patterns/data/saga.html
Saga与TCC相比少了一步Try的操作,TCC无论最终事务胜利失败都需要与事务参与方交互两次。而Saga在事务胜利的情况下只需要与事务参与方交互一次, 假设事务失败,需要额外停止补偿回滚。
    每个Saga由一系列sub-transaction Ti 组成;每个Ti 都有对应的补偿动作Ci,补偿动作用于撤销Ti形成的结果;
可以看到,和TCC相比,Saga没有“预留”动作,它的Ti就是直接提交到库。
Saga的执行顺序有两种:
    success:T1, T2, T3, ..., Tn ;failure:T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n;
所以我们可以看到Saga的撤销非常关键,可以说使用Saga的难点就在于如何设计你的回滚战略。
详解领域驱动设计之事件驱动与CQRS-6.jpg


3.2、Saga实现

通过上面的例子我们对Saga有了初步的体感,如今来深化讨论下如何实现。当通过系统命令启动Saga时,协调逻辑必需选择并通知第一个Saga参与方执行本地事务。一旦该事务完成,Saga协调选择并调用下一个Saga参与方。
这个过程不时持续到Saga执行完所有步骤。假设任何本地事务失败,则 Saga必需以相反的顺序执行补偿事务。以下几种不同的方法可用来构建Saga的协调逻辑。

3.2.1、协同式(choreography)

把 Saga 的决策和执行顺序逻辑分布在 Saga的每一个参与方中,它们通过交换事件的方式来停止沟通。
详解领域驱动设计之事件驱动与CQRS-7.jpg

(引用于《微效劳架构设计形式》相关章节)
Order效劳创建一个Order并发布OrderCreated事件。
Consumer效劳消费OrderCreated事件,验证消费者是否可以下订单,并发布ConsumerVerified事件。
Kitchen效劳消费OrderCreated事件,验证订单,在CREATE_PENDING状态下创建故障单,并发布TicketCreated事件。
Accounting效劳消费OrderCreated事件并创建一个处于PENDING状态的Credit CardAuthorization。
Accounting效劳消费TicketCreated和ConsumerVerified事件,向消费者的信誉卡收费,并发布信誉卡受权失败事件。
Kitchen效劳使用信誉卡受权失败事件并将故障单的状态更改为REJECTED。
订单效劳消费信誉卡受权失败事件,并将订单状态更改为已回绝。

3.2.2、编排式(orchestration)

把Saga的决策和执行顺序逻辑集中在一个Saga编排器类中。Saga 编排器发出命令式消息给各个 Saga 参与方,指示这些参与方效劳完成详细操作(本地事务)。类似于一个状态机,当参与方效劳完成操作以后会给编排器发送一个状态指令,以决定下一步做什么。
详解领域驱动设计之事件驱动与CQRS-8.jpg

(引用于《微效劳架构设计形式》相关章节)
我们来分析一下执行流程
Order Service首先创建一个Order和一个创建订单控制器。之后,途径的流程如下:
Saga orchestrator向Consumer Service发送Verify Consumer命令。
Consumer Service回复Consumer Verified消息。
Saga orchestrator向Kitchen Service发送Create Ticket命令。
Kitchen Service回复Ticket Created消息。
Saga协调器向Accounting Service发送受权卡消息。
Accounting效劳部门使用卡片受权消息回复。
Saga orchestrator向Kitchen Service发送Approve Ticket命令。
Saga orchestrator向订单效劳发送批准订单命令。

3.2.3、补偿战略

之前的描绘中我们说过Saga最重要的是如何处置异常,状态机还定义了许多异常状态。如上面的6就会发生失败,触发AuthorizeCardFailure,此时我们就要完毕订单并把之前提交的事务停止回滚。这里面要区分哪些是校验性事务、哪些是需要补偿的事务。
详解领域驱动设计之事件驱动与CQRS-9.jpg


一个Saga由三种不同类型的事务组成:可补偿性事务(可以回滚,因而有一个补偿事务);关键性事务(这是 Saga的成败关键点,比如4账户代扣);以及可反复性事务,它不需要回滚并保证可以完成(比如6更新状态)。
在Create Order Saga 中,createOrder()、createTicket()步骤是可补偿性事务且具有撤销其更新的补偿事务。
verifyConsumerDetails()事务是只读的,因而不需要补偿事务。authorizeCreditCard()事务是这个 Saga的关键性事务。假设消费者的信誉卡可以受权,那么这个Saga保证完成。approveTicket()和approveRestaurantOrder()步骤是在关键性事务之后的可反复性事务。
认真拆解每个步骤、然后评估其补偿战略尤为重要,正如你看到的,每品种型的事务在对策中扮演着不同的角色。

四、CQRS

前面讲述了事件的概念,又分析了Saga如何处置复杂事务,如今我们来看看CQRS为什么在DDD中广泛被采用。除了读写分别的特征以外,我们用事件驱动的方式来理论Command逻辑能有效降低业务的复杂度。
当你明白如何建模事件、如何躲避复杂事务,明白什么时候用消息中间件、什么时候采用事件总线,才干理解为什么是CQRS、怎么正确应用。
详解领域驱动设计之事件驱动与CQRS-11.jpg

下面是我们项目中的设计,这里为什么会呈现Read/Write Service,是为了封装调用,service内部是基于聚合发送事件。因为我发如今实际项目中,很多人都会第一时间问我要XXXService而不是XXX模型,所以在DDD没有完全普及的项目中建议大家采取这种居中战略。这也符合咱们的解耦,对方依赖我的笼统才干,然而我内部是基于DDD还是传统的流程代码对其是无关透明的。
详解领域驱动设计之事件驱动与CQRS-12.jpg

我们先来看看事件以及处置器的时序关系。
详解领域驱动设计之事件驱动与CQRS-13.jpg

这里还是以文件云端存储业务为例,下面是一些处置器的核心代码。注释行是对代码功能、用法以及扩展方面的解读,请认真阅读。
  1. package domain;
  2. import domain.event.DomainEvent;
  3. import domain.handler.event.DomainEventHandler;
  4. import java.util.ArrayList;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. public class DomainRegistry {
  9.     private Map<String, List<DomainEventHandler>> handlerMap =
  10.         new HashMap<String, List<DomainEventHandler>>();
  11.     private static DomainRegistry instance;
  12.     private DomainRegistry() {
  13.     }
  14.     public static DomainRegistry getInstance() {
  15.         if (instance == null) {
  16.             instance = new DomainRegistry();
  17.         }
  18.         return instance;
  19.     }
  20.     public Map<String, List<DomainEventHandler>> getHandlerMap() {
  21.         return handlerMap;
  22.     }
  23.     public List<DomainEventHandler> find(String name) {
  24.         if (name == null) {
  25.             return null;
  26.         }
  27.         return handlerMap.get(name);
  28.     }
  29.     //事件注册与维护,register分多少个场景根据业务拆分,
  30.     //这里是业务流的核心。假设多个事件需要维护前后依赖关系,
  31.     //可以维护一个priority逻辑
  32.     public void register(Class<? extends DomainEvent> domainEvent,
  33.                          DomainEventHandler handler) {
  34.         if (domainEvent == null) {
  35.             return;
  36.         }
  37.         if (handlerMap.get(domainEvent.getName()) == null) {
  38.             handlerMap.put(domainEvent.getName(), new ArrayList<DomainEventHandler>());
  39.         }
  40.         handlerMap.get(domainEvent.getName()).add(handler);
  41.         //依照优先级停止事件处置器排序
  42.         。。。
  43.     }
  44. }
复制代码
文件上传完毕事件的例子。
  1. package domain.handler.event;
  2. import domain.DomainRegistry;
  3. import domain.StateDispatcher;
  4. import domain.entity.meta.MetaActionEnums;
  5. import domain.event.DomainEvent;
  6. import domain.event.MetaEvent;
  7. import domain.repository.meta.MetaRepository;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. import javax.annotation.Resource;
  11. /**
  12. * @Description:一个事件操作的处置器
  13. * 我们混合使用了Saga的两种形式,外层事件交互;
  14. * 对于单个复杂的事件内部采取状态流转实现。
  15. */
  16. @Component
  17. public class MetaConfirmUploadedHandler implements DomainEventHandler {
  18.     @Resource
  19.     private MetaRepository metaRepository;
  20.     public void handle(DomainEvent event) {
  21.         //1.我们在当前的上下文中定义个ThreadLocal变量
  22.         //用于寄存事件影响的聚合根信息(线程共享)
  23.         //2.当然假设有需要额外的信息,可以基于event所
  24.         //携带的信息构造Specification从repository获取
  25.         // 代码示例
  26.         // metaRepository.queryBySpecification(SpecificationFactory.build(event));
  27.         DomainEvent domainEvent = metaRepository.load();
  28.         //此处是我们的逻辑
  29.         。。。。
  30.         //对于单个操作比较复杂的,可以使用状态流转进一步拆分
  31.         domainEvent.setStatus(nextState);
  32.         //在事件触发之后,仍需要一个状态跟踪器来处置大事务问题
  33.         //Saga编排式
  34.         StateDispatcher.dispatch();
  35.     }
  36.     @PostConstruct
  37.     public void autoRegister() {
  38.         //此处可以更加细分,注册在哪一类场景中,这也是事件驱动的强大、灵敏之处。
  39.         //防止了if...else判断。我们可以有这样的意识,一旦你的逻辑里面充溢了大量
  40.         //switch、if的时候来看看自己注册的场景是否可以继续细分
  41.         DomainRegistry.getInstance().register(MetaEvent.class, this);
  42.     }
  43.     public String getAction() {
  44.         return MetaActionEnums.CONFIRM_UPLOADED.name();
  45.     }
  46.     //适用于前后依赖的事件,通过优先级指定执行顺序
  47.     public Integer getPriority() {
  48.         return PriorityEnums.FIRST.getValue();
  49.     }
  50. }
复制代码
事件总线逻辑
  1. package domain;
  2. import domain.event.DomainEvent;
  3. import domain.handler.event.DomainEventHandler;
  4. import java.util.List;
  5. public class DefaultDomainEventBus {
  6.     public static void publish(DomainEvent event, String action,
  7.                                EventCallback callback) {
  8.         List<DomainEventHandler> handlers = DomainRegistry.getInstance().
  9.             find(event.getClass().getName());
  10.         handlers.stream().forEach(handler -> {
  11.             if (action != null && action.equals(handler.getAction())) {
  12.                 Exception e = null;
  13.                 boolean result = true;
  14.                 try {
  15.                     handler.handle(event);
  16.                 } catch (Exception ex) {
  17.                     e = ex;
  18.                     result = false;
  19.                     //自定义异常处置
  20.                     。。。
  21.                 } finally {
  22.                     //write into event store
  23.                     saveEvent(event);
  24.                 }
  25.                 //根据实际业务处置回调场景,DefaultEventCallback可以返回
  26.                 if (callback != null) {
  27.                     callback.callback(event, action, result, e);      
  28.                 }
  29.             }
  30.         });
  31.     }
  32. }
复制代码
五、自治效劳和系统

DDD中强调限界上下文的自治特性,事实上,从更小的粒度来看,对象仍然需要具备自治的这四个特性,即:最小完备、自我履行、稳定空间、独立进化。其中自我履行是重点,因为不强依赖外部所以稳定、因为稳定才可能独立进化。这就是六边形架构在DDD中较为普遍的原因。
详解领域驱动设计之事件驱动与CQRS-14.jpg


六、结语

本文所讲述的事件、Saga、CQRS的方案均可以单独使用,可以应用到你的某个method、或者你的整个package。项目中我们并不一定要理论一整套CQRS,只要其中的某些思想处置了我们项目中的某个问题就足够了。
也许你如今已经磨刀霍霍,准备在项目中理论一下这些技巧。不过我们要明白“每一个硬币都有两面性”,我们不只看到高扩展、解耦的、易编排的优点以外,仍然要明白其所带来的问题。利弊分析以后再去决定如何实现才是正确的应对之道。
    这类编程形式有一定的学习曲线;基于消息传送的应用程序的复杂性;处置事件的演化有一定难度;删除数据存在一定难度;查询事件存储库非常有挑战性。
不过我们还是要认识到在其适宜的场景中,六边形架构以及DDD战术将加速我们的领域建模过程,也迫使我们从严格的通用语言角度来解释一个领域,而不是一个个需求。任何更强调核心域而不是技术实现的方式都可以增加业务价值,并使我们获得更大的竞争优势。
详解领域驱动设计之事件驱动与CQRS-15.jpg

以上就是详解领域驱动设计之事件驱动与CQRS的详细内容,更多关于领域驱动设计 事件驱动与CQRS的资料请关注网站其它相关文章!

回复

举报 使用道具

全部回复
暂无回帖,快来参与回复吧
本版积分规则 高级模式
B Color Image Link Quote Code Smilies

大明白
注册会员
主题 17
回复 17
粉丝 0
|网站地图
快速回复 返回顶部 返回列表