当需要跨多个模块进行数据修改时,有两种常用的方案:
方案一:通过远程服务调用的方式,使用分布式事务协调框架保证数据一致性。
优点:同步、实时的方案,能直接获取远程调用状态,并进行相应的处理。
缺点:需要引入任务协调器,增加了系统复杂度。过于频繁的请求会对下游系统造成较大压力,需要额外的限流降级策略保证可用性。事务协调的过程需要按照框架定义的方式,通过大量的编码进行事件补偿处理。
方案二:通过事件驱动的方式,使用事件补偿/重试机制保证最终一致性。
优点:不需要依赖任务协调器,只需通过MQ即可完成间接的服务调用,通过事件重试和死信队列保证最终一致性。借助MQ的特性可以有效的缓解下游压力。
缺点:异步的方式,调用者无法第一时间获取调用结果,需要通过进一步的事件通知完成调用结果的反馈。编码量和方案一相当。
从系统设计角度看:方案一虽然更加实时准确,但是需要服务消费者(调用方)了解自己影响的下游业务,无形中增加了系统的耦合程度;方案二需要事件源(服务调用方)定义自己产生的事件,服务提供者订阅相应的事件进行处理,根据需要返回处理结果。
综合考虑,事件驱动的方式更符合我们系统的现状。
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
名词解释
binding:消息源的绑定,定义消息源(队列)、使用方式(发布/订阅)等信息。
binder:binding的载体,即具体的消息队列应用程序。
配置
在入口类上需要配置@EnableBinding
启用Spring Cloud Stream配置,参数为binding的接口定义,支持以数组形式传入多个binding定义。
package com.example.demo;
import com.example.demo.binding.DemoBinding;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@SpringBootApplication
@EnableSwagger2
@EnableDiscoveryClient
@EnableBinding(DemoBinding.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
在application.yml
中,需要配置RabbitMQ的相关信息。
spring:
cloud:
stream:
binders: #定义MQ地址
rabbit-binder: #MQ的标识
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
bindings: #定义绑定(消息发布/订阅)
foo-in: #binding key,代码中会用到
destination: foo.event #绑定的队列名
group: ${spring.application.name} #分组
foo-out: #binding key,代码中会用到
destination: foo.event #绑定的队列名
default-binder: rabbit-binder #默认的MQ的标识
配置说明:
binders:定义MQ的地址,根据需要可能会定义多个MQ。
bindings:定义具体的事件队列,binding key需要在代码中使用。
group:同一个group中,只有一个实例可以获取消息,在消息订阅端需要配置为${spring.application.name}
,从而保证同一个服务的多个实例有且仅有一个会消费事件。
binder:存在多个MQ时,指定需要的MQ标识。
default-binder:默认使用的MQ配置。
为了示例方便,将生产者(foo-out)和消费者(foo-in)的binding配置到一起,实际环境中生产者和消费者是分开配置的。
Binding定义
一般来讲,针对每一个有消息交互的外部系统,会定义一个单独的binding。
package com.example.demo.binding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface DemoBinding {
String INPUT = "foo-in"; // 定义常量方便使用
String OUTPUT = "foo-out"; // 定义常量方便使用
@Input(INPUT) // 表示消息订阅,binding key为“foo-in”
SubscribableChannel fooIn();
@Output(OUTPUT) // 表示消息发布,binding key为“foo-out”
MessageChannel fooOut();
}
当binding定义为@Input
时,返回值有两种,分别代表两种模式:
SubscribableChannel
:消息监听模式,即被动接受新消息
PollableChannel
:消息拉取模式,即主动(定时)获取新消息
binding中的方法名没有特殊作用,但是为方便使用一般和binding key一致。
事件定义
事件的定义是灵活的,根据需要扩展的。一般来说有两种模式:
使用不同的队列表示不同的业务事件,如:foo.created.event表示Foo的新增,foo.updated.event表示Foo的修改,等等。此时可以直接发送变更的业务数据。
以业务领域对象划分队列,如:foo.event表示Foo的新增/修改/删除等操作。此时需要定义不同的事件类以便区分不同的事件,业务数据作为payload封装在事件中。
下面展示采用第二种方案的,一种可行的事件定义思路:
接口
定义基本的约束:需要携带数据,有事件类型。
package com.example.demo.event;
public interface Event {
/**
* 事件携带的数据
*/
Object getPayload();
/**
* 事件类型
*/
String getEventType();
}
抽象类
实现基本的框架:承载的数据为(单条)FooDto。
package com.example.demo.event;
import com.example.demo.dto.FooDto;
public abstract class AbstractFooEvent implements Event {
private final FooDto fooDto;
public FooEvent(FooDto fooDto) {
this.fooDto = fooDto;
}
@Override
public FooDto getPayload() {
return fooDto;
}
}
实现类
最终的事件定义,其中事件类型在实际使用时多为常量或枚举值。
package com.example.demo.event;
import com.example.demo.dto.FooDto;
public class FooUpdatedEvent extends AbstractFooEvent {
public FooUpdatedEvent(FooDto fooDto) {
super(fooDto);
}
@Override
public String getEventType() {
return "foo-updated";
}
}
消息发布
在事件源(消息发送者的业务代码)中,需要调用binding发送事件。
package com.example.demo.service;
import com.example.demo.*;
import com.taocares.commons.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
@Service
public class FooService {
@Autowired
private FooRepository fooRepository;
@Autowired
private DemoBinding binding;
public void updateFoo(FooDto fooDto) {
Foo foo = BeanUtils.copyProperties(fooDto, Foo.class);
fooRepository.save(foo);
binding.fooOut().send(new GenericMessage<>(new FooUpdatedEvent(fooDto)));
}
}
消息订阅
消息订阅者需要定义MessageHandler进行事件的处理,使用@StreamListener
注解指定方法对应的binding key。
package com.example.demo.handler;
import com.example.demo.binding.DemoBinding;
import com.example.demo.event.AbstractFooEvent;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
@Component
public class FooMessageHandler {
@StreamListener(DemoBinding.INPUT)
public void handleFooEvent(Object message) {
AbstractFooEvent fooEvent = (AbstractFooEvent) message;
if ("foo-updated".equals(fooEvent.getEventType())) {
FooDto fooDto = fooEvent.getPayload();
// do something ...
} else {
// do something else ...
}
}
}
这种简单订阅-处理模式下,消息处理方法不允许有返回值。
消息回执
一般来说,消息订阅者收到消息以后进行处理即可,无需返回值。某些业务场景下,消息发布者可能希望知道消息订阅者的处理情况,此时双方需要约定新的“回执队列”,使用@SendTo
注解指定回执队列的binding key,并且将回执内容返回即可。
package com.example.demo.handler;
import com.example.demo.*;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Component
public class FooMessageHandler {
@StreamListener(DemoBinding.INPUT)
@SendTo(DemoBinding.PROCESSED)
public FooProcessedEvent handleFooEventWithAck(Object message) {
AbstractFooEvent fooEvent = (AbstractFooEvent) message;
if ("foo-updated".equals(fooEvent.getEventType())) {
FooDto fooDto = fooEvent.getPayload();
return new FooProcessedEvent(fooDto.getName() + " has been processed");
}
return null;
}
}
此时返回值即为消息内容,无需像发送端一样手动构造Message
对象。
参考文档
关于Spring Cloud Stream技术的更多内容请参考官方文档:https://cloud.spring.io/spring-cloud-static/Finchley.SR1/multi/multi__spring_cloud_stream.html