TiDB 集成 Confluent Platform 快速上手指南

本文档介绍如何使用 TiCDC 将 TiDB 集成到 Confluent Platform。

Confluent Platform 是一个以 Apache Kafka 为核心的流数据处理平台,可以借助官方或第三方的 sink connector 将数据源连接到关系型或非关系型数据库。

你可以使用 TiCDC 组件和 Avro 协议来集成 TiDB 和 Confluent Platform。TiCDC 能将数据更改以 Confluent Platform 能识别的格式流式传输到 Kafka。下文详细介绍了集成的操作步骤。

环境准备

  • 确保 Zookeeper、Kafka 和 Schema Registry 已正确安装。推荐参照 Confluent Platform 快速入门指南部署本地测试环境。

  • 通过以下命令确保 JDBC sink connector 已安装。返回结果中预期包含 jdbc-sink

    confluent local services connect connector list

集成步骤

  1. 将下方的配置样例保存为 jdbc-sink-connector.json 文件:

    { "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "testdb_test", "connection.url": "jdbc:sqlite:/tmp/test.db", "connection.ds.pool.size": 5, "table.name.format": "test", "auto.create": true, "auto.evolve": true } }
  2. 运行下方命令新建一个 JDBC sink connector 实例(假设 Kafka 监听的 IP 地址与端口是 127.0.0.1:8083):

    curl -X POST -H "Content-Type: application/json" -d jdbc-sink-connector.json http://127.0.0.1:8083/connectors
  3. 通过以下任一方式部署 TiCDC。如果已经部署了 TiCDC,可以跳过这一步。

    在继续接下来的操作之前,请先确保 TiDB 和 TiCDC 集群处于健康状态。

  4. 运行下面的 cdc cli 命令,新建一个同步任务 changefeed

    ./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"

数据同步测试

TiDB 与 Confluent Platform 成功集成后,你可以按照以下步骤来测试数据同步功能。

  1. 在 TiDB 集群中新建 testdb 数据库:

    CREATE DATABASE IF NOT EXISTS testdb;

    testdb 数据库中创建 test 数据表:

    USE testdb; CREATE TABLE test ( id INT PRIMARY KEY, v TEXT );
  2. test 数据表中插入数据:

    INSERT INTO test (id, v) values (1, 'a'); INSERT INTO test (id, v) values (2, 'b'); INSERT INTO test (id, v) values (3, 'c'); INSERT INTO test (id, v) values (4, 'd');
  3. 等待数据被同步到下游数据库中,并检查下游的数据:

    sqlite3 test.db sqlite> SELECT * from test;