RocketMQ

一、RockeMQ是什么

img

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点
  • Producer、Consumer、队列都可以分布式;
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 topic 对应的队列集合;
  • 能够保证严格的消息顺序;
  • 提供丰富的消息拉取模式;
  • 高效的订阅者水平扩展能力;
  • 实时的消息订阅机制;
  • 亿级消息堆积能力;
  • 较少的依赖。

二、RocketMQ基本概念

2.1 消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。MessageQueue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 MessageQueue 中。ConsumerGroup 由多个 Consumer 实例构成。

2.2 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

2.3 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

2.4 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

2.5 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

2.6 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 BrokerIP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

2.7 拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

2.8 推动式消费(Push Consumer)

Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。

2.9 生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

2.10 消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

2.11 集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

2.12 广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

2.13 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列(Topic 分区,称作 Message Queue)收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

2.14 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

2.15 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

2.16 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

三、RocketMQ 特性(features)

3.1 订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

3.2 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序
  • 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景;
  • 分区顺序
  • 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
  • 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

3.3 消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

3.4 消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭;
  2. Broker 异常 Crash;
  3. OS Crash;
  4. 机器掉电,但是能立即恢复供电情况;
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏);
  6. 磁盘设备损坏;

1、2、3、4 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。5、6 属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

3.5 至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

3.6 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

3.7 事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

3.8 定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息;
  • 1 <= level <= maxLevel,消息延迟特定时间,例如 level == 1,延迟 1s;
  • level > maxLevel,则 leve l== maxLevel,例如 level == 20,延迟 2h。

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

3.9 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

3.10 消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

3.11 流量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启:
  • transientStorePoolEnable == true
  • 并且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。
  • 消费者流控的结果是降低拉取频率。

3.12 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

四、RocketMQ 设计

消息存储

img

消息存储是 RocketMQ 中最为复杂和最为重要的一部分,接下来分别从 RocketMQ 的消息存储整体架构、RocketMQ 中两种不同的刷盘方式来展开叙述。

消息存储整体架构

消息存储架构图中最重要的三个跟消息存储相关的文件构成

  • CommitLog

    消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件

  • ConsumeQueue

    消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。

    其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

    同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

  • IndexFile

    IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是:{fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 rocketmq 的索引文件其底层实现为 hash 索引。

在 RocketMQ 的消息存储整体架构图中可以看出,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。

消息刷盘

img

  • 同步刷盘

    只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

  • 异步刷盘

    能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。

五、RocketMQ 技术架构

img

RocketMQ 架构上主要分为四部分,如上图所示:

  • Producer:

    消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • NameServer:

    NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。

    主要包括两个功能:Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。

    Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer,Consumer 仍然可以动态感知 Broker 的路由的信息。

  • BrokerServer:

    Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要子模块。

    • Remoting Module:

      整个 Broker 的实体,负责处理来自 clients 端的请求。

    • Client Manager:

      负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。

    • Store Service:

      提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。

    • HA Service:

      高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。

    • Index Service:

      根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

img

六、RocketMQ 部署架构

img

RocketMQ 网络部署特点

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。
  • Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。注意:当前 RocketMQ 版本在部署架构上支持一 Master 多 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。
  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
  • Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。
  • Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群工作流程:

  • 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  • Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

七、RocketMQ 集群部署

7.1 集群模式

单 Master 模式

这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

  • 优点:本地开发测试,配置简单,同步刷盘消息不会丢失。
  • 缺点:不可靠,如果宕机会导致服务不可用。

多 Master 模式

一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master,这种模式的优缺点如下:

  • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

多 Master 多 Slave 模式(异步)

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
  • 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。

多 Master 多 Slave 模式(同步)

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低 10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

DLedger 集群模式

RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。

RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

  • 优点:多节点(至少三个)组成集群,其中一个为 Leader 节点,其余为 Follower 节点组成高可用,能够自动容灾切换。
  • 缺点:集群成本增加(同一个 group 最少新增一台机器)、RocketMQ4.5 及以后版本才支持。

7.2 本地测试环境快速搭建(单 Master 模式)

环境准备

  1. jdk1.8+
  2. rocketmq-all-4.5.0-bin-release

安装步骤

1、解压 rocketmq-all-4.5.1-bin-release.zip 到指定目录,如下:

img

benchmark:基础测试脚本目录 / lib:运行依赖包

bin:命令运维脚本目录 / conf:配置目录

2、进入 bin 目录下编辑 runserver.shrunborker.sh 两个文件,调整一下 namesrv 和 broker 的启动的 jvm 内存参数。具体参数大小根据系统环境配置情况而定:

1
2
3
4
5
6
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim runroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
ps:broker端本地环境不建议开启堆外内存

3、根据情况配置 broker 的一些参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
brokerClusterName = DefaultCluster
brokerName = broker-a
# 0表示master >0表示slave
brokerId = 0
# 几点开始删除文件
deleteWhen = 04
# 文件保留时间,默认48小时
fileReservedTime = 48
# Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole = ASYNC_MASTER
#刷盘方式
# ASYNC_FLUSH 异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH

备注:
# 存储路径
storePathRootDir=xxx/store
# commitLog存储路径
storePathCommitLog=xxx/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=xx/store/pathconsumequeue
# 消息索引存储路径
storePathIndex=xxx/store/pathindex
# checkpoint 文件存储
storeCheckpoint=xxx/store/checkpoint
# abort 文件存储
abortFile=xxx/store/abort

备注:

  • xxx 默认等于 System.getProperty(“user.home”) +File.separator+“store”
  • 更多其他配置:
  • MessageStoreConfig、BrokerConfig 源码类
  • 根据实际情况配置存储路径,storePathCommitLog 占用空间相对最大

4、启动 rocketmq, 先启动 namesrv 再启动 broker

1
2
3
4
5
6
7
8
--后台启动namesrv  
nohup sh mqnamesrv &
--后台启动broker
export NAMESRV_ADDR=localhost:9876
--如果你在brocker.conf文件中配置了namesrvAddr = localhost:9876就直接用下面的命令
nohup sh mqbroker -c /xxx/rocketmq-all-4.5.1-bin-release/conf/broker.conf &
--否则用这个命令
nohup sh mqbroker -n localhost:9876 -c /xxx/rocketmq-all-4.5.1-bin-release/conf/

5、查看进程和日志看是否启动成功

1
jps -l  or 查看 namesrv.log 和 broker.log 日志

八、本地源码快速搭建调试

源码获取

https://github.com/apache/rocketmq/releases

该网址可下载 source 包,也可以 fork 仓库。

启动 RocketMQ Namesrv

参考 NameServerInstanceTest 的 startup 方法,编写 main 方法,demo 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public class NameServerInstanceTest {
.....
public static void main(String[] args) throws Exception {
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876); //设置端口
// 创建 NamesrvController,启动
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
namesrvController.initialize();
namesrvController.start();
Thread.sleep(Long.MAX_VALUE); //挂起
}
}

备注:

运行 main 方法, 显示以下关键字说明启动成功

  • NettyEventExecutor service started
  • FileWatchService service started

启动 RocketMQ Broker

参考 BrokerControllerTest 的 testBrokerRestart 方法,编写 main 方法,demo 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BrokerControllerTest {
.....
public static void main(String[] args) throws Exception {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(10911);
BrokerConfig brokerConfig = new BrokerConfig(); // BrokerConfig 配置
brokerConfig.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 创建 BrokerController 对象,并启动
BrokerController brokerController = new BrokerController(//
brokerConfig,
nettyServerConfig,
new NettyClientConfig(),
messageStoreConfig);
brokerController.initialize();
brokerController.start();
Thread.sleep(Long.MAX_VALUE); //挂起
}
}

备注:

运行 main 方法,不报错即可,但是在:NameServerInstanceTest(nameServer)中,发现以下关键字日志,表明 broker 注册 nameServer 成功:new topic registered…等

启动 RocketMQ Producer

参考:org.apache.rocketmq.example.quickstart.Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException, IOException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Launch the instance.
*/
producer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
producer.start();

for (int i = 0; i < 1; i++) { // i 可以随便配置
try {

/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("abc" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// msg.setDelayTimeLevel(10);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}

备注:

运行 main 方法, 发现日志中包含:

SendResult [sendStatus=SEND_OK…即发送成功

**启动 RocketMQ Consumer,**参考:

org.apache.rocketmq.example.quickstart.Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("abc", "*");

/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/*
* Launch the consumer instance.
*/
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

备注:

运行 main 方法, 发现日志中包含:Receive New Messages…即消费成功

九、总结

rocketMq 作为低延迟、高并发、高可用、高可靠的分布式消息中间件,其详细知识点非常多,如需深入,建议源码入坑。


RocketMQ
http://example.com/2024/12/08/RocketMQ/
作者
Mercury
发布于
2024年12月8日
许可协议