sách gpt4 ai đã đi

database - 如何比较两个kafka流或数据库表之间的(10亿条记录)数据

In lại 作者:行者123 更新时间:2023-12-05 07:30:42 26 4
mua khóa gpt4 Nike

我们正在通过 CDC 将数据从 DB2 (table-1) 发送到 Kafka 主题 (topic-1)。我们需要在 DB2 数据和 Kafka 主题之间进行协调。我们有两个选择 -

a) 将所有 kafka 主题数据放入 DB2(作为 table-1-copy),然后进行左外连接(在 table-1 和 table-1-copy 之间)以查看不匹配的记录,创建增量并将其推回卡夫卡。câu hỏi:可扩展性——我们的数据集大约有十亿条记录,我不确定 DB2 DBA 是否会让我们运行如此庞大的连接操作(可能很容易持续超过 15-20 分钟)。

b) 再次将 DB2 推回并行的 kafka 主题(topic-1-copy),然后执行一些基于 kafka 流的解决方案以在 kafka topic-1 和 topic-1-copy 之间进行左外连接。我仍然在思考 kafka 流和左外连接。我不确定(在 kafka 流中使用窗口系统)我是否能够将 topic-1 的全部内容与 topic-1-copy 进行比较。

更糟糕的是,kafka中的topic-1是一个紧凑的topic,因此,当我们将数据从 DB2 推回 Kafka topic-1-copy 时,我们无法确定地启动 kafka topic-compaction 周期以确保 topic-1 和 topic-1-copy 在运行任何类型的比较之前都已完全压缩对他们进行操作。

c) 有没有其他我们可以考虑的框架选项?

理想的解决方案必须针对任何规模的数据进行扩展。

1 Câu trả lời

我看不出您不能在 Kafka Streams 或 KSQL 中执行此操作的原因。两者都支持表-表连接。这是假设支持数据格式。

键压缩不会影响结果,因为 Streams 和 KSQL 都会构建连接两个表的正确最终状态。如果压缩已经运行,需要处理的数据量可能会更少,但结果是一样的。

例如,在 ksqlDB 中,您可以将两个主题作为表导入并执行连接,然后通过 topic-1 表为 vô giá trị 进行过滤以查找缺失的列表行。

-- example using 0.9 ksqlDB, assuming a INT primary key:

-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, )
WITH (kafka_topic='topic-1', value_format='?');

-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, )
WITH (kafka_topic='topic-1-copy', value_format='?');

-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;

这种方法的好处是缺失行的 MISSING 表会自动更新:当您从源 DB2 实例中提取缺失的行并将它们生成到 topic-1 那么“MISSING”表中的行将被删除,即您会看到为 MISSING 主题生成的墓碑。

您甚至可以扩展此方法以查找不再存在于源数据库中的 topic-1 中的行:

-- using the same DDL statements for TABLE_1 and TABLE_2 from above

-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;

-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;

-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;

当然,您需要相应地调整集群的大小。 ksqlDB 集群越大,处理数据的速度就越快。它还需要磁盘容量来实现表。

您可以根据主题上的分区数设置的最大并行化量。如果您只有 1 个分区,则数据将按顺序处理。如果运行 100 个分区,则可以使用 100 个 CPU 内核处理数据,前提是您运行了足够多的 ksqlDB 实例。 (默认情况下,每个 ksqlDB 节点将为每个查询创建 4 个流处理线程,(尽管如果服务器有更多内核,您可以增加它!))。

关于database - 如何比较两个kafka流或数据库表之间的(10亿条记录)数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52129604/

26 4 0
行者123
Hồ sơ cá nhân

Tôi là một lập trình viên xuất sắc, rất giỏi!

Nhận phiếu giảm giá Didi Taxi miễn phí
Mã giảm giá Didi Taxi
Giấy chứng nhận ICP Bắc Kinh số 000000
Hợp tác quảng cáo: 1813099741@qq.com 6ren.com