SpringCloud:消息驱动-Stream

1.概述

1.1 为什么引入cloud stream?解决的痛点是什么?

市面上存在着多种消息中间件技术ActiveMQ,RabbitMQ,RocketMQ,Kafka,当需要系统进行整合时,或者系统进行切换时由于用的是不同的中间件技术,该怎么整合切换。

引出了SpringCloud Stream
屏蔽底层的细节差异,让我只需要操作一个Cloud Stream,就可以操作底层下面各种各样不同的MQ。达到我们以更小的代价实现切换,维护,开发。

1.2什么是SpringCloudStream?

一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

目前仅支持RabbitMQ、Kafka。

一句话:SpringCloud Stream 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

官网  文档  中文文档

版本要求

SpringCloud:消息驱动-Stream

绑定器对象:Binder Implementations
就是靠它屏蔽了我们底层的MQ的差异

1.31.3 设计思想

1.3.1 传统的消息中间件的流程

SpringCloud:消息驱动-Stream

1.3.2 为什么用Cloud Stream

比方说我们同时用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,整合和切换就会有很大的成本。像RabbitMQ有exchange,kafka有Topic和Partitions分区。

SpringCloud:消息驱动-Stream

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

1.3.3 stream是怎么统一底层差异的?

SpringCloud:消息驱动-Stream

1.3.4 Binder

SpringCloud:消息驱动-Stream

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,input对应于消费者(消费者从Stream接收消息),output对应于生产者(生产者从Stream发布消息)。

Stream中的消息通信方式遵循了发布-订阅模式。Topic主题进行广播:在RabbitMQ就是Exchange;在Kakfa中就是Topic

1.4 Spring Cloud Stream标准流程套路

SpringCloud:消息驱动-Stream

  • Binder:很方便的连接中间件,屏蔽差异(用于连接中间件与生产/消费者)
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出(output),接受消息就是输入(input)。(简单的理解为输出/输如)

1.5 编码API和常用注解

SpringCloud:消息驱动-Stream

SpringCloud:消息驱动-Stream

2.入门案例

2.1案例说明

RabbitMQ环境已经OK;
工程中新建三个子模块:

  • cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803 作为消息接收模块

2.2 消息驱动之生产者8801

2.2.1新建module

cloud-stream-rabbitmq-provider8801:作为生产者发送消息模块

2.2.2 pom

新增:

pom.xml

2.2.3 application.yml

application.yaml

2.2.4 主启动类

2.2.5 业务类

我们此时要写的代码,要注意是和MQ交互,而不是传统的controller调用service。

SpringCloud:消息驱动-Stream

我们现在写的代码是基于SpringCloudStream,然后做output指定通道,开启交互绑定器,再和中间件进行交互。

发送消息接口: MessageProvider

发送消息接口实现类:MessageProviderImpl

1.Source 定义消息的发送管道:

SpringCloud:消息驱动-Stream

这个Source哪来的呢? 简单的可理解为参照对象是SpringCloudStream自身, 从Stream发出消息就是输出,接收消息就是输入。 这里我们可以理解为我们定义一个消息生产者的发送管道:消息源。

SpringCloud:消息驱动-Stream

SendMessageController

测试

启动7001 8801

SpringCloud:消息驱动-Stream

多次访问:http://localhost:8801/sendmessage

SpringCloud:消息驱动-Stream

SpringCloud:消息驱动-Stream

2.3 消息驱动之消费者8802

2.3.1 新建module

cloud-stream-rabbitmq-consumer8802

2.3.2 pom

pom.xml

2.3.3 application.yml

8801是生产者是output,8802是消费者是input

application.yml

2.3.4 主启动类

2.3.5 业务类

ReceiveMessageListener

2.3.6 测试

启动7001、8801、8802

SpringCloud:消息驱动-Stream

SpringCloud:消息驱动-Stream

三、高级特性:分组消费与持久化

3.1 依照8802,clone出一份cloud-stream-rabbitmq-consumer8803

端口改好

启动测试

启动 7001,8801,8802,8803

刷新:http://localhost:8801/sendmessage  8802,8803都可接收消息

此时studyexchange有两个订阅者:8802、8803

SpringCloud:消息驱动-Stream

3.2 运行后的问题1:重复消费问题

目前8801发送一条消息后,8802和8803会同时收到8801的消息,存在重复消费问题。

3.3.1 为什么要解决重复消费问题

比如8801下一个订单,但是被两个服务获取消费,会多扣一次款。

SpringCloud:消息驱动-Stream

默认分组:流水号

SpringCloud:消息驱动-Stream

8802和8803默认是两个不同的分组,全面消费,每个组都消费这个消息

若在同一个组就是竞争关系,一条消息只能由其中一个微服务进行消费!

3.3.2 解决:消息分组

将 8802,8803 自定义配置分为同一个组,解决重复消费问题。

修改8802和8803的yml,都加上 group: tinStu1 

SpringCloud:消息驱动-Stream

测试:

刷新6次:http://localhost:8801/sendmessage

8802收到3次,8803收到三次

8802/8803实现了轮询分组,每次只有一个消费者接收消息,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

3.4 运行后的问题2:消息持久化问题

通过上述,解决了重复消费问题,再看看持久化

1.停止8802/8803并去除掉8802的分组group: tinStu1(8803的分组group: tinStu1没有去掉)

2.8801先发送4条消息到rabbitmq

3.先启动8802,无分组属性配置,后台没有打出来消息,发现8802没有收到消息,消息丢失。。。。

4.再启动8803,有分组属性配置(group: tinStu1),后台打印出来了MQ上的消息

正文完
 0