博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ应用
阅读量:3922 次
发布时间:2019-05-23

本文共 2766 字,大约阅读时间需要 9 分钟。

springCloudStream整合rocketMQ

springCloudStream:统一的中间件框架,以一套代码实现mq产品方法的封装;
RabbitMQ,kafka都是由spring维护;
rocketMQ是由阿里巴巴产生自己维护;
跟springboot三板斧一样,依赖配置,启动类,还有配置文件。
pom依赖,
核心依赖:
com.alibaba.cloud
spring-cloud-starter-stream-rocketmq
2.2.3.RELEASE
org.apache.rocketmq
rocketmq-client
org.apache.rocketmq
rocketmq-acl
启动类
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

public static void main(String[] args) {    SpringApplication.run(ScRocketMQApplication.class,args);}

}

配置文件:
#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

input对应启动类中的Sink.class //生产者

output对应Source.class //发送者

spring.cloud.stream.bindings是这个框架独有的;

spring.cloud.stream.rocketmq是rocketmq独有的;

生产者代码:

@Resourceprivate Source source;public void sendMessage(String msg){
Map
headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "testTag"); MessageHeaders messageHeaders = new MessageHeaders(headers); Message
message = MessageBuilder.createMessage(msg, messageHeaders); this.source.output().send(message);}

消费者

@Componentpublic class ScConsumer {
@StreamListener(Sink.INPUT) public void onMessage(String messsage) {
System.out.println("received message:" + messsage + " from binding:" + Sink.INPUT); }}

如果要换成其他mq,代码不用做其他任何改动;

把依赖换掉,把配置改掉就可以用了。非常强大。只要支持spring有支持就可以直接这样改以下就行了。
厂商维护的版本很落后,高级定制功能有很多未知的问题。文档也很少,目前不是很好的集成方案,但是要做必要的了解。目前最好的方案是springboot。
高可用
rocketMQ在之前不支持高可用,4.5.0才引入高可用的功能;是第三方的技术Dledger。
1、接管Broker的CommitLog消息存储
2、从集群中选举出master节点
3、完成master节点往slave节点的消息同步
Dledger怎么选举:集群中每个节点有三个状态
1、Leader:响应客户端中所有的请求,读和写;
2、Follower:如果有请求来这里,那么也会转给leader;
3、Candicdate:正常情况下只有leader和follower;如果集群中没有leader,那么所有节点都会变成Candicdate,比如我们刚刚启动机子的时候所有的节点都是Follower,但是没有leader,就会把所有节点变成Candicdate。他们去选举出leader。
选举机制:每个Candicdate记录一个TermID,谁的最大,谁就成为Leader。如果都一样,那么就会进行休眠一会儿。再进行选举,最多经过两三论就能选举成功。一个leader一般当一段时间,就会重新洗牌,无论有没有发生故障,他们没过一段时间就会重新选举一次。当然如果leader发生故障也会重新选举。
如何保证消息不丢失:
1、跨网络的地方,生产者到mq,mq到消费者,主从同步的时候,存盘,整个mq都挂了。
生产者到mq:事务消息;
刷盘:采用同步刷盘,不采用异步刷盘方式,配置以下即可;
主从:采用主从同步复制数据;Dledger文件同步同过两阶段同步,一个是未提交阶段,在未提交阶段向follower发起同步。超过半数节点返回同步成功,就修改成提交阶段。向客户端发送消息。
mq到消费者:如果没有收到消费者的响应就重复发数据。异步消息还是会存在消息丢失,比如已经接收到消息了,但是业务逻辑还没处理,也存在消息丢失。
保证消息有序
mq中需要注意全局有序和局部有序;
大部分业务只要保证局部有序即可。让一组消息放在一个队列中,放在一个队列中就能保证有序。每一组放一个队列中就保证了局部有序。
处理积压
队列一般都是定好的了,而且没办法动态扩容。把消息搬运到另一个mq上,并给这个mq添加大量的队列,创建新的消费者去这个后加队列中处理消息。

转载地址:http://wpqrn.baihongyu.com/

你可能感兴趣的文章
Dapr微服务应用开发系列1:环境配置
查看>>
使用 Visual Studio 2019 批量添加代码文件头
查看>>
【BCVP更新】StackExchange.Redis 的异步开发方式
查看>>
.NET5.0 Preview 8 开箱教程
查看>>
真・WPF 按钮拖动和调整大小
查看>>
做权限认证,还不了解IdentityServer4?不二话,赶紧拥抱吧,.NET Core官方推荐!...
查看>>
编写第一个 .NET 微服务
查看>>
深入探究.Net Core Configuration读取配置的优先级
查看>>
Blazor带我重玩前端(六)
查看>>
使用 C# 捕获进程输出
查看>>
数据库单表千万行 LIKE 搜索优化手记
查看>>
.NET Core 中生成验证码
查看>>
.NET Core 中导入导出Excel
查看>>
初识ABP vNext(8):ABP特征管理
查看>>
WPF 消息框 TextBox 绑定新数据时让光标和滚动条跳到最下面
查看>>
【BCVP】实现基于 Redis 的消息队列
查看>>
网络安全逐渐成为程序员的必备技能
查看>>
统信发布UOS V20 进军个人市场 生态日益完善
查看>>
BeetleX框架详解-小结
查看>>
拥抱.NET 5,从自研微服务框架开始
查看>>