事件驱动

当需要跨多个模块进行数据修改时,有两种常用的方案:

  • 方案一:通过远程服务调用的方式,使用分布式事务协调框架保证数据一致性。

    • 优点:同步、实时的方案,能直接获取远程调用状态,并进行相应的处理。

    • 缺点:需要引入任务协调器,增加了系统复杂度。过于频繁的请求会对下游系统造成较大压力,需要额外的限流降级策略保证可用性。事务协调的过程需要按照框架定义的方式,通过大量的编码进行事件补偿处理。

  • 方案二:通过事件驱动的方式,使用事件补偿/重试机制保证最终一致性。

    • 优点:不需要依赖任务协调器,只需通过MQ即可完成间接的服务调用,通过事件重试和死信队列保证最终一致性。借助MQ的特性可以有效的缓解下游压力。

    • 缺点:异步的方式,调用者无法第一时间获取调用结果,需要通过进一步的事件通知完成调用结果的反馈。编码量和方案一相当。

从系统设计角度看:方案一虽然更加实时准确,但是需要服务消费者(调用方)了解自己影响的下游业务,无形中增加了系统的耦合程度;方案二需要事件源(服务调用方)定义自己产生的事件,服务提供者订阅相应的事件进行处理,根据需要返回处理结果。

综合考虑,事件驱动的方式更符合我们系统的现状。

依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

名词解释

  • binding:消息源的绑定,定义消息源(队列)、使用方式(发布/订阅)等信息。

  • binder:binding的载体,即具体的消息队列应用程序。

  • payload:被事件承载的实际业务数据。

配置

在入口类上需要配置@EnableBinding启用Spring Cloud Stream配置,参数为binding的接口定义,支持以数组形式传入多个binding定义。

DemoApplication.java
package com.example.demo;

import com.example.demo.binding.DemoBinding;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@SpringBootApplication
@EnableSwagger2
@EnableDiscoveryClient
@EnableBinding(DemoBinding.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

application.yml中,需要配置RabbitMQ的相关信息。

application.yml
spring:
  cloud:
    stream:
      binders: #定义MQ地址
        rabbit-binder: #MQ的标识
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: admin
      bindings: #定义绑定(消息发布/订阅)
        foo-in: #binding key,代码中会用到
          destination: foo.event #绑定的队列名
          group: ${spring.application.name} #分组
        foo-out: #binding key,代码中会用到
          destination: foo.event  #绑定的队列名
      default-binder: rabbit-binder #默认的MQ的标识

Binding定义

一般来讲,针对每一个有消息交互的外部系统,会定义一个单独的binding。

DemoBinding.java
package com.example.demo.binding;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DemoBinding {

    String INPUT = "foo-in"; // 定义常量方便使用
    String OUTPUT = "foo-out"; // 定义常量方便使用

    @Input(INPUT)  // 表示消息订阅,binding key为“foo-in”
    SubscribableChannel fooIn();

    @Output(OUTPUT) // 表示消息发布,binding key为“foo-out”
    MessageChannel fooOut();
}

事件定义

事件的定义是灵活的,根据需要扩展的。一般来说有两种模式:

  • 使用不同的队列表示不同的业务事件,如:foo.created.event表示Foo的新增,foo.updated.event表示Foo的修改,等等。此时可以直接发送变更的业务数据。

  • 以业务领域对象划分队列,如:foo.event表示Foo的新增/修改/删除等操作。此时需要定义不同的事件类以便区分不同的事件,业务数据作为payload封装在事件中。

下面展示采用第二种方案的,一种可行的事件定义思路:

接口

定义基本的约束:需要携带数据,有事件类型。

Event.java
package com.example.demo.event;

public interface Event {

    /**
     * 事件携带的数据
     */
    Object getPayload();

    /**
     * 事件类型
     */
    String getEventType();
}

抽象类

实现基本的框架:承载的数据为(单条)FooDto。

AbstractFooEvent.java
package com.example.demo.event;

import com.example.demo.dto.FooDto;

public abstract class AbstractFooEvent implements Event {
    private final FooDto fooDto;

    public FooEvent(FooDto fooDto) {
        this.fooDto = fooDto;
    }

    @Override
    public FooDto getPayload() {
        return fooDto;
    }
}

实现类

最终的事件定义,其中事件类型在实际使用时多为常量或枚举值。

FooUpdatedEvent.java
package com.example.demo.event;

import com.example.demo.dto.FooDto;

public class FooUpdatedEvent extends AbstractFooEvent {

    public FooUpdatedEvent(FooDto fooDto) {
        super(fooDto);
    }

    @Override
    public String getEventType() {
        return "foo-updated";
    }
}

消息发布

在事件源(消息发送者的业务代码)中,需要调用binding发送事件。

FooService.java
package com.example.demo.service;

import com.example.demo.*;
import com.taocares.commons.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;

@Service
public class FooService {

    @Autowired
    private FooRepository fooRepository;

    @Autowired
    private DemoBinding binding;

    public void updateFoo(FooDto fooDto) {
        Foo foo = BeanUtils.copyProperties(fooDto, Foo.class);
        fooRepository.save(foo);
        binding.fooOut().send(new GenericMessage<>(new FooUpdatedEvent(fooDto)));
    }
}

消息订阅

消息订阅者需要定义MessageHandler进行事件的处理,使用@StreamListener注解指定方法对应的binding key。

FooMessageHandler.java
package com.example.demo.handler;

import com.example.demo.binding.DemoBinding;
import com.example.demo.event.AbstractFooEvent;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class FooMessageHandler {

    @StreamListener(DemoBinding.INPUT)
    public void handleFooEvent(Object message) {
        AbstractFooEvent fooEvent = (AbstractFooEvent) message;
        if ("foo-updated".equals(fooEvent.getEventType())) {
            FooDto fooDto = fooEvent.getPayload();
            // do something ...
        } else {
            // do something else ...
        }
    }
}

消息回执

一般来说,消息订阅者收到消息以后进行处理即可,无需返回值。某些业务场景下,消息发布者可能希望知道消息订阅者的处理情况,此时双方需要约定新的“回执队列”,使用@SendTo注解指定回执队列的binding key,并且将回执内容返回即可。

FooMessageHandler.java
package com.example.demo.handler;

import com.example.demo.*;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class FooMessageHandler {

    @StreamListener(DemoBinding.INPUT)
    @SendTo(DemoBinding.PROCESSED)
    public FooProcessedEvent handleFooEventWithAck(Object message) {
        AbstractFooEvent fooEvent = (AbstractFooEvent) message;
        if ("foo-updated".equals(fooEvent.getEventType())) {
            FooDto fooDto = fooEvent.getPayload();
            return new FooProcessedEvent(fooDto.getName() + " has been processed");
        }
        return null;
    }
}

参考文档

关于Spring Cloud Stream技术的更多内容请参考官方文档:https://cloud.spring.io/spring-cloud-static/Finchley.SR1/multi/multi__spring_cloud_stream.html

Last updated