事件驱动

当需要跨多个模块进行数据修改时,有两种常用的方案:

  • 方案一:通过远程服务调用的方式,使用分布式事务协调框架保证数据一致性。

    • 优点:同步、实时的方案,能直接获取远程调用状态,并进行相应的处理。

    • 缺点:需要引入任务协调器,增加了系统复杂度。过于频繁的请求会对下游系统造成较大压力,需要额外的限流降级策略保证可用性。事务协调的过程需要按照框架定义的方式,通过大量的编码进行事件补偿处理。

  • 方案二:通过事件驱动的方式,使用事件补偿/重试机制保证最终一致性。

    • 优点:不需要依赖任务协调器,只需通过MQ即可完成间接的服务调用,通过事件重试和死信队列保证最终一致性。借助MQ的特性可以有效的缓解下游压力。

    • 缺点:异步的方式,调用者无法第一时间获取调用结果,需要通过进一步的事件通知完成调用结果的反馈。编码量和方案一相当。

从系统设计角度看:方案一虽然更加实时准确,但是需要服务消费者(调用方)了解自己影响的下游业务,无形中增加了系统的耦合程度;方案二需要事件源(服务调用方)定义自己产生的事件,服务提供者订阅相应的事件进行处理,根据需要返回处理结果。

综合考虑,事件驱动的方式更符合我们系统的现状。

依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

需要使用RabbitMQ完成事件的发布订阅

名词解释

  • binding:消息源的绑定,定义消息源(队列)、使用方式(发布/订阅)等信息。

  • binder:binding的载体,即具体的消息队列应用程序。

  • payload:被事件承载的实际业务数据。

配置

在入口类上需要配置@EnableBinding启用Spring Cloud Stream配置,参数为binding的接口定义,支持以数组形式传入多个binding定义。

application.yml中,需要配置RabbitMQ的相关信息。

配置说明:

  • binders:定义MQ的地址,根据需要可能会定义多个MQ。

  • bindings:定义具体的事件队列,binding key需要在代码中使用。

    • destination:定义实际的队列名。

    • group:同一个group中,只有一个实例可以获取消息,在消息订阅端需要配置为${spring.application.name},从而保证同一个服务的多个实例有且仅有一个会消费事件。

    • binder:存在多个MQ时,指定需要的MQ标识。

  • default-binder:默认使用的MQ配置。

为了示例方便,将生产者(foo-out)和消费者(foo-in)的binding配置到一起,实际环境中生产者和消费者是分开配置的。

Binding定义

一般来讲,针对每一个有消息交互的外部系统,会定义一个单独的binding。

当binding定义为@Input时,返回值有两种,分别代表两种模式:

  • SubscribableChannel:消息监听模式,即被动接受新消息

  • PollableChannel:消息拉取模式,即主动(定时)获取新消息

binding中的方法名没有特殊作用,但是为方便使用一般和binding key一致。

事件定义

事件的定义是灵活的,根据需要扩展的。一般来说有两种模式:

  • 使用不同的队列表示不同的业务事件,如:foo.created.event表示Foo的新增,foo.updated.event表示Foo的修改,等等。此时可以直接发送变更的业务数据。

  • 以业务领域对象划分队列,如:foo.event表示Foo的新增/修改/删除等操作。此时需要定义不同的事件类以便区分不同的事件,业务数据作为payload封装在事件中。

下面展示采用第二种方案的,一种可行的事件定义思路:

接口

定义基本的约束:需要携带数据,有事件类型。

抽象类

实现基本的框架:承载的数据为(单条)FooDto。

实现类

最终的事件定义,其中事件类型在实际使用时多为常量或枚举值。

消息发布

在事件源(消息发送者的业务代码)中,需要调用binding发送事件。

消息订阅

消息订阅者需要定义MessageHandler进行事件的处理,使用@StreamListener注解指定方法对应的binding key。

这种简单订阅-处理模式下,消息处理方法不允许有返回值。

消息回执

一般来说,消息订阅者收到消息以后进行处理即可,无需返回值。某些业务场景下,消息发布者可能希望知道消息订阅者的处理情况,此时双方需要约定新的“回执队列”,使用@SendTo注解指定回执队列的binding key,并且将回执内容返回即可。

此时返回值即为消息内容,无需像发送端一样手动构造Message对象。

参考文档

关于Spring Cloud Stream技术的更多内容请参考官方文档:https://cloud.spring.io/spring-cloud-static/Finchley.SR1/multi/multi__spring_cloud_stream.html

Last updated