Confluent Cloud および Snowflake とデータを統合する

Confluent は、強力なデータ統合機能を提供する Apache Kafka 互換のストリーミング データ プラットフォームです。このプラットフォームでは、ノンストップのリアルタイム ストリーミング データにアクセス、保存、および管理できます。

TiDB v6.1.0 以降、TiCDC は、増分データを Avro 形式で Confluent に複製することをサポートしています。このドキュメントでは、 TiCDCを使用して TiDB の増分データを Confluent にレプリケートし、さらに Confluent Cloud を介して Snowflake、ksqlDB、および SQL Server にデータをレプリケートする方法を紹介します。このドキュメントの構成は次のとおりです。

  1. TiCDC を含む TiDB クラスターをすばやくデプロイします。
  2. TiDB から Confluent Cloud にデータをレプリケートする変更フィードを作成します。
  3. Confluent Cloud から Snowflake、ksqlDB、および SQL Server にデータをレプリケートするコネクタを作成します。
  4. go-tpc を使用して TiDB にデータを書き込み、Snowflake、ksqlDB、および SQL Server でデータの変更を観察します。

上記の手順は、ラボ環境で実行されます。これらの手順を参照して、本番環境にクラスターをデプロイすることもできます。

増分データを Confluent Cloud に複製する

ステップ 1. 環境をセットアップする

  1. TiCDC を含む TiDB クラスターをデプロイします。

    ラボまたはテスト環境では、TiUP Playground を使用して、TiCDC を含む TiDB クラスターをすばやくデプロイできます。

    tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 # View cluster status tiup status

    TiUP がインストールされていない場合は、 TiUPをインストールするを参照してください。実稼働環境では、 TiCDC をデプロイの指示に従って TiCDC をデプロイできます。

  2. Confluent Cloud を登録し、Confluent クラスタを作成します。

    Basic クラスターを作成し、インターネット経由でアクセスできるようにします。詳細については、 Confluent Cloud のクイック スタートを参照してください。

ステップ 2. アクセス キー ペアを作成する

  1. クラスター API キーを作成します。

    コンフルエントなクラウドにサインインします。 [**データ統合]** > [ API キー] > [キーの作成] を選択します。表示される [ API キーのスコープの選択]ページで、 [グローバル アクセス]を選択します。

    作成後、以下に示すようにキー ペア ファイルが生成されます。

    === Confluent Cloud API key: xxx-xxxxx === API key: L5WWA4GK4NAT2EQV API secret: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx Bootstrap server: xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092
  2. スキーマ レジストリ エンドポイントを記録します。

    Confluent Cloud Console で、[**スキーマ レジストリ]** > [ API エンドポイント] を選択します。スキーマ レジストリ エンドポイントを記録します。次に例を示します。

    https://yyy-yyyyy.us-east-2.aws.confluent.cloud
  3. スキーマ レジストリ API キーを作成します。

    Confluent Cloud Console で、[**スキーマ レジストリ]** > [ API 資格情報] を選択します。 [編集][キーの作成]の順にクリックします。

    作成後、以下に示すようにキー ペア ファイルが生成されます。

    === Confluent Cloud API key: yyy-yyyyy === API key: 7NBH2CAFM2LMGTH7 API secret: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

    この手順は、Confluent CLI を使用して実行することもできます。詳細については、 Confluent CLI を Confluent Cloud クラスタに接続するを参照してください。

ステップ 3. Kafka チェンジフィードを作成する

  1. changefeed 構成ファイルを作成します。

    Avro と Confluent Connector の要件に応じて、各テーブルの増分データを独立したトピックに送信する必要があり、主キーの値に基づいて各イベントに対してパーティションをディスパッチする必要があります。したがって、次の内容で changefeed 構成ファイルchangefeed.confを作成する必要があります。

    [sink] dispatchers = [ {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"}, ]

    構成ファイルのdispatchersの詳細な説明については、 Kafka Sink のトピックおよびパーティション ディスパッチャーのルールをカスタマイズするを参照してください。

  2. 増分データを Confluent Cloud にレプリケートするための変更フィードを作成します。

    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://<broker_endpoint>/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=<broker_api_key>&sasl-password=<broker_api_secret>" --schema-registry="https://<schema_registry_api_key>:<schema_registry_api_secret>@<schema_registry_endpoint>" --changefeed-id="confluent-changefeed" --config changefeed.conf

    次のフィールドの値を、 ステップ 2. アクセス キー ペアを作成するで作成または記録した値に置き換える必要があります。

    • <broker_endpoint>
    • <broker_api_key>
    • <broker_api_secret>
    • <schema_registry_api_key>
    • <schema_registry_api_secret>
    • <schema_registry_endpoint>

    値を置き換える前に、 HTML URL エンコーディング リファレンスに基づいて<schema_registry_api_secret>をエンコードする必要があることに注意してください。前述のすべてのフィールドを置き換えると、構成ファイルは次のようになります。

    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=L5WWA4GK4NAT2EQV&sasl-password=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" --schema-registry="https://7NBH2CAFM2LMGTH7:xxxxxxxxxxxxxxxxxx@yyy-yyyyy.us-east-2.aws.confluent.cloud" --changefeed-id="confluent-changefeed" --config changefeed.conf
    • コマンドを実行して、変更フィードを作成します。

      • 変更フィードが正常に作成されると、次のように、変更フィード ID などの変更フィード情報が表示されます。

        Create changefeed successfully! ID: confluent-changefeed Info: {... changfeed info json struct ...}
      • コマンドを実行しても結果が返されない場合は、コマンドを実行したサーバーと Confluent Cloud 間のネットワーク接続を確認してください。詳細については、 Confluent Cloud への接続をテストするを参照してください。

  3. 変更フィードを作成したら、次のコマンドを実行して変更フィードのステータスを確認します。

    tiup ctl:v6.2.0 cdc changefeed list --pd="http://127.0.0.1:2379"

    TiCDCクラスタとレプリケーション タスクの管理を参照して、変更フィードを管理できます。

ステップ 4. データを書き込んで変更ログを生成する

上記の手順が完了すると、TiCDC は TiDB クラスター内の増分データの変更ログを Confluent Cloud に送信します。このセクションでは、TiDB にデータを書き込んで変更ログを生成する方法について説明します。

  1. サービスのワークロードをシミュレートします。

    ラボ環境で変更ログを生成するには、go-tpc を使用してデータを TiDB クラスターに書き込みます。具体的には、次のコマンドを実行して、TiDB クラスターにデータベースtpccを作成します。次に、TiUP ベンチを使用して、この新しいデータベースにデータを書き込みます。

    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

    go-tpc の詳細については、 TiDB で TPC-C テストを実行する方法を参照してください。

  2. Confluent Cloud でデータを観察します。

    Confluent topics

    Confluent Cloud Console で、[トピック] をクリックします。ターゲット トピックが作成され、データを受信していることがわかります。この時点で、TiDB データベースの増分データが Confluent Cloud に正常に複製されます。

データを Snowflake と統合する

Snowflake は、クラウド ネイティブのデータ ウェアハウスです。 Confluent では、Snowflake シンク コネクタを作成することで、TiDB の増分データを Snowflake にレプリケートできます。

前提条件

統合手順

  1. Snowflake でデータベースとスキーマを作成します。

    Snowflake コントロール コンソールで、 [**データ]** > [データベース] を選択します。 TPCCという名前のデータベースとTiCDCという名前のスキーマを作成します。

  2. Confluent Cloud Console で、[データ統合] > [コネクタ] > [ Snowflake Sink]を選択します。以下のページが表示されます。

    Add snowflake sink connector

  3. Snowflake にレプリケートするトピックを選択します。次に、次のページに進みます。

    Configuration

  4. Snowflakeに接続するための認証情報を指定します。前の手順で作成した値をデータベース名スキーマ名に入力します。次に、次のページに進みます。

    Configuration

  5. [**Configuration / コンフィグレーション]**ページで、[入力 Kafka レコードの値の形式][入力 Kafka レコードのキー形式]の両方でAVROを選択します。次に [続行] をクリックします。コネクタが作成され、ステータスがRunningになるまで待ちます。これには数分かかる場合があります。

    Data preview

  6. Snowflake コンソールで、 [データ] > [データベース] > [ TPCC] > [ TiCDC ] を選択します。 TiDB の増分データが Snowflake にレプリケートされていることがわかります。 Snowflake とのデータ統合が完了しました。

データを ksqlDB と統合する

ksqlDB は、ストリーム処理アプリケーション専用のデータベースです。 Confluent Cloud で ksqlDB クラスターを作成し、TiCDC によって複製された増分データにアクセスできます。

  1. Confluent Cloud Console でksqlDBを選択し、指示に従って ksqlDB クラスターを作成します。

    ksqlDB クラスターのステータスがRunningになるまで待ちます。このプロセスには数分かかります。

  2. ksqlDB エディターで、次のコマンドを実行して、 tidb_tpcc_ordersトピックにアクセスするためのストリームを作成します。

    CREATE STREAM orders (o_id INTEGER, o_d_id INTEGER, o_w_id INTEGER, o_c_id INTEGER, o_entry_d STRING, o_carrier_id INTEGER, o_ol_cnt INTEGER, o_all_local INTEGER) WITH (kafka_topic='tidb_tpcc_orders', partitions=3, value_format='AVRO');
  3. 次のコマンドを実行して、注文の STREAM データを確認します。

    SELECT * FROM ORDERS EMIT CHANGES;

    Select from orders

    前の図に示すように、増分データが ksqlDB に複製されていることがわかります。 ksqlDB とのデータ統合が行われます。

データを SQL Server と統合する

Microsoft SQL Server は、Microsoft が開発したリレーショナル データベース管理システム (RDBMS) です。 Confluent では、SQL Server シンク コネクタを作成することで、TiDB の増分データを SQL Server にレプリケートできます。

  1. SQL Server に接続し、 tpccという名前のデータベースを作成します。

    [ec2-user@ip-172-1-1-1 bin]$ sqlcmd -S 10.61.43.14,1433 -U admin Password: 1> create database tpcc 2> go 1> select name from master.dbo.sysdatabases 2> go name ---------------------------------------------------------------------- master tempdb model msdb rdsadmin tpcc (6 rows affected)
  2. Confluent Cloud Console で、[データ統合] > [コネクタ] > [ Microsoft SQL Server Sink]を選択します。以下のページが表示されます。

    Topic selection

  3. SQL Server にレプリケートするトピックを選択します。次に、次のページに進みます。

    Authentication

  4. 接続および認証情報を入力します。次に、次のページに進みます。

  5. [Configuration / コンフィグレーション] ページで、次のフィールドを構成し、[続行] をクリックします。

    分野価値
    入力 Kafka レコード値の形式アブロ
    挿入モードアップサート
    テーブルの自動作成真実
    列の自動追加真実
    PK モードrecord_key
    入力 Kafka レコード キー形式アブロ
    null で削除真実
  6. 構成後、[続行] をクリックします。コネクタのステータスがRunningになるまで待ちます。これには数分かかる場合があります。

    Results

  7. SQL Server に接続し、データを観察します。前の図に示すように、増分データが SQL Server にレプリケートされていることがわかります。 SQL Server とのデータ統合が完了しました。