RocketMQ源码分析
1. 环境搭建
依赖工具
JDK :1.8+
Maven
IntelliJ IDEA
1.1 源码拉取
从官方仓库 https://github.com/apache/rocketmq clone 或者 download 源码。
源码目录结构:
- broker: broker 模块(broke 启动进程)
- client :消息客户端,包含消息生产者、消息消费者相关类
- common :公共包
- dev :开发者信息(非源代码)
- distribution :部署实例文件夹(非源代码)
- example: RocketMQ 例代码
- filter :消息过滤相关基础类
- filtersrv:消息过滤服务器实现相关类(Filter启动进程)
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessageing:消息开放标准
- remoting:远程通信模块,基于Netty
- srcutil:服务工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
1.2 导入IDEA
执行安装
clean install -Dmaven.test.skip=true
1.3 调试
创建 conf 配置文件夹,从 distribution 拷贝 broker.conf 和 logback_broker.xml 和 logback_namesrv.xml
1.4 启动NameServer
配置NameServer的启动环境:
NameServer启动的类就是这里的 NamesrvStartup 。
添加NameServer的启动环境:
启动NameServer:
控制台打印结果
The Name Server boot success. serializeType=JSON
1.5启动Broker
在项目根目录下创建数据文件夹 dataDir
从rocketmq-distribution模块的conf目录拷贝三个文件:
broker.conf文件内容修改:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
# 启用自动创建主题
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir
# commitLog路径
storePathCommitLog=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\workspace\\source-code\\rocketmq-rocketmq-all-4.5.1\\dataDir\\abort
启动 BrokerStartup ,配置 broker.conf 和 ROCKETMQ_HOME
现在下图中已经存在NameServer或Broker的启动环境了,如果没有,可以点击左上角的”➕”,选择创建Application环境,在Name一栏修改名字。
1.6发送消息
进入example模块的 org.apache.rocketmq.example.quickstart
指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876"); 12
运行 main 方法,发送消息
1.7消费消息
进入example模块的 org.apache.rocketmq.example.quickstart
指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
运行 main 方法,消费消息
2. NameServer
2.1 架构设计
消息中间件的设计思路一般是基于主题订阅发布的机制。
生产者(Producer)发送消息到消息服务器的某个主题,消息服务器负责将消息持久化存储。
消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送 到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。 为了避免消息服务器的单点故障,部署多台消息服务器共同承担消息的存储。
生产者如何知道消息要发送到哪台消息服务器呢?
如果某一台消息服务器宕机了,生产者如何在不重启服务情况下感知呢?
NameServer就是为了解决以上问题设计的。
Broker消息服务器在启动时向所有NameServer注册,生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。 NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。
但是路由变化不会马上通知生产者。这样设计的目的是为了降低NameServer实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这 也是NameServer设计的一个亮点,总之,RocketMQ设计追求简单高效。
2.2 启动流程
启动类: org.apache.rocketmq.namesrv.NamesrvStartup
*
步骤一
解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController
代码:NamesrvController#createNamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
//rocketmq.remoting.version 4.5.1 指定当前版本 生产者和消费者客户端都需要与服务端版本一致
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//创建NamesrvConfig 与NameServer相关的配置参数对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//与netty相关的配置参数封装对象
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//指定Netty服务器监听的端口为9876
nettyServerConfig.setListenPort(9876);
//如果命令行参数中有c 则表示指定了nameserver的配置文件
//需要解析加载其中的配置项
if (commandLine.hasOption('c')) {
//文件路径 类似于 mqnamesrv -c /opt/rocketmq/conf/namesrv.conf
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
//将配置文件的配置信息封装到properties中
properties.load(in);
//将properties对象中包含的nameserver有关的配置封装到NamesrvConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// -p 表示打印配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
//将命令行中的参数封装到namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//如果找不到ROCKETMQ_HOME环境变量。则退出
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard 缓存所有的配置信息到properties 防止controller对象销毁 配置信息没有了
controller.getConfiguration().registerConfig(properties);
return controller;
}
NamesrvConfig属性
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
rocketmqHome:rocketmq主目录
kvConfig:NameServer存储KV配置属性的持久化路径
configStorePath:nameServer默认配置文件路径
orderMessageEnable:是否支持顺序消息
NettyServerConfig属性
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
private boolean useEpollNativeSelector = false;
listenPort:NameServer监听端口,该值默认会被初始化为9876
serverWorkerThreads: Netty业务线程池线程个数
serverCallbackExecutorThreads:Netty public任务线程池线程个数, Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行。 serverSelectorThreads:IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
serverOnewaySemaphoreValue:send oneway消息请求并发读(Broker端参数);
serverAsyncSemaphoreValue:异步消息发送最大并发度;
serverChannelMaxIdleTimeSeconds :网络连接最大的空闲时间,默认120s。
serverSocketSndBufSize:网络socket发送缓冲区大小。
serverSocketRcvBufSize: 网络接收端缓存区大小。
serverPooledByteBufAllocatorEnable: ByteBuffer是否开启缓存;
useEpollNativeSelector:是否启用Epoll IO模型。
步骤二
根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为 NameServer核心控制器
代码:NamesrvController#initialize
public boolean initialize() {
//加载配置参数
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//创建固定线程池 用于处理用户请求
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//扫描不活跃的broker,从元数据删除
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
//每10秒执行一次
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//每10秒打印所有的k-v配置信息
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
步骤三
在JVM进程关闭之前,先将线程池关闭,及时释放资源
代码:NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
//首先初始化controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//添加关闭的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
//释放资源 关闭nettyserver 线程池等。。
controller.shutdown();
return null;
}
}));
//启动controller
controller.start();
return controller;
}
2.3 路由管理
NameServer的主要作用是为消息的生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要管理Broker节点,包括路由注册、路由删除等。
2.3.1 路由元信息
代码:RouteInfoManager
//对应于每个主题的mq列表
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//对应每个broker的位置
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//跟集群相关的地址列表
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
clusterAddrTable:Broker集群信息,存储集群中所有Broker名称
brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息
filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。
RocketMQ基于订阅发布机制,一个Topic拥有多个消息队列,一个Broker为默认每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,
集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
rocketmq中的读写队列
在创建或更改topic时,需要配置writeQueueNums和readQueueNums数,这里的读写队列有什么作用?
初识rocketmq的童鞋,很容易把读写队列和读写分离混淆在一起。其实在rocketmq里是完全不同的两个概念。读写分离,是用HA机制,将一个节点的数据同步到另外一个节点,
主节点多用于写(也可读),从节点只用于读。往往一主多从,通过读写分离减轻系统压力。
读写队列,则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,
只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,
路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的信息压根就不会被消费。
反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,
当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。
由此可见,只有readQueueNums>=writeQueueNums,程序才能正常进行。最佳实践是readQueueNums=writeQueueNums。
那rocketmq为什么要区分读写队列呢?直接强制readQueueNums=writeQueueNums,不就没有问题了吗?
rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。思考一个问题,一个topic在每个broker上创建了128个队列,现在需要将队列缩容到64个,
怎么做才能100%不会丢失消息,并且无需重启应用程序?
最佳实践:
先缩容写队列128->64,写队列由0 1 2 ......127缩至 0 1 2 ........63。等到64 65 66......127中的消息全部消费完后,
再缩容读队列128->64.(同时缩容写队列和读队列可能会导致部分消息未被消费)
2.3.2 路由注册
1)发送心跳包
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdataTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。
代码:BrokerController#start
//想NameSrv注册broker信息
this.registerBrokerAll(true, false, true);
//每隔30s上报Broker信息到NameServer
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
代码:BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
//获得nameServer地址信息
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//封装请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
//消息是否压缩
requestHeader.setCompressed(compressed);
//封装请求体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
//遍历所有nameserver列表
for (final String namesrvAddr : nameServerAddressList) {
//使用线程池并行注册
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//分别向NameServer注册
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
代码:BrokerOutAPI#registerBroker
if (oneway) {
try {
//通过nettyClient发送单向消息 进行注册 不用等待响应
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//如果不是单向请求 则同步等待nameserver的响应
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
2)处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网路处理类解析请求类型,如果请求类型是为REGISTER_BROKER,则将请求转发到 RouteInfoManager#regiesterBroker
代码:DefaultRequestProcessor#processRequest
//判断是注册broker
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
//注册broker
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
代码:DefaultRequestProcessor#registerBroker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
代码:RouteInfoManager#registerBroker
维护路由信息
//都是hashmap 需要加锁
this.lock.writeLock().lockInterruptibly();
//维护clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
//第一次注册
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
//都是hashmap 需要加锁
this.lock.writeLock().lockInterruptibly();
//维护clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
//第一次注册
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//维护brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
//第一次注册,则创建brokerData
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//非第一次注册,更新Broker
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
//新加的broker的与本地的同名broker的id不同
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
//是否是第一次注册
registerFirst = registerFirst || (null == oldAddr);
//维护topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
//更新broker和主题的配置关系
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
代码:RouteInfoManager#createAndUpdateQueueData
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
//创建queueData
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
//获得topicQueueTable中队列集合
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
//主题没有注册过queue topicQueueTable为空,则直接添加queueData到队列集合
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
//判断是否是新的队列
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
////如果brokerName相同,代表不是新的队列 判断队列数据是否相同
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
//如果是新的队列,则添加队列到queueDataList
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
代码:RouteInfoManager#registerBroker
//维护brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
//维护filterServerList
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
2.3.3 路由删除
Broker 每隔30s向 NameServer 发送一个心跳包,心跳包包含 BrokerId , Broker 地址, Broker 名称, Broker 所属集群名称、 Broker 关联的 FilterServer 列表。但是如果 Broker 宕机, NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔10s扫描 brokerLiveTable 状态表,如果 BrokerLive 的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为 Broker 失效,移除该 Broker ,关闭与 Broker 连接,同时更新topicQueueTable 、 brokerAddrTable 、 brokerLiveTable 、 filterServerTable 。
RocketMQ有两个触发点来删除路由信息:
- NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
- Broker在正常关闭的情况下,会执行unregisterBroker指令
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。
代码:NamesrvController#initialize
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//扫描不活跃的broker,从元数据删除
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
//每10秒执行一次
}, 5, 10, TimeUnit.SECONDS);
代码:RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
//获得brokerLiveTable
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
//遍历brokerLiveTable
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
//如果收到心跳包的时间距当时时间是否超过120s
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
//关闭连接
RemotingUtil.closeChannel(next.getValue().getChannel());
//移除broker
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
//维护路由表
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
代码:RouteInfoManager#onChannelDestroy
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
this.lock.readLock().lockInterruptibly();
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
//申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
//维护brokerAddrTable
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
//遍历brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
//遍历broker地址
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
//根据broker地址移除brokerAddr
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
//如果当前主题只包含待移除的broker,则移除该topic
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
//维护clusterAddrTable
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
//遍历clusterAddrTable
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
//获得集群名称
String clusterName = entry.getKey();
//获得集群中brokerName集合
Set<String> brokerNames = entry.getValue();
//从brokerNames中移除brokerNameFound
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
//如果集群中不包含任何broker,则移除该集群
it.remove();
}
break;
}
}
}
//维护topicQueueTable队列
if (removeBrokerName) {
//遍历topicQueueTable
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
//主题名称
String topic = entry.getKey();
//队列集合
List<QueueData> queueDataList = entry.getValue();
//遍历该主题队列
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
//从队列中移除为活跃broker信息
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
//如果该topic的队列为空,则移除该topic
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
//释放写锁
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
2.3.4 路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端, 而是由客户端定时拉取主题最新的路由。
代码:DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、
// filterServerTable中分别填充TopicRouteData的List<QueueData>、List<BrokerData>、 filterServer
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
//如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取 关于顺序消息相关的配置填充路由信息
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
代码:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
//通过主题 获取所有的messagequeue
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
//遍历MessageQueue列表
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
//拿到每个messagequeue对应的brokername
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
//克隆一份brokerData
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
//遍历获取broker地址
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
2.4 小结
3. Producer
消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。
3.1 方法和属性
3.1.1主要方法介绍
public interface MQAdmin {
/**
* 创建主题
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
*/
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
/**
* Creates an topic
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @param topicSysFlag topic system flag
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
/**
* 根据时间戳从队列中查找消息偏移量
*
* @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return offset
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
/**
*查找消息队列中最大的偏移量
*
* @param mq Instance of MessageQueue
* @return the max offset
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
/**
* 查找消息队列中最小的偏移量
*
* @param mq Instance of MessageQueue
* @return the minimum offset
*/
long minOffset(final MessageQueue mq) throws MQClientException;
/**
* Gets the earliest stored message time
*
* @param mq Instance of MessageQueue
* @return the time in microseconds
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
/**
* 根据偏移量查找消息
*
* @param offsetMsgId message id
* @return message
*/
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
/**
* 根据条件查找消息
*
* @param topic message topic
* @param key message key index word
* @param maxNum max message number
* @param begin from when
* @param end to when
* @return Instance of QueryResult
*/
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
/**
* @return The {@code MessageExt} of given msgId
* 根据消息ID和主题查找消息
*/
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
3.1.2 属性介绍
producerGroup:生产者所属组
createTopicKey:默认Topic
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M
3.2 启动流程
代码:DefaultMQProducerImpl#start
//刚刚创建的
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//检测配置
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
//将instanceName属性改为pid
this.defaultMQProducer.changeInstanceNameToPID();
}
//获取MQ客户端实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//注册生产者 保证生产者唯一 //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//缓存主题的发布信息 {“TBW102”=> new TopicPublishInfo()}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 启动
mQClientFactory.start();
}
整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
代码:MQClientManager#getAndCreateMQClientInstance
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
//构建客户端ID
String clientId = clientConfig.buildMQClientId();
//通过客户端id获取MQClientInstance对象
MQClientInstance instance = this.factoryTable.get(clientId);
//没有获取到 创建新的
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
//获取旧的MQClientInstance
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
//保证当前客户端只有一个MQClientInstance对象
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
代码:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
//如果原来有对应于该生产组名称的生产组。则注册失败,否则注册成功
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
代码:org.apache.rocketmq.client.impl.factory.MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
//获取NameServer地址,如果为空就获取 一般实例化的时候进行设置 如果没有 设置走此逻辑
if (null == this.clientConfig.getNamesrvAddr()) {
//通过http访问指定服务器 从jmenv.tbsite.net获取nameserver地址
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
3.3 消息发送
代码:DefaultMQProducerImpl#send(Message msg)
/**
* Send message in synchronous mode. This method returns only when the sending procedure totally completes.
*
* </p>
*同步方式发送消息,只有当发送消息过程完全结束,该方法才会返回
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
*该方法包含重试机制,如果发送失败,重试{@link #retryTimesWhenSendAsyncFailed}次
* broker中可能会有重复的消息,需要应用开发者处理该问题
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.
* @throws MQClientException 客户端异常
* @throws RemotingException 网络层异常
* @throws MQBrokerException broker异常
* @throws InterruptedException 发送线程被打断异常
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//指定要发送的消息 指定发送模式:单向oneway , sync, async
//当前方法中,调用该方法发送消息,不需要回调
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
3.3.1 验证消息
代码:Validators#checkMessage
/**
* Validate message
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
//判断是否为空
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
3.3.2 查找路由
代码:DefaultMQProducerImpl#tryToFindTopicPublishInfo
//通过同步访问nameserver获取指定主题的路由信息 缓存到本地
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//首先从本地缓存中获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//如果本地缓存中没有
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//在本地缓存中先占座,一会儿在填充
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//从NameServer获取路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
//查询路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//从NameServer获取路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
代码:TopicPublishInfo
public class TopicPublishInfo {
//是否是顺序主题
private boolean orderTopic = false;
//是否包含主题路由信息
private boolean haveTopicRouterInfo = false;
//MQ集合 空的
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//存放各个线程应该发送消息到哪个MQ 每选择一次消息队列,该值+1
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
//关联Topic路由元信息
private TopicRouteData topicRouteData;
}
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
//使用默认主题从NameServer获取路由信息
if (isDefault && defaultMQProducer != null) {
// TBW102
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
//设置写队列个数和读队列个数
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
////使用指定主题从NameServer获取路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
//判断路由是否需要更改
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
//遍历生产者
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
//生产者不为空时,更新publishInfo信息
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
代码:MQClientInstance#topicRouteData2TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
//创建TopicPublishInfo对象
TopicPublishInfo info = new TopicPublishInfo();
//关联topicRoute
info.setTopicRouteData(route);
//顺序消息,更新TopicPublishInfo
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
//非顺序消息更新TopicPublishInfo
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
//遍历topic队列信息
for (QueueData qd : qds) {
//是否是写队列
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
//遍历写队列Broker
for (BrokerData bd : route.getBrokerDatas()) {
//根据名称获得读队列对应的Broker
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
//封装TopicPublishInfo写队列
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
//返回TopicPublishInfo对象
return info;
}
3.3.3 选择队列
- 默认不启用Broker故障延迟机制
代码:TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//第一次选择队列
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//轮询
int index = this.sendWhichQueue.getAndIncrement();
//遍历消息队列集合
for (int i = 0; i < this.messageQueueList.size(); i++) {
//sendWhichQueue自增后取模
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//规避上次Broker队列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
//如果以上情况都不满足,返回sendWhichQueue取模后的队列
return selectOneMessageQueue();
}
}
代码:TopicPublishInfo#selectOneMessageQueue()
//第一次选择队列
public MessageQueue selectOneMessageQueue() {
//sendWhichQueue自增
int index = this.sendWhichQueue.getAndIncrement();
//对队列大小取模
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
//返回对应的队列
return this.messageQueueList.get(pos);
}
启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //如果重试启动了延迟发送 if (this.sendLatencyFaultEnable) { try { //获取本次应该向哪个MQ发送消息,getAndIncrement表示每次该数字+1 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { //index对写mq个数取模 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //验证队列是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //可用 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } //从规避的Broker中选择一个可用的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); //获得Broker的写队列集合 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { //获得一个队列,指定broker和队列ID并返回 final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
延迟机制接口规范
- FaultItem:失败条目
如果向一个brokerName的broker发送消息失败,等待多长时间重试该broker
消息失败策略
原理分析
代码:DefaultMQProducerImpl#sendDefaultImpl
如果上述发送过程出现异常,则调用 DefaultMQProducerImpl#updateFaultItem
代码:MQFaultStrategy#updateFaultItem
代码:MQFaultStrategy#computeNotAvailableDuration
代码:LatencyFaultToleranceImpl#updateFaultItem
3.3.4 发送消息
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl
代码:DefaultMQProducerImpl#sendKernelImpl
代码:SendMessageHook
3.4 批量消息发送
批量消息发送是将同一个主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息总长度不能超过DefaultMQProducer#maxMessageSize。
批量消息发送要解决的问题是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
代码:DefaultMQProducer#send
代码:DefaultMQProducer#batch
encode()=> org.apache.rocketmq.common.message.MessageDecoder#encodeMessage
批量消息统一编码成一个消息 bytep[]
public static byte[] encodeMessage(Message message) {
//only need flag, body, properties
byte[] body = message.getBody();
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
//note properties length must not more than Short.MAX
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}
4. 消息存储
4.1 消息存储核心类
4.2 消息存储流程
pageCache为CommitLog映射的内存文件
消息存储入口:DefaultMessageStore#putMessage
代码:CommitLog#putMessage
代码:appendMessage=> MappedFile#appendMessagesInner
直接写入内核空间中映射的MappedFile的内存区,由操作系统进行刷盘 零拷贝
代码:CommitLog#doAppend
代码:CommitLog#calMsgLength
代码:CommitLog#doAppend
代码:CommitLog#putMessage
4.3 存储文件
- commitLog:消息存储目录
- config:运行期间一些配置信息
- consumerqueue:消息消费队列存储目录
- index:消息索引文件存储目录
- abort:如果存在改文件寿命Broker非正常关闭
- checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
4.4 存储文件内存映射
MMap
RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是 IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
4.4.1 MappedFileQueue
String storePath; //存储目录
int mappedFileSize; // 单个文件大小
CopyOnWriteArrayList<MappedFile> mappedFiles; //MappedFile文件集合
AllocateMappedFileService allocateMappedFileService; //创建MapFile服务类
long flushedWhere = 0; //当前刷盘指针 将内存数据刷到硬盘
long committedWhere = 0; //当前数据提交指针,内存中ByteBuffer当前的写指针 将数据提交到Mapfile映射的内存,该值大于等于flushWhere
- 根据存储时间查询MappedFile
代码:MappedFileQueue#getMappedFileByTime
- 根据消息偏移量offset查找MappedFile
代码:MappedFileQueue#findMappedFileByOffset
- 获取存储文件最小偏移量
代码:MappedFileQueue#getMinOffset
- 获取存储文件最大偏移量
- 返回存储文件当前写指针
4.4.2 MappedFile
int OS_PAGE_SIZE = 1024 * 4; //操作系统每页大小,默认4K
AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); //当前JVM实例中 MappedFile虚拟内存
AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); //当前JVM实例中 MappedFile对象个数
AtomicInteger wrotePosition = new AtomicInteger(0); //当前文件的写指针
AtomicInteger committedPosition = new AtomicInteger(0); //当前文件的提交指针
AtomicInteger flushedPosition = new AtomicInteger(0); //刷写到磁盘指针
int fileSize; //文件大小
FileChannel fileChannel; //文件通道
ByteBuffer writeBuffer = null; //堆外内存ByteBuffer
TransientStorePool transientStorePool = null; //堆外内存池
String fileName; //文件名称
long fileFromOffset; //该文件的处理偏移量
File file; //物理文件
MappedByteBuffer mappedByteBuffer; //物理文件对应的内存映射Buffer
volatile long storeTimestamp = 0; //文件最后一次内容写入时间
boolean firstCreateInQueue = false; //是否是MappedFileQueue队列中第一个文件
- MappedFile初始化
未开启 transientStorePoolEnable 。 transientStorePoolEnable=true 为 true 表示数据先存储到堆外内存,然后通过 Commit 线程将数据提交到内存映射Buffer中,再通过 Flush线程将内存映射 Buffer 中数据持久化磁盘。
开启 transientStorePoolEnable
public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer(); //初始化 //堆外内存
writeBuffer this.transientStorePool = transientStorePool;
}
- MappedFile提交
提交数据到FileChannel,commitLeastPages为本次提交最小的页数,如果待提交数据不满 commitLeastPages,则不执行本次提交操作。如果writeBuffer如果为空,直接返回writePosition指针,无需执行commit操作,表名commit操作主体是writeBuffer。
MappedFile#isAbleToCommit
判断是否执行commit操作,如果文件已满返回true;如果commitLeastpages大于0,则比较 writePosition与上一次提交的指针commitPosition的差值,除以OS_PAGE_SIZE得到当前脏页的数量, 如果大于commitLeastPages则返回true,如果commitLeastpages小于0表示只要存在脏页就提交。
MappedFile#commit0
具体提交的实现,首先创建WriteBuffer区共享缓存区,然后将新创建的position回退到上一次提交的位置(commitPosition),设置limit为wrotePosition(当前最大有效数据指针),然后把 commitPosition到wrotePosition的数据写入到FileChannel中,然后更新committedPosition指针为 wrotePosition。commit的作用就是将MappedFile的writeBuffer中数据提交到文件通道FileChannel 中。
MappedFile#flush
刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;如果writeBuffer不为空,则 flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer 中的数据;如果writeBuffer为空,数据时直接进入到MappedByteBuffer,wrotePosition代表的是 MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。
MappedFile#getReadPosition
获取当前文件最大可读指针。如果writeBuffer为空,则直接返回当前的写指针;如果writeBuffer 不为空,则返回上一次提交的指针。在MappedFile设置中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据
MappedFile#selectMappedBuffer
查找pos到当前最大可读之间的数据,由于在整个写入期间都未曾改MappedByteBuffer的指针, 如果mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile,然后通过设置 ByteBuffer的position为待查找的值,读取字节长度当前可读最大长度,最终返回的ByteBuffer的limit 为size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作 SelectMappedBufferResult不能对包含在里面的ByteBuffer调用filp方法。
MappedFile#shutdown
MappedFile文件销毁的实现方法为public boolean destory(long intervalForcibly), intervalForcibly表示拒绝被销毁的最大存活时间。
4.4.3 TransientStorePool
短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池(堆外内存),用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
private final int poolSize; //availableBuffers个数
private final int fileSize; //每隔ByteBuffer大小
private final Deque<ByteBuffer> availableBuffers; //ByteBuffer容器。双端队列
初始化
public void init() {
//创建poolSize个堆外内存
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
//使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
4.5 实时更新消息消费队列与索引文件
消息消费队文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在CommitLog文件中,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumerQueue、 IndexFile文件。
代码:DefaultMessageStore:start
//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();
代码:DefaultMessageStore:run
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
代码:DefaultMessageStore:doReput
//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByte Buffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
}
}
}
DispatchRequest
4.5.1 转发到ConsumerQueue
代码:DefaultMessageStore#putMessagePositionInfo
代码:DefaultMessageStore#putMessagePositionInfo
4.5.2 转发到Index
代码:DefaultMessageStore#buildIndex
4.6 消息队列和索引文件恢复
由于RocketMQ存储首先将消息全量存储在CommitLog文件中,然后异步生成转发任务更新ConsumerQueue和Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile文件数据不一 致。如果不加以人工修复的话,会有一部分消息即便在CommitLog中文件中存在,但由于没有转发到 ConsumerQueue,这部分消息将永远无法被消费者消费。
4.6.1 存储文件加载
代码:DefaultMessageStore#load
判断上一次是否异常退出。实现机制是Broker在启动时创建abort文件,在退出时通过JVM钩子函 数删除abort文件。如果下次启动时存在abort文件。说明Broker时异常退出的,CommitLog与 ConsumerQueue数据有可能不一致,需要进行修复。
代码:DefaultMessageStore#load
代码:MappedFileQueue#load
加载CommitLog到映射文件
代码:DefaultMessageStore#loadConsumeQueue
加载消息消费队列
代码:IndexService#load
加载索引文件
代码:DefaultMessageStore#recover
文件恢复,根据Broker是否正常退出执行不同的恢复策略
代码:DefaultMessageStore#recoverTopicQueueTable
恢复ConsumerQueue后,将在CommitLog实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID、还存储了消息队列的关键所在。
4.6.2 正常恢复
代码:CommitLog#recoverNormally
代码:MappedFileQueue#truncateDirtyFiles
4.6.3 异常恢复
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复步骤与正常停 止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复, 而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
代码:CommitLog#recoverAbnormally
4.7 刷盘机制
RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
4.7.1 同步刷盘
消息追加到内存后,立即将数据刷写到磁盘文件
代码:CommitLog#handleDiskFlush
GroupCommitRequest
long nextOffset; //刷盘点偏移量
CountDownLatch countDownLatch = new CountDownLatch(1); //倒计树锁存器
volatile boolean flushOK = false; //刷盘结果;默认为false
代码:GroupCommitService#run
代码:GroupCommitService#doCommit
4.7.2 异步刷盘
在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ 会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确 保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷 写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷 写到磁盘中.
开启transientStorePoolEnable后异步刷盘步骤:
- 将消息直接追加到ByteBuffer(堆外内存)
- CommitRealTimeService线程每隔200ms将ByteBuffer新追加内容提交到 MappedByteBuffer(就是映射的物理文件)中
- MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向后移动
- commit操作成功返回,将committedPosition位置恢复
- FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘
代码:CommitLog$CommitRealTimeService#run
提交线程工作机制
代码:CommitLog$FlushRealTimeService#run
刷盘线程工作机制
4.8 过期文件删除机制
由于RocketMQ操作CommitLog、ConsumerQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumerQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、 ConsumerQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前 的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。
代码:DefaultMessageStore#addScheduleTask
代码:DefaultMessageStore#cleanFilesPeriodically
代码:DefaultMessageStore#deleteExpiredFiles
删除文件操作的条件
- 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认凌晨4点
- 磁盘空间如果不充足,删除过期文件
- 预留,手工触发。
代码:CleanCommitLogService#isSpaceToDelete
当磁盘空间不足时执行删除过期文件
代码:MappedFileQueue#deleteExpiredFileByTime
执行文件销毁和删除
4.9 小结
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、 Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。 RocketMQ组织文件以文件的起始偏移量来命名文件,这样根据偏移量能快速定位到真实的物理文件。 RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。
5. Consumer
5.1 消息消费概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题, 消费组内消费者之间有集群模式和广播模式两种消费模式。
集群模式,主题下的同一条消息只允许被其中一个消费者消费。
广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。
消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。
所谓的拉模式,是消费端主动拉起拉消息请求, 而推模式是消息达到消息服务器后,推送给消息消费者。
RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载呢?
消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。
不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为 1,牺牲高可用性。
5.2 消息消费初探
消息推送模式
消息消费重要方法
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener): 注册顺序消息事件监听器
void subscribe(final String topic, final String subExpression):基于主题订阅消息,消息过滤使用表达式
void subscribe(final String topic, final String fullClassName,final String filterClassSource):基于主题订阅消息,消息过滤使用类模式
void subscribe(final String topic, final MessageSelector selector) :订阅消息, 并指定队列选择器
void unsubscribe(final String topic):取消消息订阅
DefaultMQPushConsumer
//消费者组
private String consumerGroup;
//消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下的消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//订阅信息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
//消息业务监听器
private MessageListener messageListener;
//消息消费进度存储器
private OffsetStore offsetStore;
//消费者最小线程数量
private int consumeThreadMin = 20;
//消费者最大线程数量
private int consumeThreadMax = 20;
//并发消息消费时处理队列最大跨度
private int consumeConcurrentlyMaxSpan = 2000;
//每1000次流控后打印流控日志
private int pullThresholdForQueue = 1000;
//推模式下任务间隔时间
private long pullInterval = 0;
//推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
//每次传入MessageListener#consumerMessage中消息的数量
private int consumeMessageBatchMaxSize = 1;
//是否每次拉取消息都订阅消息
private boolean postSubscriptionWhenPull = false;
//消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
//消息消费超时时间
private long consumeTimeout = 15;
5.3 消费者启动流程
代码:DefaultMQPushConsumerImpl#start
5.4 消息拉取
消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。本文重点讲解集群模式。在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。
消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。
5.4.1 PullMessageService实现机制
从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
代码:PullMessageService#run
private String consumerGroup; //消费者组
private MessageQueue messageQueue; //待拉取消息队列
private ProcessQueue processQueue; //消息处理队列
private long nextOffset; //待拉取的MessageQueue偏移量
private boolean lockedFirst = false; //是否被锁定
代码:PullMessageService#pullMessage
5.4.2 ProcessQueue实现机制
ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。
方法:
5.4.3 消息拉取基本流程
5.4.3.1 客户端发起拉取请求
代码:DefaultMQPushConsumerImpl#pullMessage
5.4.3.2.消息服务端Broker组装消息
代码:PullMessageProcessor#processRequest
代码:DefaultMessageStore#getMessage
代码:PullMessageProcessor#processRequest
5.4.3.3.消息拉取客户端处理消息
代码:MQClientAPIImpl#processPullResponse
PullResult类
代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess
5.4.3.4.消息拉取总结
5.4.4 消息拉取长轮询机制分析
RocketMQ未真正实现消息推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向RocketMQ拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端PULL—NOT—FOUND(消息不存在);如果开启长轮询模式,RocketMQ一方面会每隔5s轮询检查一次消息是否可达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从CommitLog文件中提取消息返回给消息拉取客户端,否则直到挂起超时,超时时间由消息拉取方在消息拉取是封装在请求参数中,PUSH模式为15s,PULL模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis设置。RocketMQ通过在Broker客户端配置longPollingEnable为true来开启长轮询模式。
代码:PullMessageProcessor#processRequest
PullRequestHoldService方式实现长轮询
代码:PullRequestHoldService#suspendPullRequest
代码:PullRequestHoldService#run
代码:PullRequestHoldService#checkHoldRequest
代码:PullRequestHoldService#notifyMessageArriving
如果开启了长轮询机制,PullRequestHoldService会每隔5s被唤醒去尝试检测是否有新的消息的到来才给客户端响应,或者直到超时才给客户端进行响应,消息实时性比较差,为了避免这种情况,RocketMQ引入另外一种机制:当消息到达时唤醒挂起线程触发一次检查。
DefaultMessageStore$ReputMessageService机制
代码:DefaultMessageStore#start
//长轮询入口
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
代码:DefaultMessageStore$ReputMessageService#run
代码:DefaultMessageStore$ReputMessageService#deReput
代码:NotifyMessageArrivingListener#arriving
5.5 消息队列负载与重新分布机制
RocketMQ消息队列重新分配是由RebalanceService线程来实现。一个MQClientInstance持有一个
RebalanceService实现,并随着MQClientInstance的启动而启动。
代码:RebalanceService#run
代码:MQClientInstance#doRebalance
代码:RebalanceImpl#rebalanceByTopic
RocketMQ默认提供5中负载均衡分配算法
注意:消息队列的分配遵循一个消费者可以分配到多个队列,但同一个消息队列只会分配给一个消费者,故如果出现消费者个数大于消息队列数量,则有些消费者无法消费消息。
5.6 消息消费过程
PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService支持顺序消息和并发消息,核心类图如下:
并发消息消费
代码:ConsumeMessageConcurrentlyService#submitConsumeRequest
代码:ConsumeMessageConcurrentlyService$ConsumeRequest#run
5.7 定时消息机制
定时消息是消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费,RocketMQ并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免的带来巨大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。消息延迟级别在Broker端通过messageDelayLevel配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延迟消息1s,delayLevel=2表示延迟5s,依次类推。
RocketMQ定时消息实现类为ScheduleMessageService,该类在DefaultMessageStore中创建。
通过在DefaultMessageStore中调用load方法加载该类并调用start方法启动。
代码:ScheduleMessageService#load
代码:ScheduleMessageService#start
调度机制
ScheduleMessageService的start方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别对应SCHEDULETOPIC_XXXX主题下的一个消息消费队列。定时调度任务的实现类为DeliverDelayedMessageTimerTask,核心实现方法为executeOnTimeup
代码:**_ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup**
5.8 顺序消息
顺序消息实现类是
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
代码:ConsumeMessageOrderlyService#start
代码:ConsumeMessageOrderlyService#submitConsumeRequest
代码:ConsumeMessageOrderlyService$ConsumeRequest#run
5.9 小结
RocketMQ消息消费方式分别为集群模式、广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。
并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。