原文地址:https://pgducklake.select/blog/introducing-pg-duckpipe/

pg_duckpipe:湖仓实时 CDC 工具

@qianzhen

2026年3月10日


TL;DR: pg_duckpipe 是一个全新的 PostgreSQL 扩展,它通过基于 WAL 的 CDC(变更数据捕获)技术,将您的常规堆表持续同步到 DuckLake 列式表中。仅需一个 SQL 调用即可启动,无需外部基础设施。


为什么需要 pg_duckpipe?

当我们发布 pg_ducklake 时,它为 PostgreSQL 带来了列式湖仓存储层:由 DuckDB 驱动的分析型表,底层由 Parquet 支持,元数据则存放在 PostgreSQL 自己的目录中。随之而来一个反复被提及的问题:如何自动让这些分析表与我的事务表保持同步?

这是一个现实问题。如果手动管理 DuckLake 表,运行定期的 ETL 任务或批量插入,最终会导致数据陈旧、需要维护额外的脚本,并且随着表的增加,运维复杂度也随之上升。对于那些希望实时获取 OLTP 数据分析视图的团队来说,这很快会变成一个痛点。

pg_duckpipe 正是为了解决这个问题而生的。它是一个 PostgreSQL 扩展(也可以作为一个独立的守护进程运行),能够将变更从常规堆表实时流式传输到 DuckLake 列式表中。无需 Kafka、无需 Debezium、无需外部编排器。只需要 PostgreSQL。

[pg_duckpipe 架构图]


快速入门

Docker 镜像已预先配置好 pg_ducklake 和 pg_duckpipe:

docker run -d --name duckpipe \
  -p 15432:5432 \
  -e POSTGRES_PASSWORD=duckdb \
  pgducklake/pgduckpipe:18-main

同步本地表

将堆表同步到其列式副本,用于分析查询:

-- 连接到数据库
psql -h localhost -p 15432 -U postgres

-- 创建一张表并插入一些数据
CREATE TABLE orders (
    id BIGSERIAL PRIMARY KEY,
    customer_id BIGINT,
    total INT,
    created_at TIMESTAMP DEFAULT now()
);
INSERT INTO orders(customer_id, total)
SELECT (random() * 1000)::bigint, (random() * 10000)::int
FROM generate_series(1, 100000);

-- 开始同步到列式副本
SELECT duckpipe.add_table('public.orders');

-- 查询列式副本
SELECT customer_id, sum(total), count(*)
FROM orders_ducklake
GROUP BY customer_id
ORDER BY sum(total) DESC
LIMIT 10;

从远程 PostgreSQL 同步

pg_duckpipe 可以从远程 PostgreSQL 实例进行复制。源数据库无需安装 pg_duckpipe 或 pg_ducklake,只需要 wal_level = logical 和一个复制用户即可。这使得为现有生产数据库添加分析层变得非常容易:

-- 创建一个指向远程数据库的同步组
SELECT duckpipe.create_group('prod_replica',
    conninfo => 'host=prod-db.example.com port=5432 dbname=myapp user=replicator');

-- 添加要同步的表
SELECT duckpipe.add_table('public.orders',    sync_group => 'prod_replica');
SELECT duckpipe.add_table('public.customers', sync_group => 'prod_replica');

-- 检查同步进度
SELECT source_table, state, rows_synced FROM duckpipe.status();

工作原理

pg_duckpipe 使用 Rust 编写。以下是变更从源端流向湖仓的过程:

  1. 尾随 WAL 流:通过 pgoutput 插件连接到 PostgreSQL 的逻辑复制协议。
  2. 解码与路由:解析每个变更,并将其分派到对应表的内存队列中。
  3. 刷新到 DuckLake:通过嵌入的 DuckDB 连接,将排队的变更批量写入 DuckLake 列式表。

[pg_duckpipe 处理流程图]

几个值得注意的设计选择:

  • 每表隔离:每个同步的表在其独立的状态机(SNAPSHOT(快照)、CATCHUP(追赶)、STREAMING(流式传输))中推进。一个表的故障永远不会阻塞其他表。
  • 背压机制:如果刷新工作线程落后,槽消费者会暂停 WAL 消费,而不是累积无界的内存。
  • 崩溃安全:基于每表的 LSN(日志序列号)跟踪,以及幂等的 DELETE+INSERT 刷新路径,确保了至少一次交付,并在重启后能够正确重放。

要深入了解其架构,请查阅代码库和文档。


路线图

pg_duckpipe 正在积极开发中。以下是我们下一步的工作重点:

  • 功能性:支持模式 DDL 的传播,支持更广泛的 PostgreSQL 版本。
  • 性能:实现刷新工作线程池、有界队列、自适应批处理。
  • 维护与可观测性:自动压缩、可调度的刷新策略、每表延迟指标。

试试看吧,如果出现问题请提交 Issue,如果您想帮助塑造它,请发送 PR。让我们一起开始"鸭嘴管道"!

GitHub:github.com/relytcloud/pg_duckpipe

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐