CFSDN nhấn mạnh vào giá trị tạo ra nguồn mở và chúng tôi cam kết xây dựng nền tảng chia sẻ tài nguyên để mọi nhân viên CNTT có thể tìm thấy thế giới tuyệt vời của bạn tại đây.
Bài viết trên blog CFSDN này về vấn đề tích lũy tin nhắn do cùng một RocketMQ ClientID gây ra đã được tác giả sưu tầm và biên soạn. Nếu các bạn quan tâm đến bài viết này thì nhớ like nhé.

Trước hết, BUG RocketMQ gây ra sự cố này đã được khắc phục chính thức trong bản gửi này vào ngày 16 tháng 3. Ở đây chúng tôi chỉ thảo luận về các chi tiết cụ thể gây ra sự cố trước khi khắc phục. Để biết thêm ngữ cảnh, vui lòng tham khảo "Thời gian khởi động của người tiêu dùng RocketMQ. " Tôi đã viết trước Bạn đã làm gì?" Bài viết này giải thích những hoạt động mà Người tiêu dùng của RocketMQ đã thực hiện sau khi nó được bắt đầu. Sẽ rất hữu ích nếu hiểu được BUG được giải thích lần này.
Nó nói về:

Tin nhắn chồng chất.
Không cần phải nói, sử dụng nhiều lần thì ClientID của bạn vẫn như vậy. Bài viết này tập trung vào lý do tại sao tin nhắn tích lũy.
Như đã đề cập trong bài viết, khi khởi tạo Consumer, chiến lược Rebalance sẽ được khởi tạo. Bạn có thể hiểu đại khái chiến lược Tái cân bằng là một chiến lược phân bổ m MessageQueue trong một Chủ đề cho n phiên bản Người tiêu dùng trong Nhóm Người tiêu dùng. Nó có vẻ hơi phức tạp, nhưng thực tế nó trông như thế này:

chiến lược tái cân bằng.
Như có thể thấy từ mã nguồn khởi tạo của Người tiêu dùng, chiến lược Tái cân bằng được Người tiêu dùng áp dụng theo mặc định là AllocateMessageQueueAverage().

Chiến lược tái cân bằng mặc định.
Chiến lược mặc định rất dễ hiểu, phân bổ MessageQueue cho người tiêu dùng một cách đồng đều. Ví dụ: giả sử có 8 MessageQueue và 2 Consumer thì mỗi Consumer sẽ được gán cho 4 MessageQueue.
Điều gì sẽ xảy ra nếu phân phối không đồng đều? Ví dụ: chỉ có 7 MessageQueue nhưng vẫn có 2 Consumer. Tại thời điểm này, RocketMQ sẽ phân phối đồng đều các phần bổ sung cho Người tiêu dùng đã được sắp xếp và phân phối từng phần cho Người tiêu dùng cho đến khi quá trình phân phối hoàn tất. Ví dụ: trong trường hợp 7 MessageQueue và 2 ConsumerGroup vừa đề cập, Consumer đầu tiên sẽ được gán cho 4 MessageQueue, và Consumer thứ hai sẽ được gán cho 3 MessageQueue.
Trước tiên, bạn có thể hiểu cách triển khai AllocateMessageQueueAveragely. Là chiến lược Tái cân bằng mặc định, việc triển khai chiến lược này nằm ở đây:

Nơi chính sách mặc định được thực hiện.
Tiếp theo, chúng ta hãy xem AllocateMessageQueueAveragely thực hiện những gì trong nội bộ.
Cốt lõi của nó thực sự là phương thức phân bổ trong giao diện AllocateMessageQueueStrargety đã triển khai. Trên thực tế, RocketMQ có tổng cộng 5 cách triển khai giao diện này:
- Phân bổMáyPhòngGần đó
- Phân bổMessageQueueTrung bình
- Phân bổMessageQueueTrung bìnhTheoCircle
- Phân bổMessageQueueByConfig
- Phân bổ hàng đợi tin nhắn theo phòng máy
- Phân bổMessageQueueConsistentHash
AllocateMessageQueueAveragely mặc định của nó chỉ là một trong những triển khai của nó. Vậy cần có những tham số nào để thực thi phân bổ?

Đi vào.
Bốn điều sau đây là bắt buộc:
- Nhóm người tiêu dùng Tên của nhóm người tiêu dùng
- currentCID clientID của người tiêu dùng hiện tại
- mqAll Tất cả MessageQueue trong Chủ đề được Nhóm Người tiêu dùng hiện tại sử dụng
- cidAll ClientID của tất cả người tiêu dùng trong Nhóm người tiêu dùng hiện tại
Trên thực tế, tất cả MessageQueue trong một Chủ đề nhất định đều được gán cho tất cả các phiên bản người tiêu dùng thuộc về cùng một người tiêu dùng và mức độ chi tiết là Theo Chủ đề.
Vì vậy, những việc còn lại ở đây rất đơn giản, không gì khác hơn là làm thế nào để gán bó MessageQueue này cho nhóm Người tiêu dùng này. Còn điều này thì sao, tương ứng với các cách triển khai khác nhau của AllocateMessageQueueStrategy.
Tiếp theo chúng ta cùng xem AllocateMessageQueueAveragely phân bổ MessageQueue như thế nào khi nói về mã nguồn trước đây, tôi thường đi từng bước một, ghép mã nguồn với hình ảnh, tuy nhiên mã nguồn này quá ngắn nên tôi sẽ chỉ đưa ra thôi. Đầu tiên.
- công cộng Danh sách
allocate(String consumerGroup, String currentCID, List
mqAll, Danh sách
cidTất cả) {
- nếu (currentCID == vô giá trị || currentCID.length() < 1) {
- ném ra ngoại lệ IllegalArgumentException mới"currentCID đang trống");
- }
- nếu (mqAll == vô giá trị || mqAll.isEmpty()) {
- ném ra ngoại lệ IllegalArgumentException mới"mqAll là null hoặc mqAll trống");
- }
- nếu (cidAll == vô giá trị || cidAll.isEmpty()) {
- ném ra ngoại lệ IllegalArgumentException mới"cidAll là null hoặc cidAll trống");
- }
-
- Danh sách
kết quả = new ArrayList
();
-
- // Xác định xem client hiện tại có nằm trong tập hợp cidAll không
- nếu (!cidAll.chứa(currentCID)) {
- log.thông tin("[LỖI] ConsumerGroup: {} consumerId: {} không có trong cidAll: {}",
- Nhóm người tiêu dùng,
- ID hiện tại,
- cidTất cả);
- trở lại kết quả;
- }
-
- // Lấy vị trí của người tiêu dùng hiện tại trong mảng tất cả các phiên bản của người tiêu dùng
- số nguyên chỉ số = cidAll.indexOf(currentCID);
- // Sử dụng số lượng MessageQueue để lấy phần còn lại của số phiên bản tiêu dùng. Điều này thực sự tính toán số lượng MessageQueue không được chia đều.
- // Ví dụ: 12 MessageQueue, 5 Người tiêu dùng, 12 % 5 = 2
- số nguyên chế độ = mqAll.kích cỡ() % cidTất cả.kích cỡ();
- số nguyên Kích thước trung bình =
- mqAll.kích cỡ() <= cidTất cả.kích cỡ() ? 1 : (mod > 0 && chỉ số < mod ? mqTất cả.kích cỡ() / cidTất cả.kích cỡ() + 1 : mqAll.kích cỡ() / cidTất cả.kích cỡ());
- số nguyên startIndex = (mod > 0 && chỉ số < sửa đổi) ? chỉ số * Kích thước trung bình: chỉ số * kích thước trung bình + mod;
- số nguyên phạm vi = Toán học.phút(Kích thước trung bình, mqAll.kích cỡ() - bắt đầu chỉ mục);
- vì (số nguyên i = 0; i < phạm vi; i++) {
- kết quả.thêm vào(mqAll.get((startIndex + i) % mqAll.kích cỡ()));
- }
- trở lại kết quả;
- }
Trên thực tế, nửa đầu chỉ là kiểm tra thông thường nên bạn có thể bỏ qua. Từ đây:
- số nguyên chỉ số = cidAll.indexOf(currentCID);
Sự khởi đầu là logic cốt lõi. Để tránh nhầm lẫn về mặt logic, giả định rằng có 12 MessageQueue, 5 Người tiêu dùng và chỉ mục = 0.
Khi đó giá trị của mod là 12 % 5 = 2.
Giá trị của AverageSize hơi khó hiểu một chút. Nếu số MessageQueue nhỏ hơn số lượng người tiêu dùng thì nó là 1; nếu không, hãy sử dụng nhóm logic này (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll. size() / cidAll.size()). Chỉ số của chúng tôi là 0 và mod là 2. Chỉ số < mod là đúng, khi đó giá trị Kích thước trung bình cuối cùng là 12/5 + 1 = 3.
Tiếp theo là startIndex. Vì điều kiện của toán tử bậc ba này là đúng nên giá trị của nó là 0 * 3, tức là 0.
Bạn có thấy choáng váng sau khi đọc nhiều logic không?
12 hàng đợi tin nhắn.
5 trường hợp người tiêu dùng.
Theo cách chia trên:
Người tiêu dùng xếp hạng 1 được chỉ định 3.
Người tiêu dùng xếp thứ 2 được xếp thứ 3.
Người tiêu dùng xếp thứ 3 được cấp 2 phần.
Người tiêu dùng xếp thứ 4 được phân bổ 2.
Người tiêu dùng xếp thứ 5 được phân bổ 2.

Quy trình phân bổ cụ thể.
Vì vậy, bạn có thể nghĩ đại khái là:
Đầu tiên "chia đều" và làm tròn 12/5 thành 2. Sau đó, sau khi "phân phối đồng đều" hoàn thành, vẫn còn 2 người tiêu dùng và sau đó họ được phân phối lại lần lượt từ trên xuống dưới, để người tiêu dùng thứ nhất và thứ hai sẽ được phân bổ thêm một người tiêu dùng.
Vì vậy, nếu có 13 MessageQueue và 5 Consumer thì 3 sẽ được phân bổ cho thứ 1, thứ 2 và thứ 3.
Nhưng nó không chính xác, vì MessageQueue được phân bổ là một lần. Ví dụ: ba MessageQueue được lấy cùng một lúc và sẽ không được trao cho hai cái đầu tiên và sau đó là một.
Điều gì sẽ xảy ra nếu ClientID của Người tiêu dùng mà chúng tôi đã đề cập ở phần đầu giống nhau?
Tất nhiên, giá trị của chỉ mục là như nhau, dẫn đến mod, AverageSize, startIndex và range đều giống nhau. Sau đó, cuối cùng result.add(mqAll.get((startIndex + i) % mqAll.size()));, những Người tiêu dùng khác nhau sẽ nhận được cùng một MessageQueue (ví dụ: Người tiêu dùng 1 và Người tiêu dùng 2 đều nhận được 3 MessageQueue đầu tiên ), dẫn đến một số MessageQueue (nếu có) không có Consumer Khi nó được tiêu thụ nhưng không được tiêu thụ, tin nhắn liên tục được gửi đi, điều này sẽ gây ra sự tích tụ tin nhắn lớn.
Tất nhiên, phiên bản mới hiện tại đã khắc phục vấn đề này theo quan điểm mã. Đây chỉ là phần khám phá nguyên nhân của phiên bản trước.
Liên kết gốc: https://mp.weixin.qq.com/s/W6gepFrrsvWXZNVO-sP7QA.
Cuối cùng, bài viết về vấn đề tích lũy tin nhắn do cùng một RocketMQ ClientID gây ra sẽ kết thúc tại đây. Nếu bạn muốn biết thêm về vấn đề tích lũy tin nhắn do cùng một RocketMQ ClientID gây ra, vui lòng tìm kiếm các bài viết của CFSDN hoặc tiếp tục duyệt các bài viết liên quan. Tôi hy vọng bạn sẽ ủng hộ blog của tôi trong tương lai! .
Tôi là một lập trình viên xuất sắc, rất giỏi!