RocketMQ面试题
MQ和基本概念
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ:erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ:java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka:scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
消息的生产者负责生产消息,生产出来的消息给broker(消息服务器).有了消息服务器,消息的生产者就可以以发送消息给消息服务器,同时返回去你的消息我收到了.消息的消费者负责消费消息.
常见的俩种模式:
- 消费者发送一个请求过去,消息服务器给消费者一个消息,消费者进行消费,这种模式比较少
- 监听器监听到消息服务器中是否有消息,如果有消息就直接推送给消息的消费者(就像滴滴,直接推送给司机,司机没有拒绝的权利,美团也是同样)
第二种模式角度,是主流的消息消费方式
生产者和消费者一般都是以集群的形式,集群可以让工作更加高效.不然仅有一台服务器挂了,就不能正常工作了.
消息集群的工作包含:消息的接收,消息的持久化,提供消息,过滤消息,高可用
生产者,消息服务器,和消费者如何找到对方
消息服务器注册到明明服务器上,这样命名服务器就有了所有消息服务器的Broker IP
消息的生产者在发送消息时,去连接命名服务器,同时获取所有的Broker信息,消息的消费者同理
命名服务器怎么才能知道这些服务器存在呢?万一有的机器挂了呢?通过心跳机制来维系,每30秒发送一个心跳包,来保障现在拿到的这个服务器是有效的
消息包括:消息Message、主题Topic(必备。是对消息进行分类的,这是订单类的,这是会员类的)、标题Tag。一个消息可以分成好多类,提取方式不同,处理的方式不同。服务器中会根据不同的Topic创建队列。对应的Topic主题的消息会放到对应的队列中
消费者发送
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
</dependencies>
消息生产者
package com.base;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.200.130:9876");
//启动发送服务
producer.start();
//构建消息,指定topic和body
Message msg = new Message("topic1", "hello base".getBytes());
//发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println("sendResult = " + sendResult);
//关闭连接
producer.shutdown();
}
}
结果
sendResult = SendResult [sendStatus=SEND_OK, msgId=C0A8006F0FA018B4AAC27A51177E0000, offsetMsgId=C0A8C88200002A9F000000000001343E, messageQueue=MessageQueue [topic=tpoic1, brokerName=broker-a, queueId=3], queueOffset=0]
消费者
package com.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//创建一个消息接收对象consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//设定接收消息的命名服务器地址---获取到消息服务器ip
consumer.setNamesrvAddr("192.168.200.130:9876");
//设置接收消息对应的topic,对应的sub标签为任意*,之前producer没有指定tag。如果producer发送的消息指定了tag,那么也必须指定相应的tag
consumer.subscribe("topic1", "*");
//开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历接收到的消息
for (MessageExt msg : list) {
System.out.println("msg = " + msg);
System.out.println("消息为:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息接收服务
consumer.start();
}
}
msg = MessageExt [queueId=3, storeSize=157, queueOffset=2, sysFlag=0, bornTimestamp=1627132561754, bornHost=/192.168.200.1:10109, storeTimestamp=1627132561761, storeHost=/192.168.200.130:10911, msgId=C0A8C88200002A9F000000000001393A, commitLogOffset=80186, bodyCRC=806332349, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1627132561761, UNIQ_KEY=C0A8006FF2B418B4AAC27B0281590000, WAIT=true}, body=[104, 101, 108, 108, 111, 32, 109, 113], transactionId='null'}]
消息为:hello mq
rocketMQ有默认的负载均衡策略有俩种,一种是负载均衡,一种是平均分配
//设置消费者的消费模式:也是默认的模式负载均衡
//consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
消息发送
消息类型
同步消息
及时性较强,重要的消息,切必须有回执的消息,列入短信通知(转账成功)
异步消息
即时性较弱,但需要有回执的消息,例如订单中的某些信息
比如生成订单后,支付成功后返回支付成功的消息就可以,你支付完告诉我一声就行,并不需要立刻支付。
单向消息
不需要有回执的消息,例如日志类消息
代码
package com.messageType;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.200.130:9876");
//启动发送服务
producer.start();
//同步消息
/* for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("同步消息:hello"+i).getBytes());
//发送消息
SendResult sendResult = producer.send(msg);
System.out.println("sendResult = " + sendResult);
}*/
//异步消息
/* for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("异步消息:hello" + i).getBytes());
producer.send(msg, new SendCallback() {
//消息发送成功
public void onSuccess(SendResult sendResult) {
System.out.println("sendResult = " + sendResult);
}
//消息发送失败
public void onException(Throwable t) {
System.out.println("t = " + t);
}
});
}*/
//单向消息
//构建消息,指定topic和body
Message msg = new Message("topic1", ("单向消息:hello").getBytes());
producer.sendOneway(msg);
//程序休眠10秒钟,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
//关闭连接
producer.shutdown();
}
}
特殊消息
特殊消息发送
延时消息(和消息自身有关的特性)
消息生产者代码:
package com.delayMessage;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.200.130:9876");
//启动发送服务
producer.start();
for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("延时消息:" + i).getBytes());
//设置延迟消息等级
msg.setDelayTimeLevel(3);
//发送消息
SendResult sendResult = producer.send(msg);
System.out.println("sendResult = " + sendResult);
}
//关闭连接
producer.shutdown();
}
}
关键代码
//构建消息,指定topic和body
Message msg = new Message("topic1", ("延时消息:" + i).getBytes());
//设置延迟消息等级
msg.setDelayTimeLevel(3);
//发送消息
SendResult sendResult = producer.send(msg);
System.out.println("sendResult = " + sendResult);
- 目前支持的消息时间
- 秒级:1,5,10,30
- 分级:1~10,20,30
- 时级:1,2
- 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2
所以上面的level水平对应就是30s。
批量消息
package com.mul;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.200.130:9876");
//启动发送服务
producer.start();
ArrayList<Message> messageList = new ArrayList<Message>();
//构建消息,指定topic和body
Message msg1 = new Message("topic1", ("批量消息:" + 1).getBytes());
Message msg2 = new Message("topic1", ("批量消息:" + 2).getBytes());
Message msg3 = new Message("topic1", ("批量消息:" + 3).getBytes());
Message msg4 = new Message("topic1", ("批量消息:" + 4).getBytes());
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
messageList.add(msg4);
//发送消息
SendResult sendResult = producer.send(messageList);
System.out.println("sendResult = " + sendResult);
//关闭连接
producer.shutdown();
}
}
关键代码
ArrayList<Message> messageList = new ArrayList<Message>();
//构建消息,指定topic和body
Message msg1 = new Message("topic1", ("批量消息:" + 1).getBytes());
Message msg2 = new Message("topic1", ("批量消息:" + 2).getBytes());
Message msg3 = new Message("topic1", ("批量消息:" + 3).getBytes());
Message msg4 = new Message("topic1", ("批量消息:" + 4).getBytes());
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
messageList.add(msg4);
//发送消息
SendResult sendResult = producer.send(messageList);
- 消息内容总长度不超过4M
- 消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
消息内容总长度超过4M就用不了了。
RocketMQ场景问题
rocketMQ-console是一款基于java环境开发的(springboot)的管理控制台工具
高级特性
持久化与持久化介质
activeMQ使用了数据库的消息存储
消息保存到数据库中,数据库会通过自己的逻辑保存到文件系统中,数据库将会成为MQ的瓶颈.早起的activeMQ使用的就是这种方式
文件系统存储
不用数据库,直接用文件系统
磁盘存储方式
复制完再删除,就会留下很多的磁盘碎片,这个时候,你去存储数据的时候,就会被打的很散,当数据存储的越满,磁盘碎片越多,存储数据的效率就会越低。
顺序写与零拷贝
ssd利用了顺序写来提高读写性能
使用了零拷贝
1) 通过启动时初始化话文件大小来保证 占用固定的磁盘空间,保证磁盘读写速度
2) 零拷贝”技术
数据传输由传统的4次复制简化成3次复制(如下图),减少1次复制过程
Java语言中使用MappedByteBuffer类实现了该技术
要求:预留存储空间,用于保存数据(1G存储空间起步)
mmap,三次拷贝
消息存储结构
消息数据存储区域
topic
queueId
message
消费逻辑队列
minOffset
maxOffset
consumerOffset
索引
key索引
创建时间索引
……
刷盘机制
同步刷盘
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ挂起生产者发送消息的线程
3)MQ将消息数据写入内存
4)内存数据写入硬盘
5)磁盘存储后返回SUCCESS
6)MQ恢复挂起的生产者线程
7)发送ACK到生产者
异步刷盘
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ将消息数据写入内存
3)发送ACK到生产者
–等消息量多了–
4)内存数据写入硬盘
同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
配置方式
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
高可用方案与主从复制
高可用方案
RcoketMQ分为四个部分
nameServer
MQ的设计一般都是基于主题的发布订阅机制,producer发送某一主题的消息到 消息服务器,消息服务器负责该消息的持久化存储,Consumer订阅了该主题,消息服务器根据订阅信息(路由信息)将消息推送到Consumer,或者Consumer主动向消息服务器拉取消息,从而实现producer和consumer之间的解耦。
为了避免消息服务器单点故障,通常会部署多台消息服务器,那producer如何知道msg要发往哪台消息服务器呢?如果某一台消息服务器宕机了,producer如何在不重启的情况下感知呢?
nameserver就是为了解决以上问题设计的.broker(消息服务器)在启动的时候向所有的nameserver注册,producer在消息发送之前先从nameserver获取broker服务器地址列表,然后根据负载均衡算法从列表中选择一台broker进行msg发送
Broker:Broker主要负责消息的存储、投递和查询以及服务高可用保证。
Producer:消息的生产者。
Consumer:消息的消费者。
nameServer启动过程
启动类为NameServerStartup
step1:解析配置文件,需要填充NameServerConfig,NettyServerConfig属性值
step2:根据启动属性创建NamesrvController,并初始化该实例NameServerController实例为NameServer核心控制器.加载KV配置,创建NettyServer网络处理对象,然后开俩个定时任务,RocketMQ中此类定时任务的作用就是心跳
1.检测broker是否存活 2,打印KV配置信息
step3:注册JVM钩子函数并启动服务器,以便监听Broker,Producer的网络请求
特别的,RocketMQ设计之初时参考的另一款消息中间件Kafka就使用了Zookeeper,Zookeeper其提供了Master选举、分布式锁、数据的发布和订阅等诸多功能。
事实上,在RocketMQ的早期版本,即MetaQ 1.x和MetaQ 2.x阶段,也是依赖Zookeeper的。但MetaQ 3.x(即RocketMQ)却去掉了ZooKeeper依赖,转而采用自己的NameServer。
rocketmq的架构设计决定了只需要一个轻量级的元数据服务器就够了,只需要保证最终一致,而不需要zk这样的强一致性方案,不需要再依赖另一个中间件,从而减少整体维护成本.根据cap理论,rocketMQ在这个模块的设计上选择了ap而不是cp
nameserver如何保证最终一致性
路由剔除
正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。
异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超多120秒,也会判定Broker失效并将其移除。
特别的,对于一些日常运维工作,例如:Broker升级,RocketMQ提供了一种优雅剔除路由信息的方式。如在升级一个节Master点之前,可以先通过命令行工具禁止这个Broker的写权限,发送消息到这个Broker的请求,都会收到一个NO_PERMISSION响应,客户端会自动重试其他的Broker。
当观察到这个broker没有流量后,再将这个broker移除。
路由发现
路由发信啊是客户端的行为,这里的客户端主要说的是生产者和消费者.具体来说
对于生产者,可以发送消息到多个topic,因此一般是在发送第一条消息时,才会根据topic获取NameServer获取路由信息
对于消费者,订阅的topic一般是固定的,所以在启动时就会拉取
那么生产者/消费者在工作的过程中,如果路由信息发生了变化怎么处理呢?如:Broker集群新增了节点,节点宕机或者Queue的数量发生了变化。细心的读者注意到,前面讲解NameServer在路由注册或者路由剔除过程中,并不会主动推送会客户端的,这意味着,需要由客户端拉取主题的最新路由信息。
事实上,RocketMQ客户端提供了定时拉取Topic最新路由信息的机制,这里我们直接结合源码来讲解。
DefaultMQProducer和DefaultMQConsumer有一个pollNameServerInterval配置项,用于定时从NameServer并获取最新的路由表,默认是30秒,它们底层都依赖一个MQClientInstance类。
MQClientInstance类中有一个updateTopicRouteInfoFromNameServer方法,用于根据指定的拉取时间间隔,周期性的的从NameServer拉取路由信息。 在拉取时,会把当前启动的Producer和Consumer需要使用到的Topic列表放到一个集合中,逐个从NameServer进行更新。以下源码展示了这个过程:
public void updateTopicRouteInfoFromNameServer() {
//1 需要更新路由信息的Topic集合
Set<String> topicList = new HashSet<String>();
//2 添加消费者需要使用到的Topic到集合中
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
//3 添加生产者需要使用到的topic到集合中
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}
//4 逐一从NameServer更新每个Topic的路由信息
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
然而定时拉取,还不能解决所有的问题。因为客户端默认是每隔30秒会定时请求NameServer并获取最新的路由表,意味着客户端获取路由信息总是会有30秒的延时。这就带来一个严重的问题,客户端无法实时感知Broker服务器的宕机。如果生产者和消费者在这30秒内,依然会向这个宕机的broker发送或消费消息呢?
这个问题,可以通过客户端重试机制来解决。
生产者重试机制
在讲解重试机制之前,我们必须先对三种消息类型:普通消息,普通有序消息,严格有序消息进行介绍.因为rocketMQ客户端的生产者重试机制,只会普通消息有作用.严格消息是没有作用.
- 普通消息:消息是无序的,任意发送哪一个队列都可以
- 普通有序消息:用一个消息(列入某个用户的消息)总是发送道同一个队列,在异常情况下,也可以发送到其他队列
- 严格有序消息:消息必须被发送到同一个队列,在异常情况下,也不允许发送到其他队列
对于这三种类型的消息,RocketMQ分别提供了对应的方法来发送消息,例如同步发送(异步/批量/oneway也是类似):
高可用拓扑
出现问题
A主挂,A主停止写入服务,consumer到A从拉取积压数据,slave-a不会升级为master-a过程中如果设置异步复制可能会丢数据
A主挂,producer重试发送到b主,120秒后从nameServer摘除(30秒上报一次注册信息)
a主a从都挂,a从积压数据无法获取,但不会丢失a从回复服务后,consumer继续消费
消费者端只要主和从有一个可用,就不会受影响
nameserver节点挂了,producer/consumer从客户端nameServerList轮询有效nameserver节点获取broker信息
极端情况
a主a从如果同时对nameserver2网络短线,导致俩个nameserver状态不一致,如果consumer优先注册的是nameserver1,那么在consumer眼中broker-a是不存在的,但是生产者还是可以继续投递消息,这就会导致brokera的消息积压,这种情况只能通过恢复brokera和nameserver2的通信来解决,或者修改consumer的配置
主从复制方案
同步复制
master接到消息后,先复制到slave,然后反馈给生产者写操作成功
优点:数据安全,不丢数据,出现故障容易恢复
缺点:影响数据吞吐量,整体性能低
异步复制
master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
优点:数据吞吐量大,操作延迟低,性能高
缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
常见问题
死信队列
概念
当消息消费重试达到了指定次数后(默认16)后,MQ将无法被正常消费的消息称为死信队列
死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-letter queue)
特征
- 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费
重复消费
MQ要保证消息投递的可靠性,对未ack的消息,会重复投递。因此消费者端要自己保证消费的幂等性,方法如:消费者收到消息后,从消息中获取消息标识写入到Redis或数据库,当再次收到该消息时就不作处理。消息重复投递的场景,除重试外,很大一部分来自于负载均衡阶段,前一个监听Queue的消费实例拉取的消息未全部ack,新的消费实例监听到这个Queue重新拉取消息。
强校验
场景:如与金钱相关的支付等关键消息,必须强校验。
基于数据库的唯一键来保证重复数据不会被插入多条。建立一个已消费消息的表,每次消费之前检查消费表中当前消费的消息是否已经存在,若存在表示消息已经被消费过直接返回。
弱校验
场景:可以有小概率出现重复消费的非关键消息
- 使用set结构实现。每次消费前查看set中是否已经存在待消费的消息的唯一标识符,不存在则消息,存在则直接返回。
- 场景唯一标识+id作为Redis的key,并设置一定的过期时间。每次消费时检查key是否已经存在,存在则直接返回
顺序消费
rocket提供俩种顺序消费模式
普通顺序消息
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间) 。
严格顺序消息
严格顺序消息模式下,对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。严格顺序消息 即使在异常情况下也会保证消息的顺序性 。
严格顺序虽然能更好的保证消息有序,但实现它可会付出巨大的代价。如果你使用严格顺序模式,Broker 集群中只要有一台机器不可用,则整个集群都不可用。
一般而言,我们的 MQ 都是能容忍短暂的乱序,所以推荐使用普通顺序模式。
消息的有序性
消息有序是一类消息消费时,能按照发送的顺序来消费,例如:一个订单产生了三条消息分别是订单的创建,订单付款,订单完成.消费时要按照这个顺序消费才有意义,但是订单之间是可以并行消费的.
rocketMQ可以严格保证消息有序
顺序消息分为全局顺序消息与部分顺序消息,全局顺序消息是指某个toipic下的所有消息都要保证顺序,部分顺序消息只要保证每一组的消息被顺序消费即可
如果要实现全局顺序消息,那么只能使用一个队列,以及单个生产者,这是会严重影响性能
因此,我们常说的顺序消息通常是只的部分顺序消息,就上面的例子来说,我们不用管不同的订单ID的消息之间的总体消费顺序,只需要保证同样订单ID的消息能按照订单创建、订单付款、订单完成这个顺序消费就可以了。
生产者有序发送
先看如何实现生产者有序存储.我们知道rocketmq中生产者生产的消息会放置在某个队列中,基于度列先进先出特性天然可以保证入队列的消息顺序拉取的消息顺序是一致的,因此,我们只需要保证一组相同的消息按照给定的顺序存入一个队列中,就能保证生产者的有序存储
普通发送消息的模式下,生产者会采用轮询的方式将消费者均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用rocketmq带来的队列特性来保证消息的有序性了
这个问题很好解决,因为rocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用hash取摸法来保证同一个订单在同一个队列中就行了,即通过订单id%队列数量得到该id的订单锁投放的队列在队列表中的索引,然后该订单的所有消息都会被投放到这个队列中
生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序
比如只有俩个队列,那么订单ID为1,2,3的三组消息中,1,3组消息存放于第一个队列,而2组消息存放于第二个队列,下图是一种消息可能的消息存放顺序
根据上图可以,上面方法可以实现一组消息的顺序存放,不同组消息的顺序无法保证,这就是分区顺序
另外顺序消息必须使用同步发送的方式才能保证生产者的发送的消息有序
实际上采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,可能导致同一个业务中的不同消息被发送到了不同的服务器中,短暂的造成部分消息无序.同样的如果增加了服务器,也会造成短暂的部分消息无序
消费者有序消费
生产者有序存储实现了,那么改如何实现消费者有序消费呢?rocketmq的MessageListener回调函数提供了俩种消费模式
- 有序消费模式,MessageListenerOrderly
- 并发消费模式MessageListenerConcurrently
在消费的时候,还需要保证消费者注册MessageListenerOrderly
类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。
实际上每一个消费者的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费.虽然MessageListenerOrderly被称为有序消费模式,但是仍然使用线程池去消费消息
MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地所保证同时只有一条线程去消费队列上的数据
消费者模式使用3把锁来保证消费的顺序性
broker端的分布式锁:
在负载均衡的处理新分配队列的updateProcessQueueTableInRebalance方法,以及ConsumeMessageOrderlyService服务启动时的start方法中,都会尝试向broker申请当前消费者客户端分配到的messageQueue的分布式锁。
broker端的分布式锁存储结构为ConcurrentMap<String/ group /, ConcurrentHashMap<MessageQueue, LockEntry>>,该分布式锁保证同一个consumerGroup下同一个messageQueue只会被分配给一个consumerClient。
获取到的broker端的分布式锁,在client端的表现形式为processQueue. locked属性为true,且该分布式锁在broker端默认60s过期,而在client端默认30s过期,因此ConsumeMessageOrderlyService#start会启动一个定时任务,每过20s向broker申请分布式锁,刷新过期时间。而负载均衡服务也是每20s进行一次负载均衡。
broker端的分布式锁最先被获取到,如果没有获取到,那么在负载均衡的时候就不会创建processQueue了也不会提交对应的消费请求了。
messageQueue的本地synchronized锁:
在执行消费任务的开头,便会获取该messageQueue的本地锁对象objLock,它是一个Object对象,然后通过synchronized实现锁定。
这个锁的锁对象存储在MessageQueueLock.mqLockTable属性中,结构为ConcurrentMap<MessageQueue, Object>,所以说,一个MessageQueue对应一个锁,不同的MessageQueue有不同的锁。
因为顺序消费也是通过线程池消费的,所以这个synchronized锁用来保证同一时刻对于同一个队列只有一个线程去消费它。
ProcessQueue的本地consumeLock:
在获取到broker端的分布式锁以及messageQueue的本地synchronized锁的之后,在执行真正的消息消费的逻辑messageListener#consumeMessage之前,会获取ProcessQueue的consumeLock,这个本地锁是一个ReentrantLock。
那么这把锁有什么作用呢?
在负载均衡时,如果某个队列C被分配给了新的消费者,那么当前客户端消费者需要对该队列进行释放,它会调用removeUnnecessaryMessageQueue方法对该队列C请求broker端分布式锁的解锁。
而在请求broker分布式锁解锁的时候,一个重要的操作就是首先尝试获取这个messageQueue对应的ProcessQueue的本地consumeLock。只有获取了这个锁,才能尝试请求broker端对该messageQueue的分布式锁解锁。
如果consumeLock加锁失败,表示当前消息队列正在消息,不能解锁。那么本次就放弃解锁了,移除消息队列失败,只有等待下次重新分配消费队列时,再进行移除。
如果没有这把锁,假设该消息队列因为负载均衡而被分配给其他客户端B,但是由于客户端A正在对于拉取的一批消费消息进行消费,还没有提交消费点位,如果此时客户端A能够直接请求broker对该messageQueue解锁,这将导致客户端B获取该messageQueue的分布式锁,进而消费消息,而这些没有commit的消息将会发送重复消费。
所以说这把锁的作用,就是防止在消费消息的过程中,该消息队列因为发生负载均衡而被分配给其他客户端,进而导致的两个客户端重复消费消息的行为。
原文链接:https://blog.csdn.net/weixin_43767015/article/details/121028059