kafka 的消費類 KafkaConsumer 是非線程安全的,因此用戶無法在多線程中共享一個 KafkaConsumer 實例,且 KafkaConsumer 本身并沒有實現多線程消費邏輯,如需多線程消費,還需要用戶自行實現,在這里我會講到 Kafka 兩種多線程消費模型。
1、每個線程維護一個 KafkaConsumer。
這樣相當于一個進程內擁有多個消費者,也可以說消費組內成員是由多個線程內的 KafkaConsumer 組成的。
但其實這個消費模型是存在很大問題的,從消費消費模型可看出每個 KafkaConsumer 會負責固定的分區,因此無法提升單個分區的消費能力,如果一個主題分區數量很多,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數量過多,導致項目 Socket 連接開銷巨大,項目中一般不用該線程模型去消費。
2、單 KafkaConsumer 實例 + 多 worker 線程。
針對第一個線程模型的缺點,我們可采取 KafkaConsumer 實例與消息消費邏輯解耦,把消息消費邏輯放入單獨的線程中去處理,線程模型如下:
從消費線程模型可看出,當 KafkaConsumer 實例與消息消費邏輯解耦后,我們不需要創建多個 KafkaConsumer 實例就可進行多線程消費,還可根據消費的負載情況動態調整 worker 線程,具有很強的獨立擴展性,在公司內部使用的多線程消費模型就是用的單 KafkaConsumer 實例 + 多 worker 線程模型。
但這個消費模型由于消費邏輯是利用多線程進行消費的,因此并不能保證其消息的消費順序,在這里我們可以引入阻塞隊列的模型,一個 woker 線程對應一個阻塞隊列,線程不斷輪訓從阻塞隊列中獲取消息進行消費,對具有相同 key 的消息進行取模,并放入相同的隊列中,實現順序消費, 消費模型如下:
但是以上兩個消費線程模型,存在一個問題:
在消費過程中,如果 Kafka 消費組發生重平衡,此時的分區被分配給其它消費組了,如果拉取回來的消息沒有被消費,雖然 Kakfa 可以實現 ConsumerRebalanceListener 接口,在新一輪重平衡前主動提交消費偏移量,但這貌似解決不了未消費的消息被打亂順序的可能性?
因此在消費前,還需要主動進行判斷此分區是否被分配給其它消費者處理,并且還需要鎖定該分區在消費當中不能被分配到其它消費者中(但 kafka 目前做不到這一點)。
特別聲明:以上內容(如有圖片或視頻亦包括在內)為自媒體平臺“網易號”用戶上傳并發布,本平臺僅提供信息存儲服務。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.