- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们正在通过 CDC 将数据从 DB2 (table-1) 发送到 Kafka 主题 (topic-1)。我们需要在 DB2 数据和 Kafka 主题之间进行协调。我们有两个选择 -
a) 将所有 kafka 主题数据放入 DB2(作为 table-1-copy),然后进行左外连接(在 table-1 和 table-1-copy 之间)以查看不匹配的记录,创建增量并将其推回卡夫卡。câu hỏi:可扩展性——我们的数据集大约有十亿条记录,我不确定 DB2 DBA 是否会让我们运行如此庞大的连接操作(可能很容易持续超过 15-20 分钟)。
b) 再次将 DB2 推回并行的 kafka 主题(topic-1-copy),然后执行一些基于 kafka 流的解决方案以在 kafka topic-1 和 topic-1-copy 之间进行左外连接。我仍然在思考 kafka 流和左外连接。我不确定(在 kafka 流中使用窗口系统)我是否能够将 topic-1 的全部内容与 topic-1-copy 进行比较。
更糟糕的是,kafka中的topic-1是一个紧凑的topic,因此,当我们将数据从 DB2 推回 Kafka topic-1-copy 时,我们无法确定地启动 kafka topic-compaction 周期以确保 topic-1 和 topic-1-copy 在运行任何类型的比较之前都已完全压缩对他们进行操作。
c) 有没有其他我们可以考虑的框架选项?
理想的解决方案必须针对任何规模的数据进行扩展。
1 Câu trả lời
我看不出您不能在 Kafka Streams 或 KSQL 中执行此操作的原因。两者都支持表-表连接。这是假设支持数据格式。
键压缩不会影响结果,因为 Streams 和 KSQL 都会构建连接两个表的正确最终状态。如果压缩已经运行,需要处理的数据量可能会更少,但结果是一样的。
例如,在 ksqlDB 中,您可以将两个主题作为表导入并执行连接,然后通过 topic-1
表为 vô giá trị
进行过滤以查找缺失的列表行。
-- example using 0.9 ksqlDB, assuming a INT primary key:
-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, )
WITH (kafka_topic='topic-1', value_format='?');
-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, )
WITH (kafka_topic='topic-1-copy', value_format='?');
-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;
这种方法的好处是缺失行的 MISSING
表会自动更新:当您从源 DB2 实例中提取缺失的行并将它们生成到 topic-1
那么“MISSING”表中的行将被删除,即您会看到为 MISSING
主题生成的墓碑。
您甚至可以扩展此方法以查找不再存在于源数据库中的 topic-1
中的行:
-- using the same DDL statements for TABLE_1 and TABLE_2 from above
-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;
-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;
-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;
当然,您需要相应地调整集群的大小。 ksqlDB 集群越大,处理数据的速度就越快。它还需要磁盘容量来实现表。
您可以根据主题上的分区数设置的最大并行化量。如果您只有 1 个分区,则数据将按顺序处理。如果运行 100 个分区,则可以使用 100 个 CPU 内核处理数据,前提是您运行了足够多的 ksqlDB 实例。 (默认情况下,每个 ksqlDB 节点将为每个查询创建 4 个流处理线程,(尽管如果服务器有更多内核,您可以增加它!))。
关于database - 如何比较两个kafka流或数据库表之间的(10亿条记录)数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52129604/
Tôi gặp lỗi sau khi khởi động Kafka-Server trên máy Windows của mình. Tôi đã tải xuống Scala 2.11 - kafka_2.11-2.1.0.tgz từ liên kết sau: https://kafka.ap
Về hàng đợi tin nhắn Apache-Kafka. Tôi đã tải xuống Apache Kafka từ trang tải xuống Kafka. Tôi đã giải nén nó vào /opt/apache/installed/kafka
Giả sử tôi có chủ đề về ô tô của Kafka. Tôi cũng có một nhóm người tiêu dùng mang tên cars-consumers đăng ký theo dõi chủ đề về ô tô. Nhóm người tiêu dùng ô tô hiện đang ở mức bù trừ 89. Khi tôi bây giờ xóa xe
Tôi muốn biết phương pháp nào tốt nhất cho tôi: Kafka stream hay Kafka consumer api hay Kafka connect? Tôi muốn đọc dữ liệu từ một chủ đề, sau đó xử lý và ghi vào cơ sở dữ liệu. Vì vậy, tôi đã viết người tiêu dùng, nhưng tôi nghĩ tôi có thể viết phản hồi luồng Kafka
Tôi đã làm việc trên một số ứng dụng luồng Kafka và ứng dụng người dùng Kafka. Cuối cùng, luồng Kafka không gì khác hơn là người tiêu dùng sử dụng các sự kiện thời gian thực từ Kafka. Vì vậy, tôi không thể hiểu khi nào nên sử dụng luồng Kafka hoặc tại sao chúng ta nên sử dụng
Sự khác biệt giữa Kafka Acknowledgement và Kafka consumer commitSync() là gì? Cả hai đều được sử dụng để quản lý offset thủ công và muốn cả hai hoạt động đồng bộ. Xin hãy hỗ trợ câu trả lời tốt nhất bằng spring-kafka
Làm thế nào để ủy quyền các yêu cầu của nhà sản xuất Apache Kafka trên các nhà môi giới Kafka và chuyển hướng đến một cụm Kafka riêng biệt? Trong trường hợp cụ thể của tôi, không thể cập nhật ứng dụng khách ghi vào cụm này. Điều này có nghĩa là không thể: Cập nhật máy khách
Tôi cần đặt tên cho người dùng của mình trong Kafka 10 giống như tôi đã làm trong Kafka 8 vì tôi có các tập lệnh đánh hơi và sử dụng thông tin này thêm nữa. Rõ ràng, tên mặc định của consumer.id đã thay đổi (và hiện cũng được hiển thị riêng biệt
1. Tổng quan Chúng ta có thể thấy có một nút /log_dir_event_notification/ trong dữ liệu zk. Đây là một nút duy trì số sê-ri. Vai trò của nút này trong Kafka là: Khi LogDir trên Broker xuất hiện
Tôi đang sử dụng lệnh sau: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.topic --property
Tôi đang gặp khó khăn trong việc hiểu một số khái niệm về Kafka trong Java Spring Boot. Tôi muốn kiểm tra người dùng với một nhà môi giới Kafka thực sự chạy trên máy chủ có một số nhà sản xuất đang viết/đã viết dữ liệu cho nhiều chủ đề khác nhau. Tôi muốn phục vụ
Kịch bản của tôi là tôi đang sử dụng nhiều chủ đề Kafka với tiền tố chung (ví dụ house.door, house.room) và sử dụng API mẫu chủ đề biểu thức chính quy của luồng Kafka để sử dụng tất cả các chủ đề. Mọi thứ trông ổn cả, tôi hiểu rồi
Có cách nào để lấy phiên bản cụm Kafka theo phương pháp lập trình không? Ví dụ: sử dụng API AdminClient. Tôi muốn xác định phiên bản Kafka Cluster trong ứng dụng người tiêu dùng/nhà sản xuất. Câu trả lời tốt nhất không thể được tìm thấy tại thời điểm này
Bất cứ khi nào tôi thử khởi động lại Kafka thì nó lại trả về lỗi sau. Khi tôi xóa /tmp/kafka-logs, vấn đề sẽ được giải quyết, nhưng nó cũng xóa chủ đề của tôi. Có cách nào để khắc phục lỗi này không? LỖI Lỗi trong khi
Tôi là người dùng mới của Apache Kafka và tôi vẫn đang tìm hiểu về các chức năng bên trong. Trong trường hợp sử dụng của tôi, tôi cần tăng số lượng phân vùng của một chủ đề từ máy khách Kafka Producer một cách linh hoạt. Tôi tìm thấy những câu hỏi tương tự khác về việc tăng kích thước phân vùng
Theo tài liệu của Kafka, một cách để thực hiện việc này là thông qua kafka.tools.MirrorMaker. Tuy nhiên, tôi cần sao chép một chủ đề (ví dụ như thử nghiệm với 1 phân vùng) (nội dung và siêu dữ liệu của nó) từ sản xuất sang máy chủ mà không cần kết nối
Tôi đã cấu hình 3 kafka trong một cụm và đang cố gắng sử dụng nó với spring-kafka. Nhưng sau khi giết thủ lĩnh Kafka, tôi không thể gửi thêm tin nhắn nào vào hàng đợi nữa. Tôi sẽ spring.kafka.bo
Trình kết nối Kafka Sink của tôi đọc từ nhiều chủ đề (10 tác vụ được cấu hình) và xử lý 300 bản ghi từ tất cả các chủ đề. Tùy thuộc vào thông tin được lưu giữ trong mỗi bản ghi, trình kết nối có thể thực hiện một số hành động nhất định. Sau đây là ví dụ về cặp khóa-giá trị trong bản ghi kích hoạt:
Tôi có mã luồng kafka sau public class KafkaStreamHandler implements Processor{ private ProcessorConte
Khi ứng dụng kafka-streams đang chạy và Kafka đột nhiên ngừng hoạt động, ứng dụng sẽ chuyển sang chế độ "chờ", gửi nhật ký cảnh báo về luồng người dùng và luồng nhà sản xuất không thể kết nối và khi Kafka hoạt động trở lại, mọi thứ (về mặt lý thuyết) sẽ trở lại bình thường.
Tôi là một lập trình viên xuất sắc, rất giỏi!