RocketMQ简介
简介
消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
概念
- Topic:消息主题,以及消息类型,生产者向其发送消息
- 生产者:也称为消息发布者,负责1生产并发送消息至Topic
- 消费者:也称为消息订阅者,负责从Topic接收并消费消息.
- 消息:生产者想Topic发送并最终传递给消费者的数据和(可选)属性组合
- 消息属性:生产者可以为消息定义的属性,包含message key和Tag
- Group:一类生产者或消费者,这类生产者和消费者通常生产或消费同一类的消息,切消息发布或订阅的逻辑一致
消息收发模型
消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到 Topic.消费者应用创建对Topic的订阅以便从其接受消息.通信可以是一对多(扇出),多对一(扇入)和多对多
如图
- 生产集群:用来表示发送消息应用,一个生产集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个对象.
- 消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象.
一个消费者集群下的多个消费者均摊方式消费消息.如果设置的是广播方式,那么这个消费者集群下的每个实例都会消费全量数据.
一个消费者对应一个group ID,一个Group ID可以订阅多个Topic,如上图group和tpic的订阅关系可以通过直接在程序中设置即可,具体设置方法可以参见产品更新日志中的资源申请流程优化部分
应用场景
- 削峰填谷
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ版可提供削峰填谷的服务来解决该问题。
- 异步解耦
交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ版可实现异步通信和应用解耦,确保主站业务的连续性。
- 顺序收发
细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ版提供的顺序消息即保证消息FIFO。
- 分布式事务一致性
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
- 大数据分析
数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ版与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
- 分布式缓存同步
天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ版构建分布式缓存,实时通知商品数据的变化。
功能特性概述
概览
消息队列RocketMQ班在阿里云多个地域(Region)提供了高可用的消息云服务.单个地域内采用多个机房部署,可用性极高,即时整个机房都不可用,任然可以为应用提供消息发布服务
消息队列RocketMq班提供TCP和http协议的多语言接入方式,方便不同编程语言开发的应用快速接入消息队列RocketMQ版消息云服务。您可以将应用部署在阿里云ECS、企业自建云,或者嵌入到移动端、物联网设备中与消息队列RocketMQ版建立连接进行消息收发;同时,本地开发者也可以通过公网接入消息队列RocketMQ版服务进行消息收发。
多协议接入
- TCP协议:区别于HTTP简单的接入方式,提供更为专业、可靠、稳定的TCP协议的SDK接入服务。支持的语言包括Java、C/C++ 以及.NET。
- HTTP协议:采用RESTful风格,方便易用,快速接入,跨网络能力强。支持Java、C++、.NET、Go、Python、Node.js和PHP七种语言客户端。
管理工具
- web控制台:支持Topic管理,group管理,消息查询,消息轨迹展示和查询,资源报表 以及监控报警管理.
- penAPI提供开放的API便于讲消息队列
消息类型
- 普通消息:消息队列RocketMQ版中无特性消息,区别于特性的定时和延时消息、顺序消息和事务消息。
- 事务消息:实现类似X/Open XA的分布事务功能,已达到事务最终一致性状态
- 定时和延时消息:允许消息生产者对指定消息进行定时(延时)投递,最长支持40天
- 顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费
消息特性
- 消息重试:在消费者返回重试的响应后,消息队列RocketMQ版会按照响应的规则进行消息重投.
- 至少投递一次:消息队列RocketMQ版保证消息成功被消费一次.消息对垒RocketMQ版的分布式特点和瞬变的网络条件,或者在用户重启发布的情况下,可能导致消费者无法收到重复的消息.开发人员应将应用程序设计为多次处理一条消息不会产生任何错误不一致性.消息幂等最佳实践请参见消费幂等。
特性功能
- 消息查询:消息队列RocketMQ版提供了三种消息查询的方式,分别是按Message ID、Message Key以及Topic查询。
- 查询消息轨迹:通过消息轨迹,能清晰定位消息从生产者发出,经由消息队列RocketMQ版服务端,投递给消息消费者的完整链路,方便定位排查问题。
- 集群消费和广播消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被消费者集群内的任意一个消费者处理即可;当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给消费者集群内所有注册过的消费者,保证消息至少被每台机器消费一次。
- 重置消费位点:根据时间或位点重置消费进度,允许用户进行消息回溯或者丢弃堆积消息。
- 死信队列:将无法正常消费的消息储存到特殊的死信队列供后续处理。
- 全球消息路由:用于全球不同地域之间的消息同步,保证地域之间的数据一致性。
- 资源报表:消息生产和消费数据的统计功能。通过该功能,您可查询在一段时间范围内发送至某Topic的消息总量或者TPS(消息生产数据),也可查询在一个时间段内某Topic投递给某Group ID的消息总量或TPS(消息消费数据)。
- 监控报警:您可使用消息队列RocketMQ版提供的监控报警功能,监控某Group ID订阅的某Topic的消息消费状态并接收报警短信,帮助您实时掌握消息消费状态,以便及时处理消费异常。
消息类型
注意事项,您在调用sdk手法消息时需注意,消息队列rocketmq版提供的四种消息类型所对应的Topic不能混用,列入,您创建的普通消息的Topic只能用于收发普通消息,不能用于收发其他类型消息;同理,事务消息的Topic也只能收发事务消息,不能用于收发其他类型的消息,以此类推.
普通消息
HTTP SDK收发普通消息的示例代码
消息队列RocketMQ版支持RESTful风格的HTTP协议通信,并提供了以下7种语言的SDK:
定时消息
概念介绍
- 定时消息:Productr将消息发送到消息队列RocketMQ版服务器端,但并不期望立马投递这条消息,而是推迟到当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消费
- 延时消息:Producer将消息发送到消息队列RocketMQ服务器端,但并不期望立马投递这条消息,而是因吃一段时间后才投递到Consumer进行消费,该消息即延时消息.
定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ版服务端并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者
适用场景
定时消息和延时消息 适用于以下一些场景:
- 消息生产和消费有时间窗口需求,列入在电商交易中超市未支付关闭订单的场景,在订单创建时会发送一条延时消息,这条消息会在三十分之以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付.如支付未完成关闭订单如已完成支付则忽略.
- 通过消息触发一些定时任务,列入莫以固定时间点向用户发送提醒消息
使用方式
定时消息和延时消息在代码编写上存在略微区别:
- 发送定时消息需要明确指定消息发送时间点之后的某一时间作为消息投递的时间点
- 发送延时消息需要设定一个预言师时间长度,消息讲从当前发送时间点开始延迟固定时间后才开始投递
注意事项
- 定时消息的精度1s-2s的延迟误差
- 定时和延时消息的
msg.setStartDeliverTime
参数需要设置成当前时间戳之后的某个时刻(单位毫秒).如果被设置当前时间戳之前的某个时刻,消息将立即投送给消费者 - 定时和延时消息的
msg.setStartDeliverTime
参数可设置40天内的任何时刻(单位毫秒),超过40天消息将发送失败 - StartDeliverTime是服务端开始向消费端投递的时间.如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递
由于客户端和服务端可能存在时间差,消息的投递时间与客户端的投递时间可能存在偏差
- 设置定时和延时消息的投递时间后,依然受三天的消息保存时长限制.
列如,设置定时消息五天后才能被消费,如果五天后一直没被消费,那么这条消息将在第八天被删除.
- 设置定时和延时消息的投递时间后,依然受三天的消息保存时长限制.
顺序消息
介绍
顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和执行的消息.顺序发布和顺序消费是对于一个指定的Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即县发布的消息一定会被客户端接收到.
顺序消息分为全局顺序消息和分区顺序消息
顺序消息2.0简介
消息队列RocketMQ版全新推出顺序消息2.0特性,改特性具有以下特点:
- 高并发:顺序消息的2.0的无主一致架构极大提高了顺序消息的吞吐量
- 高可用:顺序消息2.0无主一致架构使其拥有更完善的均衡负载和故障节点自适应恢复能力.
- 无热点:顺序消息2.0拥有完善的集群负载均衡策略,解决了顺序消息的热点问题
- 如需使用顺序消息2.0特性,请确保您的消息队列RocketMQ版实例是企业铂金版。
- 仅TCP Java ons-client v1.8.7.1.Final及以上版本的SDK支持顺序消息2.0。
全局顺序消息
对于一个指定的topic,所有消息按照严格的先进先出(FIFO)的熟悉怒来发布和消费
适用场景:性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景
实例
在证券交易处理中,以人民币越换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息
分区顺序消息
对于一个指定的Topic,所有的消息根据Sharding Key进行区块分区.同一个分区内的消息严格按照FIFO顺序进行发布和消费.Sharding Key是顺序消息中用来区分不同分区的字段,和普通顺序消息的Key是完全不同的概念
适用场景
适用性能要求高,一Sharding Key作为分区字段,在同一个区块中严格按照FIFO原则进行消息发布和消费的场景
实例
- 用户注册需要发送验证码,一用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费
- 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单的创建订单消息,订单支付消息,订单物流消息,都会按照发布的先后顺序来消费.
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
对比
在控制台创建顺序消息的不同类型Topic对比如下
| Topic的消息类型 | 是否支持事务消息 | 是否支持定时和延时消息 | 性能 |
| :- | :- | :- | :- |
| 无序消息(普通、事务、定时和延时消息) | 是 | 是 | 最高 |
| 分区顺序消息 | 否 | 否 | 高 |
| 全局顺序消息 | 否 | 否 | 一般 |
发送方式对比
| 消息类型 | 是否支持可靠同步发送 | 是否支持可靠异步发送 | 是否支持Oneway发送 |
| :- | :- | :- | :- |
| 无序消息(普通、事务、定时和延时消息) | 是 | 是 | 是 |
| 分区顺序消息 | 是 | 否 | 否 |
| 全局顺序消息 | 是 | 否 | 否 |
注意事项
使用顺序消息时,请注意以下几点
- 建议同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发
- 对于全局顺序消息,建议消息不要有阻塞.同时运行多个实例,是为了防止工作实例意外退出而导致业务中断.当工作实例退出时,其他实例也可以立即工作,不会导致业务中断,实际工作只会有一个实例.
事务消息
消息队列RocketMQ版提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景。本文介绍消息队列RocketMQ版事务消息的概念、优势、典型场景、交互流程以及使用过程中的注意事项。
概念介绍
- 事务消息:消息队列RocketMQ提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致.
- 半事务消息:暂时不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务器端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成"暂时不能投递状态",处于这种状态下的消息即半事务消息.
- 消息回查:由于网络闪断,生产者应用重启等原因,导致磨条事务消息的二次确认丢失,消息队列rocketMQ服务端通过扫描发现某条消息时,需要主动像小溪生产者询问该条消息的最终状态(Commit或者是Rollback),该询问过程即是消息回查.
分布式消息的优势
消息列RocketMQ版饭不是事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性.同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚 ,从而最大限度保证核心系统的可用性.在极端情况下,如果关联的某一个关联应用的不可导致整体回滚,从而最大限度保证核心系统的可用性.在极端情况下,如果关联的某一个应用始终无法成功处理,也只需对当前应用进行不常或数据订正处理,而无需对整体业务进行回滚
典型场景
在淘宝购物车下单时,涉及到购物车系统和交易系统,这俩个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现.在这种场景下,交易系统是最为核心的系统,乣最大限度地保证下单成功.而购物车系统只需要订阅消息队列RocketMQ版的交易订单系统 ,做响应的业务处理,即可保证最终的数据一致性
交互流程
事务消息交互流程如下图所示
事务消息发送步骤如下
- 发送方讲半事务消息发送至消息队列RocketMQ服务端
- 消息队列RockeMQ服务端讲消息持久化之后,想发送方返回Ack确认消息已经发送成功,此时消息为半事务消息
- 发送方开始执行本地事务逻辑
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅放将最终收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息
事务消息回查的步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认为到达服务端,经过固定时间后服务端讲对该消息发起回查
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
- 发送方根据检查得到的本地事务的最终状态再次二次确认,服务端仍按照步骤4对半事务消息进行操作
注意事项
- 事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列RocketMQ版服务端会根据Group ID去查询客户端。
- 通过
ONSFactory.createTransactionProducer
创建事务消息的Producer时必须指定LocalTransactionChecker
的实现类,处理异常情况下事务消息的回查。 事务消息发送完成本地事务后,可在
execute
方法中返回以下三种状态:TransactionStatus.CommitTransaction
:提交事务,允许订阅方消费该消息。TransactionStatus.RollbackTransaction
:回滚事务,消息将被丢弃不允许消费。TransactionStatus.Unknow
:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端
- 可通过以下方式给每条消息设定第一次消息回查的最快时间
Message message = new Message();
// 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为120秒~125秒之间message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动0秒~5秒;如第一次回查后事务仍未提交,后续每隔5秒回查一次
更多信息
收发事务消息的示例代码如下:
TCP协议
HTTP协议(多语言)