TiDB と Confluent Platform の統合に関するクイック スタート ガイド
このドキュメントでは、 TiCDCを使用して TiDB を Confluent Platform に統合する方法を紹介します。
コンフルエントなプラットフォームは、Apache Kafka をコアとするデータ ストリーミング プラットフォームです。多くの公式およびサードパーティのシンク コネクタを備えた Confluent Platform では、ストリーム ソースをリレーショナル データベースまたは非リレーショナル データベースに簡単に接続できます。
TiDB を Confluent Platform と統合するには、TiCDCコンポーネントを Avro プロトコルと共に使用できます。 TiCDC は、Confluent Platform が認識する形式でデータの変更を Kafka にストリーミングできます。詳細な統合ガイドについては、次のセクションを参照してください。
前提条件
ノート:
このチュートリアルでは、 JDBC シンク コネクタを使用して TiDB データを下流のリレーショナル データベースに複製します。簡単にするために、ここではSQLiteを例として使用します。
Zookeeper、Kafka、およびスキーマ レジストリが正しくインストールされていることを確認します。 Confluent プラットフォーム クイック スタート ガイドに従ってローカル テスト環境を展開することをお勧めします。
次のコマンドを実行して、JDBC シンク コネクタがインストールされていることを確認します。結果には
jdbc-sinkが含まれている必要があります。confluent local services connect connector list
統合手順
次の構成を
jdbc-sink-connector.jsonに保存します。{ "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "testdb_test", "connection.url": "jdbc:sqlite:/tmp/test.db", "connection.ds.pool.size": 5, "table.name.format": "test", "auto.create": true, "auto.evolve": true } }次のコマンドを実行して、JDBC シンク コネクタのインスタンスを作成します (Kafka が
127.0.0.1:8083でリッスンしていると仮定します)。curl -X POST -H "Content-Type: application/json" -d @jdbc-sink-connector.json http://127.0.0.1:8083/connectors次のいずれかの方法で TiCDC をデプロイします。 TiCDC がすでにデプロイされている場合は、この手順をスキップできます。
- TiUP を使用してTiUPを含む新しい TiDB クラスターをデプロイする
- TiUP を使用して既存の TiDB クラスターにTiUPを追加する
- バイナリを使用して TiCDC を既存の TiDB クラスターに追加する (非推奨)
続行する前に、TiDB および TiCDC クラスターが正常であることを確認してください。
cdc cliコマンドを実行してchangefeedを作成します。./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"ノート:
PD、Kafka、およびスキーマ レジストリがそれぞれのデフォルト ポートで実行されていることを確認します。
データ複製のテスト
TiDB が Confluent Platform と統合されたら、以下の手順例に従ってデータ複製をテストできます。
TiDB クラスターに
testdbのデータベースを作成します。CREATE DATABASE IF NOT EXISTS testdb;testdbでtestテーブルを作成します。USE testdb; CREATE TABLE test ( id INT PRIMARY KEY, v TEXT );ノート:
データベース名またはテーブル名を変更する必要がある場合は、それに応じて
jdbc-sink-connector.jsonのtopicsを変更します。データを TiDB に挿入します。
INSERT INTO test (id, v) values (1, 'a'); INSERT INTO test (id, v) values (2, 'b'); INSERT INTO test (id, v) values (3, 'c'); INSERT INTO test (id, v) values (4, 'd');データがダウンストリームにレプリケートされるまでしばらく待ちます。次に、ダウンストリームのデータを確認します。
sqlite3 test.db sqlite> SELECT * from test;