1. 消息发送

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、 异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:
1)设置Producer的GroupName。
2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
4)设置NameServer地址
5)组装消息并发送。

package org.example;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * @author :xjlonly
 * @date :Created in 2021/6/11 15:25
 * @description:
 * @modified By:
 * @version: $
 */
public class MyProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
       	//该producter是线程安全的 可以多线程使用,一般建议使用多个生产者
        //实例化生产者的同时设置生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("myproducer_grp_04");
        //设置实例名称, 一个JVM中如果有多个生产者,可以通过实例名称区分
        //默认DEFAULT
        producer.setInstanceName("producer_grp_04_01");
        //设置同步发送重试次数
        producer.setRetryTimesWhenSendFailed(2);
        //指定nameserver
        producer.setNamesrvAddr("106.75.226.220:9876");

        //对生产者进行初始化
        producer.start();

        Message message = new Message("tp_demo_04", "hello rocket".getBytes(RemotingHelper.DEFAULT_CHARSET));

        //同步发消息  内部有重试机制 按照setRetryTimesWhenSendFailed设置的次数进行重试
        //broker中可能会有重复的消息,由应用的开发者进行处理
        final SendResult result = producer.send(message);

        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {

            }
            //重试次数使用完之后 还失败 则调用此方法
            @Override
            public void onException(Throwable throwable) {

            }
        });
        
        //将消息放到Socket缓冲区,就返回,没有返回值, 不等待broker响应
        //速度快 会对消息
        producer.sendOneway(message);

        final SendStatus sendStatus = result.getSendStatus();
        
        
    }
}

消息发生返回状态(SendResult#SendStatus)有如下四种:

  • FLUSH_DISK_TIMEOUT
  • FLUSH_SLAVE_TIMEOUT
  • SLAVE_NOT_AVAILABLE
  • SEND_OK

不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:

  1. FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
  2. FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。
  3. SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
  4. SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。

写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。

提升写入的性能
发送一条消息出去要经过三步

  1. 客户端发送请求到服务器。
  2. 服务器处理该请求。
  3. 服务器向客户端返回应答
    一次消息的发送耗时是上述三个步骤的总和。

在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方式发送
Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。
用这种方式发送消息的耗时可以缩短到微秒级

另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送

我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗 口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。

顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能
目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。

2. 消息消费

简单总结消费的几个要点:

  1. 消息消费方式(Pull和Push)
  2. 消息消费的模式(广播模式和集群模式)
  3. 流量控制(可以结合sentinel来实现,后面单独讲)
  4. 并发线程数设置
  5. 消息的过滤(Tag、Key) TagA||TagB||TagC * null
package org.example;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;
import java.util.Set;

/**
 * @author :xjlonly
 * @date :Created in 2021/6/11 15:42
 * @description:
 * @modified By:
 * @version: $
 */
public class MyConsumer05 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 拉取消息的消费者实例化,同时指定消费组名称
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer();
        //广播消息
        pullConsumer.setMessageModel(MessageModel.BROADCASTING);
        //集群消费
        pullConsumer.setMessageModel(MessageModel.CLUSTERING);
        final Set<MessageQueue> messageQueues1 = pullConsumer.fetchSubscribeMessageQueues("tp_demo_05");
        for(MessageQueue messageQueue : messageQueues1){
            //指定消息队列 指定标签过滤的表达式,消息偏移量和单次最大拉取的消息个数
            pullConsumer.pull(messageQueue, "TagA||TagB", 0l, 100);
        }


        //消息推送
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer();
        //广播消息
        pushConsumer.setMessageModel(MessageModel.BROADCASTING);
        //集群消费
        pushConsumer.setMessageModel(MessageModel.CLUSTERING);

        pushConsumer.setConsumeThreadMax(10);
        pushConsumer.setConsumeThreadMin(1);

        // 设置nameserver的地址
        pushConsumer.setNamesrvAddr("106.75.226.220:9876");


        //subExpression 标识标签过滤
        //* 标识不过滤
        pushConsumer.subscribe("tp_demo_05", "*");
        
                
        //设置批次处理消息一次性消费多少条
        pushConsumer.setConsumeMessageBatchMaxSize(100);

        // 对消费者进行初始化,然后就可以使用了
        pushConsumer.start();

    }
}

当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。

  1. 提高消费并行度
    同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。
    通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。
    注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
    此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。

  2. 以批量方式进行消费
    某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
    可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的 consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表

  3. 检测延时情况,跳过非重要消息
    Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。

3. 消息存储

3.1 存储介质

  • 关系型数据库DB

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式 来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
image.png

  • 文件系统

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

性能对比
文件系统>关系型数据库DB

3.2 消息的存储和发送

1) 消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。
但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!
因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。
2) 存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
image.png
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
image.png
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行
如果要遍历commitlog文件根据topic检索消息是非常低效。
Consumer即可根据ConsumeQueue来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:

  1. 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
  2. 消息大小size
  3. 消息Tag的HashCode值。

consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:
topic/queue/file三层组织结构
具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
image.png
consumequeue文件采取定长设计,每个条目共20个字节,分别为:

  1. 8字节的commitlog物理偏移量
  2. 4字节的消息长度
  3. 8字节tag hashcode
    单个文件由30W个条目组成,可以像数组一样随机访问
    每一个条目每个ConsumeQueue文件大小约5.72M

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

  1. Index文件的存储位置是: $HOME/store/index/${fileName}
  2. 文件名fileName是以创建时的时间戳命名的
  3. 固定的单个IndexFile文件大小约为400M
  4. 一个IndexFile可以保存 2000W个索引
  5. IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引

image.png

4. 过滤消息

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时 再做消息过滤的。

RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。

其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
image.png

主要支持如下2种的过滤方式
(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。

  1. Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
  2. Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
  3. Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。
  4. 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

(2) SQL92的过滤方式:
仅对push的消费者起作用。
Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。
SQL92的表达式上下文为消息的属性。

image.png
conf/broker.conf
image.png
image.png
首先需要开启支持SQL92的特性,然后重启broker:
mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf

RocketMQ仅定义了几种基本的语法,用户可以扩展:

  1. 数字比较: >, >=, <, <=, BETWEEN, =
  2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
  3. 逻辑比较: AND, OR, NOT;
  4. Constant types are: 数字如:123, 3.1415; 字符串如:’abc’,必须是单引号引起来 NULL,特殊常量 布尔型如:TRUE or FALSE;image.png
    image.png
    image.png
package org.example;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * @author :xjlonly
 * @date :Created in 2021/6/11 15:25
 * @description:
 * @modified By:
 * @version: $
 */
public class MyProducer06 {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        //实例化生产者的同时设置生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_06");
        //设置实例名称, 一个JVM中如果有多个生产者,可以通过实例名称区分
        //默认DEFAULT
        producer.setInstanceName("producer_grp_06");
        //设置同步发送重试次数
        producer.setRetryTimesWhenSendFailed(2);
        //指定nameserver
        producer.setNamesrvAddr("106.75.226.220:9876");

        //对生产者进行初始化
        producer.start();
        Message message = null;
        for(int i = 0; i < 100; i++){
            //增加用户自定义属性 支持sql92去查询
            message.putUserProperty("mykey", "v" + i % 3);
            message = new Message("tp_demo_06", "tag-" + (i%3), ("hello rocket" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

              producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult.getSendStatus());
            }
            //重试次数使用完之后 还失败 则调用此方法
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable.getMessage().toString());
            }
        });
        }

        Thread.sleep(10000);

        producer.shutdown();
    }
}
package org.example;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.awt.image.renderable.ContextualRenderedImageFactory;
import java.util.List;
import java.util.Set;

/**
 * @author :xjlonly
 * @date :Created in 2021/6/11 15:42
 * @description:
 * @modified By:
 * @version: $
 */
public class MyConsumer06 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        //消息推送
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("consumer_grp_06_04");

        pushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 设置nameserver的地址
        pushConsumer.setNamesrvAddr("106.75.226.220:9876");
        //subExpression 标识标签过滤
        //* 标识不过滤
        //pushConsumer.subscribe("tp_demo_06", "*");
        //pushConsumer.subscribe("tp_demo_06", "tag-1||tag-0");
        pushConsumer.subscribe("tp_demo_06", MessageSelector.bySql("mykey in ('v0', 'v1')"));
        pushConsumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                final MessageQueue messageQueue = consumeConcurrentlyContext.getMessageQueue();
                final String brokerName = messageQueue.getBrokerName();
                final String topic = messageQueue.getTopic();
                final int queueId = messageQueue.getQueueId();

                System.out.println("brokerName:" + brokerName + "\t" + topic + "\t" + queueId);

                for(MessageExt msg: list){
                    System.out.println(msg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });


        // 对消费者进行初始化,然后就可以使用了
        pushConsumer.start();

        Thread.sleep(100000);
        pushConsumer.shutdown();
    }
}

(3) Filter Server方式。这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。
要使用Filter Server,首先要在启动Broker前在配置文件里加上 filterServer-Nums=3 这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的 Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。

5. 零拷贝原理

5.1 PageCache

  • 由内存中的物理page组成,其内容对应磁盘上的block。
  • page cache的大小是动态变化的。
  • backing store: cache缓存的存储设备
  • 一个page通常包含多个block, 而block不一定是连续的。

    5.1.1 读Cache

    当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。 如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中) 如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中, 如此后续的读请求就可以命中缓存了。
    page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。

5.1.2 写Cache

当内核发起一个写请求时, 也是直接往cache中写入, 后备存储中的内容不会直接更新。
内核会将被写入的page标记为dirty, 并将其加入到dirty list中。
内核会周期性地将dirty list中的page写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据一致。

5.1.3 cache回收

Page cache的另一个重要工作是释放page, 从而释放内存空间。
cache回收的任务是选择合适的page释放
如果page是dirty的, 需要将page写回到磁盘中再释放。

5.2 cache和buffer的区别

  1. Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了CPU等待的时间,提高了系统的性能。

Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元);Cache一般会用在I/O请求上, 如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此 文件直接从Cache读取,提高系统性能。

  1. Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer 可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此期间存储快的设备CPU可以干其他的事情。
    Buffer:一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入之前已经读入的字段会先放到buffer中。

5.3 HeapByteBuffer和DirectByteBuffer

HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity等)。
DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维护了一个引用address指向数据,进而操作数据。
HeapByteBuffer优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收
DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时,不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用 DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝) 外设之所以要把jvm堆里的数据copy出来再操作,不是因为操作系统不能直接操作jvm内存,而是因为jvm在进行gc(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的情况。

image.png
所有的通过allocate方法创建的buffer都是HeapByteBuffer.
image.png
堆外内存实现零拷贝

  1. 前者分配在JVM堆上(ByteBuffer.allocate()),后者分配在操作系统物理内存上 (ByteBuffer.allocateDirect(),JVM使用C库中的malloc()方法分配堆外内存);

  2. DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,fullgc发生时也会回收直接内存,也可以通过system.gc主动通知JVM回收,或者通过 cleaner.clean主动清理。 Cleaner.create()方法需要传入一个DirectByteBuffer对象和一个Deallocator(一个堆外内存回收线程)。GC发生时发现堆中的DirectByteBuffer对象没有强引用了,则调用Deallocator的run()方法回收直接内存,并释放堆中DirectByteBuffer的对象引用;

  3. 底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时需要将HeapByteBuffer数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外拷贝。而DirectByteBuffer则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在 netty中广泛使用;

  4. MappedByteBuffer底层使用了操作系统的mmap机制,FileChannel#map()方法就会返回 MappedByteBuffer。DirectByteBuffer虽然实现了MappedByteBuffer,不过 DirectByteBuffer默认并没有直接使用mmap机制。

5.4 缓冲IO和直接IO

5.4.1 缓存IO

缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。
读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。

缓存I/O的优点:

  1. 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;
  2. 可以减少读盘的次数,从而提高性能。

缓存I/O的缺点:

  1. 在缓存 I/O 机制中,DMA 方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程
    中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

5.4.2 直接IO

直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存
机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种更加有效的缓存机制来提高数据库中数据的存取性能。

直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。

下图分析了写场景下的DirectIO和BufferIO:
image.png

5.5 内存映射文件(Mmap)

在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。
image.png
映射关系可以分为两种

  1. 文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。
  2. 匿名映射 初始化全为0的内存空间。
    而对于映射关系是否共享又分为
  3. 私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-on-write(写时复制)的映射方式。
  4. 共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。
    因此总结起来有4种组合
  5. 私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改不会共享,也不会反应到物理文件中
  6. 私有匿名映射 mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存 (malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间, 这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write。
  7. 共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反应到实际物理文件中,他也是进程间通信(IPC)的一种机制。
  8. 共享匿名映射 这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理内存页,这也就实现了父子进程通信(IPC).

mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。
在mmap之后,并没有在将文件内容加载到物理页上,只上在虚拟内存中分配了地址空间。当进程在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生”缺页”,由内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺页,但也会受操作系统一些调度策略影响,加载的比所需的多。

5.6 直接内存读取并发送文件的过程

image.png

5.7 Mmap读取并发送文件的过程

image.png

5.8 Sendfile零拷贝读取并发送文件的过程

image.png
零拷贝(zero copy)小结

  1. 虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区;
  2. 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
  3. Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、 FileChannel.map()等;
  4. Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性。

6. 同步复制和异步复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
1)同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

2)异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;

3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
/opt/rocket/conf/broker.conf 文件:Broker的配置文件
image.png
image.png
4)总结
image.png
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

7. 高可用机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。

Master和Slave的区别:

  1. 在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master, 主从的brokerName是一样的
  2. 大于0表明这个Broker是Slave,
  3. brokerRole参数也说明这个Broker是Master还是Slave。
    (SYNC_MASTER/ASYNC_MASTER/SALVE)

  4. Master角色的Broker支持读和写,Slave角色的Broker仅支持读。

  5. Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
    image.png

7.1 消息消费高可用

 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被**自动切换**到从Slave 读。
 有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。
 这就达到了消费端的高可用性。

7.2 消息发送高可用

如何达到发送端的高可用性呢?
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样既可以在性能方面具有扩展性,也可以降低主节点故障对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息的。

RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master。

  1. 手动停止Slave角色的Broker。
  2. 更改配置文件。
  3. 用新的配置文件启动Broker。
    image.png
    这种早期方式在大多数场景下都可以很好的工作,但也面临一些问题。
    比如,在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。

在这种复制模式下,严格顺序和高可用只能选择一个。

RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式。
RocketMQ 引入 Dledger,使用新的复制方式,可以很好地解决这个问题。
Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。

举例:
假如有3个节点,当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
Dledger在选举时,总会把数据和主节点一样的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。

存在问题:
当然,Dledger的复制方式也不是完美的,依然存在一些不足:

  1. 比如,选举过程中不能提供服务。
  2. 最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可用,如果 2个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低。
  3. 另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快。

8. 刷盘机制

RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

8.1 同步刷盘

image.png
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:
(1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。
(2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
(3). 前端等待线程向用户返回成功

8.2 异步刷盘

image.png
在有 RAID 卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆 网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?

  1. 由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。
  2. 万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况, 会不会导致系统内存溢出,答案是否定的,原因如下:
    写入消息到 PageCache时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略是LRU 方式。
    如果干净页不足,此时写入 PageCache会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32个 PAGE , 来找出更多干净 PAGE。
    综上,内存溢出的情况不会出现。

9. 负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

9.1 Producer的负载均衡

image.png
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。

# 创建主题 
[root@node1 ~]# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b localhost:10911
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");

producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = new Message(); 
message.setTopic("tp_demo_02");
message.setBody("hello lagou".getBytes()); 
// 指定MQ messageQueue 主题 broker名称  队列编号
SendResult result = producer.send(message, new MessageQueue("tp_demo_02", "node1", 5), 1_000 );
System.out.println(result.getSendStatus());
producer.shutdown();

9.2 Consumer的负载均衡

image.png
如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。

在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer。

知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。

DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)

public class MyConsumer { 
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_pull_grp_01");
        consumer.setNamesrvAddr("node1:9876");
        consumer.start();
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("tp_demo_01");
        for (MessageQueue messageQueue : messageQueues) {
            // 指定从哪个MQ拉取数据 
            PullResult result = consumer.pull(messageQueue, "*", 0L, 10);
            List<MessageExt> msgFoundList = result.getMsgFoundList();
            for (MessageExt messageExt : msgFoundList) {
                System.out.println(messageExt); 
            }
        }
        
        consumer.shutdown();
    }
}

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中

   如下图所示,具体的负载均衡算法有几种,**默认用的是AllocateMessageQueueAveragely**。

image.png
我们可以设置负载均衡的算法:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_push_grp_01"); 

consumer.setNamesrvAddr("node1:9876");

// 设置负载均衡算法 
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() { 
    @Override 
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // todo 处理接收到的消息
        return null; 
    }
});

consumer.start();

以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。

可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。

1、Consumer端的心跳包发送
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
2、Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中启动MQClientInstance实例的部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。

image.png
image.png
image.png
image.png
通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。
image.png
image.png
image.png
这里,rebalanceByTopic()方法根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。
对于集群模式:
image.png
image.png
image.png

默认的负载均衡算法:
image.png
image.png
image.png
image.png
image.png
AllocateMessageQueueAveragely是默认的MQ分配对象。
算法:
image.png
image.png
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。通过此限制防止重复消费。

10. 消息消费重试

10.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒, 前一条消息不成功后一条消息不会消费),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_04_01");
consumer.setNamesrvAddr("node1:9876");
//一次获取一条消息 即每次每个messagequeue取一条
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);

// 消息订阅 
consumer.subscribe("tp_demo_04", "*");
// 并发消费  不保证顺序
// consumer.setMessageListener(new MessageListenerConcurrently() { 
// @Override 
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
// return null; 
// } 
// }); 

// 顺序消费 
consumer.setMessageListener(new MessageListenerOrderly() { 
    @Override 
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { 
        for (MessageExt msg : msgs) { 
            System.out.println(msg.getMsgId() + "\t" + msg.getQueueId() + "\t" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS; //确认消息
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//引发重试
        return null; //引发重试
    }
}); 

consumer.start();

10.2 无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
image.png
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

2)配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
  • 返回 Null
  • 抛出异常

    public class MyConcurrentlyMessageListener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
            //处理消息
            doConsumeMessage(msgs); 
            
            //方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试 
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            
            //方式2:返回 null,消息将重试 
            return null; 
            
            //方式3:直接抛出异常, 消息将重试
            throw new RuntimeException("Consumer Message exceotion"); 
        }
    }

    消费失败后,不重试配置方式
    集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,此后这条消息将不会再重试。

    public class MyConcurrentlyMessageListener implements MessageListenerConcurrently {
        @Override 
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
            try {
                doConsumeMessage(msgs);
            } catch (Throwable e) { 
                //捕获消费逻辑中的所有异常,并返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
            //消息处理正常,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
        }
    }

    自定义消息最大重试次数
    消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。

  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_04_01");
    // 设置重新消费的次数 
    // 共16个级别,大于16的一律按照2小时重试 
    consumer.setMaxReconsumeTimes(20);
    注意:
    消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效
    如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
    配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:

public class MyConcurrentlyMessageListener implements MessageListenerConcurrently { 
    @Override 
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
        for (MessageExt msg : msgs) { 
            System.out.println(msg.getReconsumeTimes());
        }
        
        doConsumeMessage(msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
    }
}

11. 死信队列

RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:

  • %RETRY%消费组名称(重试Topic)
  • %DLQ%消费组名称(死信Topic)
  • 死信队列也可以被订阅和消费,并且也会过期

可视化工具:rocketmq-console下载地址:
https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

使用jdk8:

# 编译打包 
mvn clean package -DskipTests 

# 运行工具
java -jar target/rocketmq-console-ng-1.0.0.jar

页面设置NameSrv地址即可。如果不生效,就直接修改项目的application.properties中的namesrv地址选项的值。
image.png

11.1 死信特性

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的3天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

11.2 查看死信信息

1.在控制台查询出现死信队列的主题信息
image.png
2.在消息界面根据主题查询死信消息
image.png
3.选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。