豆包 给你一套可直接运行、最简洁的 Flink 消费 MySQL CDC Demo,Flink 1.17 + Flink-CDC 2.4,Java 版,复制就能跑
这里给你一套可直接运行、最简洁的 Flink 消费 MySQL CDC Demo,Flink 1.17 + Flink-CDC 2.4,Java 版,复制就能跑。
- pom.xml 依赖
xml
org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version}<!-- Flink CDC MySQL -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<!-- 序列化 json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
- Flink CDC 消费 MySQL 完整代码
功能:实时捕获 insert/update/delete,并打印。
java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkMysqlCdcDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 checkpoint(必须开,否则无法精确一次)
env.enableCheckpointing(5000);
// 1. 构建 MySQL CDC 数据源
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test_db") // 要监听的库
.tableList("test_db.user") // 要监听的表
.username("root")
.password("root")
// 输出 JSON 格式
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// 2. 读取并打印
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
.print();
env.execute("Flink MySQL CDC Demo");
}
}
- MySQL 必须开启 binlog
在 my.cnf / my.ini 加:
ini
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
重启 MySQL 后检查:
sql
show variables like ‘log_bin’;
show variables like ‘binlog_format’;
必须为 ON 和 ROW 。
- 运行后输出示例(JSON)
- insert/update/delete 都会被捕获
- before 是旧数据, after 是新数据
json
{
“before”: null,
“after”: {
“id”: 1,
“name”: “test”,
“age”: 20
},
“op”: “c” // c=insert, u=update, d=delete
}
- 你接下来想要哪个版本?
我可以直接给你完整代码:
1. Flink CDC 解析 JSON 转实体对象
2. Flink CDC 写入 Kafka
3. Flink CDC 写入 ClickHouse
4. Flink CDC SQL 版(最简单)
更多推荐



所有评论(0)