1. 延迟消息

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

查看SCHEDULE_TOPIC_XXXX主题信息:
image.png
生产者:

public class MyProducer { 
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_06_01");
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null; 
        for (int i = 0; i < 20; i++) { 
            // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            message = new Message("tp_demo_06", ("hello lagou - " + i).getBytes());
            // 设置延迟级别,0表示不延迟,大于18的总是延迟2h
            message.setDelayTimeLevel(i); 
            producer.send(message);
        }
        
        producer.shutdown(); 
    }
}

消费者:

public class MyConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_06_01");
        consumer.setNamesrvAddr("node1:9876");
        consumer.subscribe("tp_demo_06", "*"); 
        consumer.setMessageListener(new MessageListenerConcurrently() { 
            @Override 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(System.currentTimeMillis() / 1000);
                for (MessageExt msg : msgs) { 
                    System.out.println( msg.getTopic() + "\t" + msg.getQueueId() + "\t" + msg.getMsgId() + "\t" + msg.getDelayTimeLevel() + "\t" + new String(msg.getBody()) ); 
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        consumer.start(); 
    }
}

2. 顺序消息

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息:

  1. 全局顺序消息指某个Topic下的所有消息都要保证顺序;
  2. 部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可。
    在多数的业务场景中实际上只需要局部有序就可以了。

RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的。

要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
image.png
原理如上图所示:
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题。
Consumer使用MessageListenerOrderly的时候,下面四个Consumer的设置依旧可以使用:

  1. setConsumeThreadMin
  2. setConsumeThreadMax
  3. setPullBatchSize
  4. setConsumeMessageBatchMaxSize。

    前两个参数设置Consumer的线程数;
    PullBatchSize指的是一次从Broker的一个Message Queue获取消息的最大数量,默认值是32;
    ConsumeMessageBatchMaxSize指的是这个Consumer的Executor(也就是调用MessageListener处理的地方)一次传入的消息数(Listmsgs这个链表的最大长度),默认值是1。

上述四个参数可以使用,说明MessageListenerOrderly并不是简单地禁止并发处理。在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。

部分有序:
顺序消息的生产和消费:

# 创建主题,88[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 8 -t tp_demo_07 -w 8

# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n localhost:9876 -t tp_demo_07 

# 主题描述 
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07

OrderProducer.java
package com.lagou.rocket.demo.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; 
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List; 

public class OrderProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_07_01");
        producer.setNamesrvAddr("node1:9876"); 
        producer.start();
        Message message = null;
        List<MessageQueue> queues = producer.fetchPublishMessageQueues("tp_demo_07"); 
        System.err.println(queues.size()); 
        MessageQueue queue = null; 
        for (int i = 0; i < 100; i++) { 
            queue = queues.get(i % 8);
            message = new Message("tp_demo_07", ("hello lagou - order create" + i).getBytes()); 
            producer.send(message, queue);
            
            message = new Message("tp_demo_07", ("hello lagou - order payed" + i).getBytes()); 
            producer.send(message, queue); 
            
            message = new Message("tp_demo_07", ("hello lagou - order ship" + i).getBytes()); 
            producer.send(message, queue); 
        }
        
        producer.shutdown(); 
    } 
}

OrderConsumer.java:
package com.lagou.rocket.demo.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException; 
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException; 
import java.util.List; 
import java.util.Set; 
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { 
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_grp_07_01"); 
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.start();
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("tp_demo_07");
        System.err.println(messageQueues.size()); 
        for (MessageQueue messageQueue : messageQueues) {
            long nextBeginOffset = 0; 
            System.out.println("==============================="); 
            do {
                PullResult pullResult = consumer.pull(messageQueue, "*", nextBeginOffset, 1);
                if (pullResult == null || pullResult.getMsgFoundList() == null) 
                    break; 
                
                nextBeginOffset = pullResult.getNextBeginOffset();
                List<MessageExt> msgFoundList = pullResult.getMsgFoundList(); 
                System.out.println(messageQueue.getQueueId() + "\t" + msgFoundList.size());
                for (MessageExt messageExt : msgFoundList) { 
                    System.out.println( messageExt.getTopic() + "\t" + messageExt.getQueueId() + "\t" + messageExt.getMsgId() + "\t" + new String(messageExt.getBody()) ); 
                }
            }
            
            while (true); 
        }
        
        consumer.shutdown();
    } 
}

全局有序:
顺序消息的生产和消费:
# 创建主题,88[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t tp_demo_07_01 -w 1 

# 删除主题的操作: 
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n localhost:9876 -t tp_demo_07_01 

# 主题描述 
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07_01

GlobalOrderProduer.java
import org.apache.rocketmq.common.message.Message; 
import org.apache.rocketmq.remoting.exception.RemotingException;
public class GlobalOrderProducer { 
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_07_02"); 
        producer.setNamesrvAddr("node1:9876");
        producer.start(); 
        Message message = null;
        for (int i = 0; i < 100; i++) {
            message = new Message("tp_demo_07_01", ("hello lagou" + i).getBytes());
            producer.send(message); 
        }
        
        producer.shutdown(); 
    }

}

GlobalOrderConsumer.java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt; 
import java.util.List;


public class GlobalOrderConsumer { 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_07_03"); 
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.subscribe("tp_demo_07_01", "*");
        consumer.setConsumeThreadMin(1); 
        consumer.setConsumeThreadMax(1);
        consumer.setPullBatchSize(1); 
        consumer.setConsumeMessageBatchMaxSize(1); 
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { 
                for (MessageExt msg : msgs) { 
                    System.out.println(new String(msg.getBody()));
                }
                
                return ConsumeOrderlyStatus.SUCCESS; 
            } 
        });
        
        consumer.start(); 
    }
}

3. 事务消息

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具
体流程如下:
1)发送方向RocketMQ发送“待确认”消息。
2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
3)发送方开始执行本地事件逻辑。
4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
image.png
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。

客户端有三个类来支持用户实现事务消息,第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE状态。第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者或者LocalTransactionState.COMMIT_MESSAGE

image.png

3.1 RocketMQ事务消息流程概要

 上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3.2 RocketMQ事务消息设计

1.事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段操作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

image.png
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC

2.Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

3.Op消息的存储和对应关系
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

4.Half消息的索引构建
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

5.如何处理二阶段失败的消息?
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
image.png
事务消息:
TxProducer.java

package com.lagou.rocket.demo.producer; 

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState; 
import org.apache.rocketmq.client.producer.TransactionListener; 
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;


public class TxProducer {
    public static void main(String[] args) throws MQClientException { 
        TransactionListener listener = new TransactionListener() {
            @Override 
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
                System.out.println("执行本地事务,参数为:" + arg); 
                try {
               		Thread.sleep(100000); 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
                // return LocalTransactionState.ROLLBACK_MESSAGE;
                return LocalTransactionState.COMMIT_MESSAGE; 
            }
            
            @Override 
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产 者回查生产者本地事务的状态
                // 该方法用于获取本地事务执行的状态。 
                System.out.println("检查本地事务的状态:" + msg);
                return LocalTransactionState.COMMIT_MESSAGE; 
                // return LocalTransactionState.ROLLBACK_MESSAGE; 
            }
        };
        
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_08"); 
        producer.setTransactionListener(listener); 
        producer.setNamesrvAddr("node1:9876"); 
        producer.start();
        
        Message message = null; 
        message = new Message("tp_demo_08", "hello lagou-tx".getBytes());
        producer.sendMessageInTransaction(message, " {\"name\":\"zhangsan\"}");
    }

}

TxConsumer.java

package com.lagou.rocket.demo.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; 
import java.util.List; 

public class TxConsumer { 
    public static void main(String[] args) throws MQClientException { 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_08_01");
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.subscribe("tp_demo_08", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() { 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
                for (MessageExt msg : msgs) { 
                    System.out.println(new String(msg.getBody())); 
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        consumer.start(); 
    }
}

4. 消息查询

区别于消息消费:先尝后买
尝就是消息查询
买:消息的消费
RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。

4.1 按照MessageId查询消息

image.png
MsgId 总共 16 字节,包含消息存储主机地址(ip/port),消息 Commit Log offset。从 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,然后按照存储格式所在位置将消息 buffer 解析成一个完整的消息。

在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor,使用请求中的 commitLog offset和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

4.2 按照Message Key查询消息

“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:
image.png
1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目, 例如图中所示slotNum=5000000)。
2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)。
3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
4.Hash 冲突;
第一种,key 的 hash 值不同但模数相同,此时查询的时候会再比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。
第二种,hash 值相等但 key 不等, 出于性能的考虑冲突的检测放到客户端处理(key 的原始值是
存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。

5.存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中), 整个索引文件是定长的,结构也是固定的。
API的使用:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 
import org.apache.rocketmq.client.exception.MQBrokerException; 
import org.apache.rocketmq.client.exception.MQClientException; 
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException; 

public class QueryingMessageDemo { 
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_grp_09_01");
        consumer.setNamesrvAddr("node1:9876");
        consumer.start();
        MessageExt message = consumer.viewMessage("tp_demo_08", "0A4E00A7178878308DB150A780BB0000"); 
        
        System.out.println(message); 
        System.out.println(message.getMsgId()); 
        consumer.shutdown(); 
    } 

}

5. 消息优先级

有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。

第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求通过 Producer 写入RocketMQ;订单处理程序通过Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处理,显然很不公平 。

这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成1。

第三种
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第一优先级,要确保只要有TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第三优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自
主控制 MessageQueue 的遍历,以及消息的读取;如果上述三类消息在三个 Topic下,需要启动三个Consumer, 实现逻辑控制三个 Consumer 的消费 。

6. 底层网络通信 - Netty高性能之道

RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子,RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。

  1. 自定义ByteBuf可以从底层解决ByteBuffer的一些问题,并且通过“内存池”的设计来提升性能
  2. Reactor主从多线程模型
  3. 充分利用了零拷贝,CAS/volatite高效并发编程特性
  4. 无锁串行化设计
  5. 管道责任链的编程模型
  6. 高性能序列化框架的支持
  7. 灵活配置TCP协议参数

RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

RocketMQ中惯用的套路:
请求报文和响应都使用RemotingCommand,然后在Processor处理器中根据RequestCode请求码来匹配对应的处理方法。
处理器通常继承至NettyRequestProcessor,使用前需要先注册才行,注册方式remotingServer.registerDefaultProcessor

网络通信核心的东西无非是:
线程模型
私有协议定义
编解码器
序列化/反序列化

既然是基于Netty的网络通信,当然少不了一堆自定义实现的Handler,例如继承至:SimpleChannelInboundHandler ChannelDuplexHandler

6.1 Remoting通信类结构

image.png

6.2 协议设计与编解码

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
image.png
image.png
可见传输内容主要可以分为以下4部分:
(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容;

6.3 消息的通信方式和流程

在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。
image.png

6.4 Reactor主从多线程模型

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
image.png
上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。
一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。
RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。
拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。
处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。
从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
image.png

7. 限流

RocketMQ消费端中我们可以:

  1. 设置最大消费线程数
  2. 每次拉取消息条数等
    同时:
  3. PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,
  4. 任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。

在 Apache RocketMQ 中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:
image.png
上图中红色的部分代表超出消息处理能力的部分。我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息。

7.1 Sentinel 介绍

Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制产品,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。

7.2 Sentinel原理

Sentinel 专门为这种场景提供了匀速器的特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间为 5 s,预计排队时长超过 5s 的处理任务将会直接被拒绝
示意图如下图所示:
image.png
RocketMQ 用户可以根据不同的 group 和不同的 topic 分别设置限流规则,限流控制模式设置为匀速器模式(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER),比如:

private void initFlowControlRule() { 
    FlowRule rule = new FlowRule();
    rule.setResource(KEY); 
    // 对应的 key 为 groupName:topicName 
    rule.setCount(5);
    
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS); 
    rule.setLimitApp("default");
    
    // 匀速器模式下,设置了 QPS 为 5,则请求每 200 ms 允许通过 1 个 
    rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
    
    // 如果更多的请求到达,这些请求会被置于虚拟的等待队列中。等待队列有一个 max timeout, 如果请求预计的等待时间超过这个时间会直接被 block 
    // 在这里,timeout 为 5s 
    rule.setMaxQueueingTimeMs(5 * 1000);
    FlowRuleManager.loadRules(Collections.singletonList(rule));
}

参考:
https://github.com/alibaba/Sentinel/wiki/Sentinel-%E4%B8%BA-RocketMQ-%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AA

import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType; 
import com.alibaba.csp.sentinel.SphU; 
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; 
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; 
import org.apache.rocketmq.client.consumer.PullResult; 
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; 
import org.apache.rocketmq.common.message.MessageQueue; 
import java.util.Collections;
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; 


public class PullDemo {
    
    private static final String GROUP_NAME = "consumer_grp_13_05";
    private static final String TOPIC_NAME = "tp_demo_13"; 
    private static final String KEY = String.format("%s:%s", GROUP_NAME, TOPIC_NAME); 
    //使用map存放每个messagequeue的消息偏移量
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
    
    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ExecutorService pool = Executors.newFixedThreadPool(32);
    private static final AtomicLong SUCCESS_COUNT = new AtomicLong(0);
    private static final AtomicLong FAIL_COUNT = new AtomicLong(0); 
    
    public static void main(String[] args) throws MQClientException {
        // 初始化哨兵的流控 
        initFlowControlRule();
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(GROUP_NAME); 
        consumer.setNamesrvAddr("node1:9876");
        consumer.start(); 
        
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME); 
        for (MessageQueue mq : mqs) { 
            System.out.printf("Consuming messages from the queue: %s%n", mq);
            
            SINGLE_MQ: 
            while (true) { 
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); 
                    if (pullResult.getMsgFoundList() != null) {
                        for (MessageExt msg : pullResult.getMsgFoundList()) { 
                            doSomething(msg); 
                        }
                    }
                    
                    
                    long nextOffset = pullResult.getNextBeginOffset();
                    // 将每个mq对应的偏移量记录在本地HashMap中 
                    putMessageQueueOffset(mq, nextOffset); 
                    consumer.updateConsumeOffset(mq, nextOffset); 
                    
                    switch (pullResult.getPullStatus()) {
                        case NO_NEW_MSG: 
                            break SINGLE_MQ;
                        case FOUND: 
                        case NO_MATCHED_MSG: 
                        case OFFSET_ILLEGAL:
                        default: 
                            break;
                    } 
                } catch (Exception e) { 
                    e.printStackTrace();
                } 
            }
        }
        
        consumer.shutdown(); 
    }
    
    /*** 对每个收到的消息使用一个线程提交任务 
    * @param message 
    */
    private static void doSomething(MessageExt message) { 
        pool.submit(() -> {
            Entry entry = null;
            try {
                ContextUtil.enter(KEY);
                entry = SphU.entry(KEY, EntryType.OUT); 
                // 在这里处理业务逻辑,此处只是打印
                System.out.printf("[%d][%s][Success: %d] Receive New Messages: %s %n", System.currentTimeMillis(), Thread.currentThread().getName(), SUCCESS_COUNT.addAndGet(1), new String(message.getBody())); 
            } catch (BlockException ex) { 
                // Blocked. 
                System.out.println("Blocked: " + FAIL_COUNT.addAndGet(1)); 
            }
            finally { 
                if (entry != null) { 
                    entry.exit(); 
                }
                
                ContextUtil.exit();
            } 
        }); 
    }
    
    
    private static void initFlowControlRule() { 
        FlowRule rule = new FlowRule(); 
        // 消费组名称:主题名称 字符串
        rule.setResource(KEY); 
        
        // 根据QPS进行流控 
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS); 
        
        // 1表示QPS为1,请求间隔1000ms。 
        // 如果是5,则表示每秒5个消息,请求间隔200ms 
        rule.setCount(1); 
        rule.setLimitApp("default"); 
        
        // 调用使用固定间隔。如果qps为1,则请求之间间隔为1s
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); 
        // 如果请求太多,就将这些请求放到等待队列中 
        // 该队列有超时时间。如果等待队列中请求超时,则丢弃 
        // 此处设置超时时间为5s 
        rule.setMaxQueueingTimeMs(5 * 1000); 
        FlowRuleManager.loadRules(Collections.singletonList(rule)); 
    }
    
    // 获取指定MQ的偏移量 
    private static long getMessageQueueOffset(MessageQueue mq) { 
        Long offset = OFFSET_TABLE.get(mq); 
        if (offset != null) { 
            return offset; 
        }
        
        return 0;
    }
    
    // 在本地HashMap中记录偏移量 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset); 
    }
}