これはアーカイブされた TiDB のドキュメントであり、更新は行われていません。最新の LTS バージョンのドキュメントを表示する

TiDB と Confluent Platform の統合に関するクイック スタート ガイド

このドキュメントでは、 TiCDCを使用して TiDB を Confluent Platform に統合する方法を紹介します。

コンフルエントなプラットフォームは、Apache Kafka をコアとするデータ ストリーミング プラットフォームです。多くの公式およびサードパーティのシンク コネクタを備えた Confluent Platform では、ストリーム ソースをリレーショナル データベースまたは非リレーショナル データベースに簡単に接続できます。

TiDB を Confluent Platform と統合するには、TiCDCコンポーネントを Avro プロトコルと共に使用できます。 TiCDC は、Confluent Platform が認識する形式でデータの変更を Kafka にストリーミングできます。詳細な統合ガイドについては、次のセクションを参照してください。

前提条件

ノート:

このチュートリアルでは、 JDBC シンク コネクタを使用して TiDB データを下流のリレーショナル データベースに複製します。簡単にするために、ここではSQLiteを例として使用します。

  • Zookeeper、Kafka、およびスキーマ レジストリが正しくインストールされていることを確認します。 Confluent プラットフォーム クイック スタート ガイドに従ってローカル テスト環境を展開することをお勧めします。

  • 次のコマンドを実行して、JDBC シンク コネクタがインストールされていることを確認します。結果にはjdbc-sinkが含まれている必要があります。

    confluent local services connect connector list

統合手順

  1. 次の構成をjdbc-sink-connector.jsonに保存します。

    { "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "testdb_test", "connection.url": "jdbc:sqlite:/tmp/test.db", "connection.ds.pool.size": 5, "table.name.format": "test", "auto.create": true, "auto.evolve": true } }
  2. 次のコマンドを実行して、JDBC シンク コネクタのインスタンスを作成します (Kafka が127.0.0.1:8083でリッスンしていると仮定します)。

    curl -X POST -H "Content-Type: application/json" -d @jdbc-sink-connector.json http://127.0.0.1:8083/connectors
  3. 次のいずれかの方法で TiCDC をデプロイします。 TiCDC がすでにデプロイされている場合は、この手順をスキップできます。

    続行する前に、TiDB および TiCDC クラスターが正常であることを確認してください。

  4. cdc cliコマンドを実行してchangefeedを作成します。

    ./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"

    ノート:

    PD、Kafka、およびスキーマ レジストリがそれぞれのデフォルト ポートで実行されていることを確認します。

データ複製のテスト

TiDB が Confluent Platform と統合されたら、以下の手順例に従ってデータ複製をテストできます。

  1. TiDB クラスターにtestdbのデータベースを作成します。

    CREATE DATABASE IF NOT EXISTS testdb;

    testdbtestテーブルを作成します。

    USE testdb; CREATE TABLE test ( id INT PRIMARY KEY, v TEXT );

    ノート:

    データベース名またはテーブル名を変更する必要がある場合は、それに応じてjdbc-sink-connector.jsontopicsを変更します。

  2. データを TiDB に挿入します。

    INSERT INTO test (id, v) values (1, 'a'); INSERT INTO test (id, v) values (2, 'b'); INSERT INTO test (id, v) values (3, 'c'); INSERT INTO test (id, v) values (4, 'd');
  3. データがダウンストリームにレプリケートされるまでしばらく待ちます。次に、ダウンストリームのデータを確認します。

    sqlite3 test.db sqlite> SELECT * from test;