在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()
方法执行的。
privatevoidinitialTransaction(){this.transactionalMessageService=ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID,TransactionalMessageService.class);if(null==this.transactionalMessageService){this.transactionalMessageService=newTransactionalMessageServiceImpl(newTransactionalMessageBridge(this,this.getMessageStore()));LOG.warn("Loaddefaulttransactionmessagehookservice:{}",TransactionalMessageServiceImpl.class.getSimpleName());}this.transactionalMessageCheckListener=ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID,AbstractTransactionalMessageCheckListener.class);if(null==this.transactionalMessageCheckListener){this.transactionalMessageCheckListener=newDefaultTransactionalMessageCheckListener();LOG.warn("Loaddefaultdiscardmessagehookservice:{}",DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService=newTransactionalMessageCheckService(this);}
这里有三个核心的初始化变量
TransactionalMessageService
事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl
也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()
方法进行加载。TransactionalMessageService
类定义如下。内部属性已加注释标明。
publicinterfaceTransactionalMessageService{//用于保存Half事务消息PutMessageResultprepareMessage(MessageExtBrokerInnermessageInner);CompletableFuture<PutMessageResult>asyncPrepareMessage(MessageExtBrokerInnermessageInner);//删除事务消息booleandeletePrepareMessage(MessageExtmessageExt);//提交事务消息OperationResultcommitMessage(EndTransactionRequestHeaderrequestHeader);//回滚事务消息OperationResultrollbackMessage(EndTransactionRequestHeaderrequestHeader);voidcheck(longtransactionTimeout,inttransactionCheckMax,AbstractTransactionalMessageCheckListenerlistener);//打开事务消息booleanopen();//关闭事务消息voidclose();}
transactionalMessageCheckListener
事务消息回查监听器
transactionalMessageCheckService
事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。
处理事务消息
当初始化完成之后,Broker就可以处理事务消息了。
Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,这和普通消息其实是一样的。 但是有两点针对事务消息的特殊处理:
第一处:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
//获取扩展字段的值,若是该值为true则为事务消息StringtraFlag=oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);booleansendTransactionPrepareMessage=false;if(Boolean.parseBoolean(traFlag)&&!(msgInner.getReconsumeTimes()>0&&msgInner.getDelayTimeLevel()>0)){//判断当前Broker配置是否支持事务消息if(this.brokerController.getBrokerConfig().isRejectTransactionMessage()){response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("thebroker["+this.brokerController.getBrokerConfig().getBrokerIP1()+"]sendingtransactionmessageisforbidden");returnresponse;}sendTransactionPrepareMessage=true;}
if(sendTransactionPrepareMessage){//保存Half信息putMessageResult=this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);}else{putMessageResult=this.brokerController.getMessageStore().putMessage(msgInner);}
第二处:
存储事务消息前的预处理,对应方法是org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInnermsgInner){//将原消息的topic保存在扩展字段中MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_REAL_TOPIC,msgInner.getTopic());//将原消息的QueueId保存在扩展字段中MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));//将原消息的SysFlag保存在扩展字段中msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),MessageSysFlag.TRANSACTION_NOT_TYPE));//修改topic的值为RMQ_SYS_TRANS_HALF_TOPICmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());//修改Queueid为0msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));returnmsgInner;}
完成上述步骤之后,调用DefaultMessageStole.putMessage()
方法将其保存到CommitLog
中。
CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法对其进行处理。
finalinttranType=MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch(tranType){//PreparedandRollbackmessageisnotconsumed,willnotentertheconsumequeuecaseMessageSysFlag.TRANSACTION_PREPARED_TYPE:caseMessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset=0L;break;caseMessageSysFlag.TRANSACTION_NOT_TYPE:caseMessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;}
这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费。