DeepSeek总结的 pg_duckpipe:湖仓实时 CDC 工具
pg_duckpipe是一款PostgreSQL扩展工具,通过WAL日志的CDC技术实现事务表到DuckLake列式表的实时同步。该工具无需外部基础设施,仅需SQL命令即可启动同步,支持本地和远程PostgreSQL实例。其核心架构采用Rust开发,包含WAL流处理、变更解码和批量刷新机制,具有表隔离、背压保护和崩溃安全等特性。当前版本支持基础同步功能,未来计划增强DDL传播、性能优化和运维监控能
原文地址:https://pgducklake.select/blog/introducing-pg-duckpipe/
pg_duckpipe:湖仓实时 CDC 工具
@qianzhen
2026年3月10日
TL;DR: pg_duckpipe 是一个全新的 PostgreSQL 扩展,它通过基于 WAL 的 CDC(变更数据捕获)技术,将您的常规堆表持续同步到 DuckLake 列式表中。仅需一个 SQL 调用即可启动,无需外部基础设施。
为什么需要 pg_duckpipe?
当我们发布 pg_ducklake 时,它为 PostgreSQL 带来了列式湖仓存储层:由 DuckDB 驱动的分析型表,底层由 Parquet 支持,元数据则存放在 PostgreSQL 自己的目录中。随之而来一个反复被提及的问题:如何自动让这些分析表与我的事务表保持同步?
这是一个现实问题。如果手动管理 DuckLake 表,运行定期的 ETL 任务或批量插入,最终会导致数据陈旧、需要维护额外的脚本,并且随着表的增加,运维复杂度也随之上升。对于那些希望实时获取 OLTP 数据分析视图的团队来说,这很快会变成一个痛点。
pg_duckpipe 正是为了解决这个问题而生的。它是一个 PostgreSQL 扩展(也可以作为一个独立的守护进程运行),能够将变更从常规堆表实时流式传输到 DuckLake 列式表中。无需 Kafka、无需 Debezium、无需外部编排器。只需要 PostgreSQL。
[pg_duckpipe 架构图]
快速入门
Docker 镜像已预先配置好 pg_ducklake 和 pg_duckpipe:
docker run -d --name duckpipe \
-p 15432:5432 \
-e POSTGRES_PASSWORD=duckdb \
pgducklake/pgduckpipe:18-main
同步本地表
将堆表同步到其列式副本,用于分析查询:
-- 连接到数据库
psql -h localhost -p 15432 -U postgres
-- 创建一张表并插入一些数据
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
customer_id BIGINT,
total INT,
created_at TIMESTAMP DEFAULT now()
);
INSERT INTO orders(customer_id, total)
SELECT (random() * 1000)::bigint, (random() * 10000)::int
FROM generate_series(1, 100000);
-- 开始同步到列式副本
SELECT duckpipe.add_table('public.orders');
-- 查询列式副本
SELECT customer_id, sum(total), count(*)
FROM orders_ducklake
GROUP BY customer_id
ORDER BY sum(total) DESC
LIMIT 10;
从远程 PostgreSQL 同步
pg_duckpipe 可以从远程 PostgreSQL 实例进行复制。源数据库无需安装 pg_duckpipe 或 pg_ducklake,只需要 wal_level = logical 和一个复制用户即可。这使得为现有生产数据库添加分析层变得非常容易:
-- 创建一个指向远程数据库的同步组
SELECT duckpipe.create_group('prod_replica',
conninfo => 'host=prod-db.example.com port=5432 dbname=myapp user=replicator');
-- 添加要同步的表
SELECT duckpipe.add_table('public.orders', sync_group => 'prod_replica');
SELECT duckpipe.add_table('public.customers', sync_group => 'prod_replica');
-- 检查同步进度
SELECT source_table, state, rows_synced FROM duckpipe.status();
工作原理
pg_duckpipe 使用 Rust 编写。以下是变更从源端流向湖仓的过程:
- 尾随 WAL 流:通过
pgoutput插件连接到 PostgreSQL 的逻辑复制协议。 - 解码与路由:解析每个变更,并将其分派到对应表的内存队列中。
- 刷新到 DuckLake:通过嵌入的 DuckDB 连接,将排队的变更批量写入 DuckLake 列式表。
[pg_duckpipe 处理流程图]
几个值得注意的设计选择:
- 每表隔离:每个同步的表在其独立的状态机(
SNAPSHOT(快照)、CATCHUP(追赶)、STREAMING(流式传输))中推进。一个表的故障永远不会阻塞其他表。 - 背压机制:如果刷新工作线程落后,槽消费者会暂停 WAL 消费,而不是累积无界的内存。
- 崩溃安全:基于每表的 LSN(日志序列号)跟踪,以及幂等的
DELETE+INSERT刷新路径,确保了至少一次交付,并在重启后能够正确重放。
要深入了解其架构,请查阅代码库和文档。
路线图
pg_duckpipe 正在积极开发中。以下是我们下一步的工作重点:
- 功能性:支持模式 DDL 的传播,支持更广泛的 PostgreSQL 版本。
- 性能:实现刷新工作线程池、有界队列、自适应批处理。
- 维护与可观测性:自动压缩、可调度的刷新策略、每表延迟指标。
试试看吧,如果出现问题请提交 Issue,如果您想帮助塑造它,请发送 PR。让我们一起开始"鸭嘴管道"!
GitHub:github.com/relytcloud/pg_duckpipe
更多推荐



所有评论(0)