本文共 2766 字,大约阅读时间需要 9 分钟。
springCloudStream整合rocketMQ
springCloudStream:统一的中间件框架,以一套代码实现mq产品方法的封装; RabbitMQ,kafka都是由spring维护; rocketMQ是由阿里巴巴产生自己维护; 跟springboot三板斧一样,依赖配置,启动类,还有配置文件。 pom依赖, 核心依赖: com.alibaba.cloud spring-cloud-starter-stream-rocketmq 2.2.3.RELEASE org.apache.rocketmq rocketmq-client org.apache.rocketmq rocketmq-acl 启动类 @EnableBinding({Source.class, Sink.class}) @SpringBootApplication public class ScRocketMQApplication {public static void main(String[] args) { SpringApplication.run(ScRocketMQApplication.class,args);}
}
配置文件: #ScStream通用的配置以spring.cloud.stream开头 spring.cloud.stream.bindings.input.destination=TestTopic spring.cloud.stream.bindings.input.group=scGroup spring.cloud.stream.bindings.output.destination=TestTopic #rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头 #spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876 spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876input对应启动类中的Sink.class //生产者
output对应Source.class //发送者spring.cloud.stream.bindings是这个框架独有的;
spring.cloud.stream.rocketmq是rocketmq独有的;
生产者代码:
@Resourceprivate Source source;public void sendMessage(String msg){ Mapheaders = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "testTag"); MessageHeaders messageHeaders = new MessageHeaders(headers); Message message = MessageBuilder.createMessage(msg, messageHeaders); this.source.output().send(message);}
消费者
@Componentpublic class ScConsumer { @StreamListener(Sink.INPUT) public void onMessage(String messsage) { System.out.println("received message:" + messsage + " from binding:" + Sink.INPUT); }}
如果要换成其他mq,代码不用做其他任何改动;
把依赖换掉,把配置改掉就可以用了。非常强大。只要支持spring有支持就可以直接这样改以下就行了。 厂商维护的版本很落后,高级定制功能有很多未知的问题。文档也很少,目前不是很好的集成方案,但是要做必要的了解。目前最好的方案是springboot。 高可用 rocketMQ在之前不支持高可用,4.5.0才引入高可用的功能;是第三方的技术Dledger。 1、接管Broker的CommitLog消息存储 2、从集群中选举出master节点 3、完成master节点往slave节点的消息同步 Dledger怎么选举:集群中每个节点有三个状态 1、Leader:响应客户端中所有的请求,读和写; 2、Follower:如果有请求来这里,那么也会转给leader; 3、Candicdate:正常情况下只有leader和follower;如果集群中没有leader,那么所有节点都会变成Candicdate,比如我们刚刚启动机子的时候所有的节点都是Follower,但是没有leader,就会把所有节点变成Candicdate。他们去选举出leader。 选举机制:每个Candicdate记录一个TermID,谁的最大,谁就成为Leader。如果都一样,那么就会进行休眠一会儿。再进行选举,最多经过两三论就能选举成功。一个leader一般当一段时间,就会重新洗牌,无论有没有发生故障,他们没过一段时间就会重新选举一次。当然如果leader发生故障也会重新选举。 如何保证消息不丢失: 1、跨网络的地方,生产者到mq,mq到消费者,主从同步的时候,存盘,整个mq都挂了。 生产者到mq:事务消息; 刷盘:采用同步刷盘,不采用异步刷盘方式,配置以下即可; 主从:采用主从同步复制数据;Dledger文件同步同过两阶段同步,一个是未提交阶段,在未提交阶段向follower发起同步。超过半数节点返回同步成功,就修改成提交阶段。向客户端发送消息。 mq到消费者:如果没有收到消费者的响应就重复发数据。异步消息还是会存在消息丢失,比如已经接收到消息了,但是业务逻辑还没处理,也存在消息丢失。 保证消息有序 mq中需要注意全局有序和局部有序; 大部分业务只要保证局部有序即可。让一组消息放在一个队列中,放在一个队列中就能保证有序。每一组放一个队列中就保证了局部有序。 处理积压 队列一般都是定好的了,而且没办法动态扩容。把消息搬运到另一个mq上,并给这个mq添加大量的队列,创建新的消费者去这个后加队列中处理消息。转载地址:http://wpqrn.baihongyu.com/