源碼分析RocketMQ消息軌跡

目錄

本文沿着的思路,從如下3個方面對其源碼進行解讀:

  1. 發送消息軌跡
  2. 消息軌跡格式
  3. 存儲消息軌跡數據

@(本節目錄)

1、發送消息軌跡流程

首先我們來看一下在消息發送端如何啟用消息軌跡,示例代碼如下:

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);      // @1
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

從上述代碼可以看出其關鍵點是在創建DefaultMQProducer時指定開啟消息軌跡跟蹤。我們不妨瀏覽一下DefaultMQProducer與啟用消息軌跡相關的構造函數:

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

參數如下:

  • String producerGroup
    生產者所屬組名。
  • boolean enableMsgTrace
    是否開啟跟蹤消息軌跡,默認為false。
  • String customizedTraceTopic
    如果開啟消息軌跡跟蹤,用來存儲消息軌跡數據所屬的主題名稱,默認為:RMQ_SYS_TRACE_TOPIC。

1.1 DefaultMQProducer構造函數

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {      // @1
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {                                                                                                                                                                                            // @2
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);                                                         
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));                                                                                                                             // @3
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

代碼@1:首先介紹一下其局部變量。

  • String producerGroup
    生產者所屬組。
  • RPCHook rpcHook
    生產者發送鈎子函數。
  • boolean enableMsgTrace
    是否開啟消息軌跡跟蹤。
  • String customizedTraceTopic
    定製用於存儲消息軌跡的數據。

代碼@2:用來構建AsyncTraceDispatcher,看其名:異步轉發消息軌跡數據,稍後重點關注。

代碼@3:構建SendMessageTraceHookImpl對象,並使用AsyncTraceDispatcher用來異步轉發。

1.2 SendMessageTraceHookImpl鈎子函數

1.2.1 SendMessageTraceHookImpl類圖

  1. SendMessageHook
    消息發送鈎子函數,用於在消息發送之前、發送之後執行一定的業務邏輯,是記錄消息軌跡的最佳擴展點。
  2. TraceDispatcher
    消息軌跡轉發處理器,其默認實現類AsyncTraceDispatcher,異步實現消息軌跡數據的發送。下面對其屬性做一個簡單的介紹:
    • int queueSize
      異步轉發,隊列長度,默認為2048,當前版本不能修改。
    • int batchSize
      批量消息條數,消息軌跡一次消息發送請求包含的數據條數,默認為100,當前版本不能修改。
    • int maxMsgSize
      消息軌跡一次發送的最大消息大小,默認為128K,當前版本不能修改。
    • DefaultMQProducer traceProducer
      用來發送消息軌跡的消息發送者。
    • ThreadPoolExecutor traceExecuter
      線程池,用來異步執行消息發送。
    • AtomicLong discardCount
      記錄丟棄的消息個數。
    • Thread worker
      woker線程,主要負責從追加隊列中獲取一批待發送的消息軌跡數據,提交到線程池中執行。
    • ArrayBlockingQueue< TraceContext> traceContextQueue
      消息軌跡TraceContext隊列,用來存放待發送到服務端的消息。
    • ArrayBlockingQueue< Runnable> appenderQueue
      線程池內部隊列,默認長度1024。
    • DefaultMQPushConsumerImpl hostConsumer
      消費者信息,記錄消息消費時的軌跡信息。
    • String traceTopicName
      用於跟蹤消息軌跡的topic名稱。

1.2.2 源碼分析SendMessageTraceHookImpl

1.2.2.1 sendMessageBefore
public void sendMessageBefore(SendMessageContext context) { 
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(context.getProducerGroup());                                                                                                                       // @2
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();                                                                                                                                                // @3
    traceBean.setTopic(context.getMessage().getTopic());
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}

代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

代碼@2:在消息發送上下文中,設置用來跟蹤消息軌跡的上下環境,裏面主要包含一個TraceBean集合、追蹤類型(TraceType.Pub)與生產者所屬的組。

代碼@3:構建一條跟蹤消息,用TraceBean來表示,記錄原消息的topic、tags、keys、發送到broker地址、消息體長度等消息。

從上文看出,sendMessageBefore主要的用途就是在消息發送的時候,先準備一部分消息跟蹤日誌,存儲在發送上下文環境中,此時並不會發送消息軌跡數據。

1.2.2.2 sendMessageAfter
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())     // @1
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                                                                                                // @2
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());     // @3
    tuxeContext.setCostTime(costTime);                                                                                                                                      // @4
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {                                                                    
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());                                                                                      
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);                                                                                                                                   // @5
}

代碼@1:如果topic主題為消息軌跡的Topic,直接返回。

代碼@2:從MqTraceContext中獲取跟蹤的TraceBean,雖然設計成List結構體,但在消息發送場景,這裏的數據永遠只有一條,及時是批量發送也不例外。

代碼@3:獲取消息發送到收到響應結果的耗時。

代碼@4:設置costTime(耗時)、success(是否發送成功)、regionId(發送到broker所在的分區)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,則是最後一條消息的物理偏移量)、storeTime,這裏使用的是(客戶端發送時間 + 二分之一的耗時)來表示消息的存儲時間,這裡是一個估值。

代碼@5:將需要跟蹤的信息通過TraceDispatcher轉發到Broker服務器。其代碼如下:

public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

這裏一個非常關鍵的點是offer方法的使用,當隊列無法容納新的元素時會立即返回false,並不會阻塞。

接下來將目光轉向TraceDispatcher的實現。

1.3 TraceDispatcher實現原理

TraceDispatcher,用於客戶端消息軌跡數據轉發到Broker,其默認實現類:AsyncTraceDispatcher。

1.3.1 TraceDispatcher構造函數

public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {    
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;                                        
    this.discardCount = new AtomicLong(0L);         
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
    }                   // @1
    this.traceExecuter = new ThreadPoolExecutor(// :
        10, //
        20, //
        1000 * 60, //
        TimeUnit.MILLISECONDS, //
        this.appenderQueue, //
        new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);      // @2
}

代碼@1:初始化核心屬性,該版本這些值都是“固化”的,用戶無法修改。

  • queueSize
    隊列長度,默認為2048,異步線程池能夠積壓的消息軌跡數量。
  • batchSize
    一次向Broker批量發送的消息條數,默認為100.
  • maxMsgSize
    向Broker彙報消息軌跡時,消息體的總大小不能超過該值,默認為128k。
  • discardCount
    整個運行過程中,丟棄的消息軌跡數據,這裏要說明一點的是,如果消息TPS發送過大,異步轉發線程處理不過來時,會主動丟棄消息軌跡數據。
  • traceContextQueue
    traceContext積壓隊列,客戶端(消息發送、消息消費者)在收到處理結果后,將消息軌跡提交到噶隊列中,則會立即返回。
  • appenderQueue
    提交到Broker線程池中隊列。
  • traceTopicName
    用於接收消息軌跡的Topic,默認為RMQ_SYS_TRANS_HALF_TOPIC。
  • traceExecuter
    用於發送到Broker服務的異步線程池,核心線程數默認為10,最大線程池為20,隊列堆積長度2048,線程名稱:MQTraceSendThread_。、
  • traceProducer
    發送消息軌跡的Producer。

代碼@2:調用getAndCreateTraceProducer方法創建用於發送消息軌跡的Producer(消息發送者),下面詳細介紹一下其實現。

1.3.2 getAndCreateTraceProducer詳解

private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
        DefaultMQProducer traceProducerInstance = this.traceProducer;
        if (traceProducerInstance == null) {  //@1
            traceProducerInstance = new DefaultMQProducer(rpcHook);
            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
            traceProducerInstance.setSendMsgTimeout(5000);
            traceProducerInstance.setVipChannelEnabled(false);
            // The max size of message is 128K
            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
        }
        return traceProducerInstance;
    }

代碼@1:如果還未建立發送者,則創建用於發送消息軌跡的消息發送者,其GroupName為:_INNER_TRACE_PRODUCER,消息發送超時時間5s,最大允許發送消息大小118K。

1.3.3 start

public void start(String nameSrvAddr) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {     // @1
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
    this.worker.setDaemon(true);
    this.worker.start();                                                                                   
    this.registerShutDownHook();
}

開始啟動,其調用的時機為啟動DefaultMQProducer時,如果啟用跟蹤消息軌跡,則調用之。

代碼@1:如果用於發送消息軌跡的發送者沒有啟動,則設置nameserver地址,並啟動着。

代碼@2:啟動一個線程,用於執行AsyncRunnable任務,接下來將重點介紹。

1.3.4 AsyncRunnable

class AsyncRunnable implements Runnable {
         private boolean stopped;
    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);     // @1
            for (int i = 0; i < batchSize; i++) {
                TraceContext context = null;
                try {
                    //get trace data element from blocking Queue — traceContextQueue
                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);        // @2
                } catch (InterruptedException e) {
                }
                if (context != null) {
                    contexts.add(context);
                } else {
                    break;
                }
            }
            if (contexts.size() > 0) {                                                                               :
                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);  // @3
                traceExecuter.submit(request);                                                               
            } else if (AsyncTraceDispatcher.this.stopped) {
                this.stopped = true;
            }
        }
    }
}

代碼@1:構建待提交消息跟蹤Bean,每次最多發送batchSize,默認為100條。

代碼@2:從traceContextQueue中取出一個待提交的TraceContext,設置超時時間為5s,即如何該隊列中沒有待提交的TraceContext,則最多等待5s。

代碼@3:向線程池中提交任務AsyncAppenderRequest。

1.3.5 AsyncAppenderRequest#sendTraceData

public void sendTraceData(List<TraceContext> contextList) {
    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
    for (TraceContext context : contextList) {        //@1
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();     // @2
        // Use  original message entity's topic as key
        String key = topic;
        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<TraceTransferBean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);    // @3
        transBeanList.add(traceData);
    }
    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {       // @4
        flushData(entry.getValue());
    }
}

代碼@1:遍歷收集的消息軌跡數據。

代碼@2:獲取存儲消息軌跡的Topic。

代碼@3:對TraceContext進行編碼,這裡是消息軌跡的傳輸數據,稍後對其詳細看一下,了解其上傳的格式。

代碼@4:將編碼后的數據發送到Broker服務器。

1.3.6 TraceDataEncoder#encoderFromContextBean

根據消息軌跡跟蹤類型,其格式會有一些不一樣,下面分別來介紹其合適。

1.3.6.1 PUB(消息發送)
case Pub: {
    TraceBean bean = ctx.getTraceBeans().get(0);
    //append the content of context and traceBean to transferBean's TransData
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
     .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}

消息軌跡數據的協議使用字符串拼接,字段的分隔符號為1,整個數據以2結尾,感覺這個設計還是有點“不可思議”,為什麼不直接使用json協議呢?

1.3.6.2 SubBefore(消息消費之前)
for (TraceBean bean : ctx.getTraceBeans()) {
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
    }
}

軌跡就是按照上述順序拼接而成,各個字段使用1分隔,每一條記錄使用2結尾。

1.3.2.3 SubAfter(消息消費后)
case SubAfter: {
    for (TraceBean bean : ctx.getTraceBeans()) {
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
        }
    }
}

格式編碼一樣,就不重複多說。

經過上面的源碼跟蹤,消息發送端的消息軌跡跟蹤流程、消息軌跡數據編碼協議就清晰了,接下來我們使用一張序列圖來結束本部分的講解。

其實行文至此,只關注了消息發送的消息軌跡跟蹤,消息消費的軌跡跟蹤又是如何呢?其實現原理其實是一樣的,就是在消息消費前後執行特定的鈎子函數,其實現類為ConsumeMessageTraceHookImpl,由於其實現與消息發送的思路類似,故就不詳細介紹了。

2、 消息軌跡數據如何存儲

其實從上面的分析,我們已經得知,RocketMQ的消息軌跡數據存儲在到Broker上,那消息軌跡的主題名如何指定?其路由信息又怎麼分配才好呢?是每台Broker上都創建還是只在其中某台上創建呢?RocketMQ支持系統默認與自定義消息軌跡的主題。

2.1 使用系統默認的主題名稱

RocketMQ默認的消息軌跡主題為:RMQ_SYS_TRACE_TOPIC,那該Topic需要手工創建嗎?其路由信息呢?

{
    if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {    // @1
        String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(1);                                              // @2
        topicConfig.setWriteQueueNums(1);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

上述代碼出自TopicConfigManager的構造函數,在Broker啟動的時候會創建topicConfigManager對象,用來管理topic的路由信息。

代碼@1:如果Broker開啟了消息軌跡跟蹤(traceTopicEnable=true)時,會自動創建默認消息軌跡的topic路由信息,注意其讀寫隊列數為1。

2.2 用戶自定義消息軌跡主題

在創建消息發送者、消息消費者時,可以显示的指定消息軌跡的Topic,例如:

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

通過customizedTraceTopic來指定消息軌跡Topic。

溫馨提示:通常在生產環境上,將不會開啟自動創建主題,故需要RocketMQ運維管理人員提前創建好Topic。

好了,本文就介紹到這裏了,本文詳細介紹了RocktMQ消息軌跡的實現原理,下一篇,我們將進入到多副本的學習中。

作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

您可能也會喜歡…