您現在的位置是:首頁 > 網頁遊戲首頁網頁遊戲

訊息通訊,追求極致效能

  • 由 生活在小樹旁的俊逸 發表于 網頁遊戲
  • 2022-12-18
簡介2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行資料通訊的過程,面對複雜的訊息佇列系統,一個性能優良,穩定性高的網路通訊模組是非常重要的,它體現了RocketMQ叢集訊息的整體吞吐和負載能力

語言感嘆號怎麼輸出

作者:

翁智華

出處:

https://www。cnblogs。com/wzh2010/

1 介紹

前面的章節我學習了 NameServer的原理,訊息的生產傳送,以及訊息的消費的全過程。

我們來回顧一下:

RocketMQ 訊息佇列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:

訊息通訊,追求極致效能

NameServer 優先啟動。NameServer 是整個 RocketMQ 的“中央大腦”

,作為 RocketMQ 的服務註冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。

Broker 啟動後,需要將自己註冊至NameServer中,並 保持長連線,

每 30s 傳送一次傳送心跳包,來確保Broker是否存活

。並將 Broker 資訊 ( IP+、埠等資訊)以及Broker中儲存的Topic資訊上報。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。

NameServer 如果檢測到Broker 宕機(

因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應

),則從路由登錄檔中將其移除。

生產者在傳送某個主題的訊息之前先從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),然後根據負載均衡演算法從列表中選擇1臺Broker ,建立連線通道,進行訊息傳送。

消費者在訂閱某個topic的訊息之前從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),包括關聯的全部Topic佇列資訊。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費資料。

生產者和消費者預設每

30s

從 NamerServer 獲取 Broker 伺服器地址列表,以及關聯的所有Topic佇列資訊,更新到Client本地。

2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行資料通訊的過程,面對複雜的訊息佇列系統,一個性能優良,穩定性高的網路通訊模組是非常重要的,它體現了RocketMQ叢集訊息的整體吞吐和負載能力。也是RocketMQ保證高效能、高穩定性的基石。

2 網路通訊過程分析

2。1 通訊類(rocketmq-remoting )的結構解析

訊息通訊,追求極致效能

透過上圖可以看到,在整個RocketMQ佇列系統中,rocketmq-remoting 這個module是專門用來負責網路通訊職能的。

並且從模組依賴關係中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等模組均依賴了它。

訊息通訊,追求極致效能

通訊層是基於 Netty 進行擴充套件的,並自定義了通訊協議,用於將訊息傳遞給 Broker 進行儲存。實現Client與Server之間高效的資料請求與接收。

2。2 協議結構設計

因為是基於Netty進行擴充套件的,所以自定義了RocketMQ的訊息協議,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。

在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:

訊息通訊,追求極致效能

2。3 訊息內容的組成結構

傳輸的訊息內容主要由一下幾個部分組成:

訊息通訊,追求極致效能

訊息通訊,追求極致效能

2。4 RocketMQ 訊息通訊流程

在RocketMQ訊息佇列中支援通訊的模式主要有

sync 同步傳送模式

async 非同步傳送模式

oneway 單向模式,無需關注Response

2。4。1 通訊流程說明

下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基於 NettyRemotingClient 的訊息傳送,以及Handler 處理過程來說明。

訊息通訊,追求極致效能

Broker 和 NameServer 啟動時同步呼叫 NettyRemotingServer。start() 方法, 初始化 Netty 伺服器

配置 BossGroup/WorkerGroup NioEventLoopGroup 執行緒組

配置 Channel

新增 NettyServerHandler

呼叫 serverBootstrap。bind() 監聽埠,等待client的connection

Producer 和 Consumer 同樣需要啟動 Netty 的客戶端,透過呼叫NettyRemotingClient。start() 初始化 Netty 客戶端

配置客戶端 NioEventLoopGroup 執行緒組

配置 Channel

新增 NettyClientHandler

傳送同步訊息時,呼叫 NettyRemoteClient。invokeSync(),從 channelTables 快取中獲取或者建立用於通訊的 Channel 通道。

建立完 Channel 後,生產者 Producer 呼叫 Channel。writeAndFlush() 傳送資料

NettyRemotingServer 服務端執行緒組 處理可讀事件,呼叫 NettyServerHandler 處理資料。

下一步,NettyServerHandler 呼叫 processMessageReceived方法,接收並處理傳送過來的資料。

根據請求碼 RequestCode 區別不同的請求,來執行不同的 Processor。

說明:Processor 在服務端初始化的時候,將 RequestCode 新增到 Processor 快取中。訊息的存、查、拉取都是不同的請求碼。

processMessageReceived 從ResponseTables(key 為 opaque) 快取中取出 ResponseFuture,並將將返回結果設定到 ResponseFuture。同步模式下執行 responseFuture。putResponse()方法,非同步呼叫執行回撥方法。

NettyRemotingClient 收到可讀事件,呼叫 NettyClientHandler 讀取並處理返回事件。

2。4。2 Reactor多執行緒設計

上面我們說過了,RocketMQ的通訊是採用Netty元件作為底層通訊庫。同樣的,它也遵循Reactor多執行緒模型,並在此基礎上做了一些最佳化。

訊息通訊,追求極致效能

上面圖中四個圖形可以大致說明NettyRemotingServer的Reactor 多執行緒模型,在RocketMQ中的存在形式。

M:1個 Reactor 主執行緒:eventLoopGroupBoss,它的職能是負責監聽 TCP網路連線請求,有連線請求過來時候,建立SocketChannel,並註冊到selector上。

S:RocketMQ的原始碼中會選擇NIO或Epoll,來監聽網路資料,當監聽到網路資料過來時,讀取資料並丟給Worker執行緒池:eventLoopGroupSelector,Rocket原始碼中預設設定執行緒數為3。

M1:執行業務之前的各種雜事(SSL認證、空閒檢查、網路連線檢查、編解碼、序列化反序列化 等等),交付給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ原始碼中預設執行緒數設定為8。

M2:剩下處理業務的操作,就直接放在業務執行緒池中執行了。按照之前說的,依據RequestCode去processorTable 本地快取中找到對應的 processor,並封裝成task任務,在丟給對應的業務processor執行緒池來處理。

訊息通訊,追求極致效能

完整的可以參照官網的這張圖:

訊息通訊,追求極致效能

總結

上面介紹了 RocketMQ 訊息通訊的主要內容,我們用幾句話總結下:

整個RocketMQ佇列系統中,rocketmq-remoting Module是專門用來負責網路通訊職能的。

網路通訊模組基於Netty進行擴充套件的,所以自定義了RocketMQ的訊息協議,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。

理解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及呼叫 NettyServerHandler/NettyClienthandler 進行處理的執行流程。

同步非同步:同步和非同步消核心區別是 同步訊息透過 Netty 傳送請求後會執行 ResponseFuture。waitResponse() 阻塞等待,非同步的請求則 SendCallback 相應的方法進行回撥處理。

多執行緒模式下會透過1個Reactor 主執行緒(監聽連線),以及Reactor 執行緒池(監聽資料)、Worker 執行緒池(處理前置工作)、Processor執行緒池(處理業務邏輯) 來處理通訊過程。

Top