这里给你一套可直接运行、最简洁的 Flink 消费 MySQL CDC Demo,Flink 1.17 + Flink-CDC 2.4,Java 版,复制就能跑。

  1. pom.xml 依赖

xml

org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version}
<!-- Flink CDC MySQL -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
</dependency>

<!-- 序列化 json -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
 

  1. Flink CDC 消费 MySQL 完整代码

功能:实时捕获 insert/update/delete,并打印。

java

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkMysqlCdcDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 开启 checkpoint(必须开,否则无法精确一次)
    env.enableCheckpointing(5000);

    // 1. 构建 MySQL CDC 数据源
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test_db")       // 要监听的库
            .tableList("test_db.user")     // 要监听的表
            .username("root")
            .password("root")
            // 输出 JSON 格式
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

    // 2. 读取并打印
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-cdc-source")
       .print();

    env.execute("Flink MySQL CDC Demo");
}

}

  1. MySQL 必须开启 binlog

在  my.cnf  /  my.ini  加:

ini

server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL

重启 MySQL 后检查:

sql

show variables like ‘log_bin’;
show variables like ‘binlog_format’;

必须为  ON  和  ROW 。

  1. 运行后输出示例(JSON)
  • insert/update/delete 都会被捕获
  • before  是旧数据, after  是新数据

json

{
“before”: null,
“after”: {
“id”: 1,
“name”: “test”,
“age”: 20
},
“op”: “c” // c=insert, u=update, d=delete
}

  1. 你接下来想要哪个版本?

我可以直接给你完整代码:

1. Flink CDC 解析 JSON 转实体对象
2. Flink CDC 写入 Kafka
3. Flink CDC 写入 ClickHouse
4. Flink CDC SQL 版(最简单)

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐