TiCDC オープン プロトコル

TiCDC Open Protocol は、監視、キャッシング、フルテキスト インデックス作成、分析エンジン、および異なるデータベース間のプライマリ/セカンダリ レプリケーションのためのデータ ソースを提供する、行レベルのデータ変更通知プロトコルです。 TiCDC は TiCDC Open Protocol に準拠し、TiDB のデータ変更を MQ (Message Queue) などのサードパーティのデータ媒体に複製します。

TiCDC Open Protocol は、イベントを基本単位として使用して、データ変更イベントをダウンストリームに複製します。イベントは次の 3 つのカテゴリに分類されます。

  • 行変更イベント: 行のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
  • DDL イベント: DDL の変更を表します。このイベントは、DDL ステートメントがアップストリームで正常に実行された後に送信されます。 DDL イベントはすべての MQ パーティションにブロードキャストされます。
  • 解決済みイベント: 受信したイベントが完了する前の特別な時点を表します。

制限

  • ほとんどの場合、バージョンの Row Changed Event は 1 回だけ送信されますが、ノード障害やネットワーク パーティションなどの特殊な状況では、同じバージョンの Row Changed Event が複数回送信されることがあります。
  • 同じテーブルで、最初に送信された各バージョンの行変更イベントは、イベント ストリームのタイムスタンプ (TS) の順序で増加します。
  • 解決済みイベントは、各 MQ パーティションに定期的にブロードキャストされます。 Resolved Event は、Resolved Event TS より前の TS を持つイベントがダウンストリームに送信されたことを意味します。
  • DDL イベントは各 MQ パーティションにブロードキャストされます。
  • 行の複数の行変更イベントが同じ MQ パーティションに送信されます。

メッセージ形式

メッセージには、次の形式で配置された 1 つ以上のイベントが含まれます。

鍵:

オフセット(バイト)0~78~1516~(15+丈1)......
パラメータプロトコルのバージョン長さ1イベント キー 1長さNイベントキーN

価値:

オフセット(バイト)0~78~(7+長さ1)......
パラメータ長さ1イベント値1長さNイベント値N
  • LengthNN番目のキー/値の長さを表します。
  • 長さとプロトコルバージョンはビッグエンディアンのint64型です。
  • 現在のプロトコルのバージョンは1です。

イベント形式

このセクションでは、行変更イベント、DDL イベント、および解決イベントの形式を紹介します。

行変更イベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }
    パラメータタイプ説明
    TS番号行変更の原因となったトランザクションのタイムスタンプ。
    スキーマ名行が含まれるスキーマの名前。
    テーブル名行が含まれるテーブルの名前。
  • 価値:

    Insertイベント。新たに追加された行データが出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Updateイベント。新たに追加された行データ(「u」)と更新前の行データ(「p」)が出力されます。後者 (「p」) は、古い値機能が有効になっている場合にのみ出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Deleteイベント。削除された行データが出力されます。古い値機能が有効になっている場合、 Deleteイベントには、削除された行データのすべての列が含まれます。この機能が無効になっている場合、 Deleteイベントにはハンドルキー列のみが含まれます。

    { "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }
    パラメータタイプ説明
    カラム名列名。
    カラムの種類番号列のタイプ。詳細については、 カラムタイプ コードを参照してください。
    どこでハンドルブール値この列がWhere句のフィルター条件になるかどうかを決定します。この列がテーブルで一意の場合、 Where Handletrueです。
    国旗番号列のビット フラグ。詳細については、 列のビット フラグを参照してください。
    カラムの値どれでもカラムの値。

DDL イベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }
    パラメータタイプ説明
    TS番号DDL 変更を実行するトランザクションのタイムスタンプ。
    スキーマ名DDL 変更のスキーマ名。空の文字列の場合があります。
    テーブル名DDL 変更のテーブル名。空の文字列の場合があります。
  • 価値:

    { "q":<DDL Query>, "t":<DDL Type> }
    パラメータタイプ説明
    DDL クエリDDL クエリ SQL
    DDL タイプDDL タイプ。詳細については、 DDL タイプ コードを参照してください。

解決済みのイベント

  • 鍵:

    { "ts":<TS>, "t":3 }
    パラメータタイプ説明
    TS番号解決済みのタイムスタンプ。このイベントより前の TS はすべて送信されています。
  • 値:なし

イベント ストリーム出力の例

このセクションには、イベント ストリームの出力ログが表示されます。

アップストリームで次の SQL ステートメントを実行し、MQ パーティション番号が 2 であるとします。

CREATE TABLE test.t1(id int primary key, val varchar(16));

次のログ 1 とログ 3 から、DDL イベントがすべての MQ パーティションにブロードキャストされ、解決済みイベントが各 MQ パーティションに定期的にブロードキャストされていることがわかります。

1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=] 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; INSERT INTO test.t1(id, val) VALUES (1, 'aa'); INSERT INTO test.t1(id, val) VALUES (2, 'aa'); UPDATE test.t1 SET val = 'bb' WHERE id = 2; INSERT INTO test.t1(id, val) VALUES (3, 'cc'); COMMIT;
  • 次のログ 5 とログ 6 から、同じテーブルの行変更イベントが主キーに基づいて異なるパーティションに送信される可能性があることがわかりますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームは簡単に変更できます。イベントを同時に処理します。
  • ログ 6 以降、トランザクション内の同じ行に対する複数の変更は、1 つの行変更イベントでのみ送信されます。
  • ログ 8 はログ 7 の繰り返しイベントです。行変更イベントは繰り返される可能性がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"] 6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"] 7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"] 8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; DELETE FROM test.t1 WHERE id = 1; UPDATE test.t1 SET val = 'dd' WHERE id = 3; UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2; COMMIT;
  • ログ 9 は、 Deleteタイプの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。
  • ログ 13 とログ 14 は解決済みイベントです。 Resolved Event は、このパーティションで Resolved TS より小さいイベント (Row Changed Event および DDL Event を含む) が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"] 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"] 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"] 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"] 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=] 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

コンシューマー向けのプロトコル解析

現在、TiCDC は TiCDC Open Protocol の標準解析ライブラリを提供していませんが、解析例の Golang バージョンと Java バージョンが提供されています。このドキュメントで提供されているデータ形式と次の例を参照して、コンシューマー向けのプロトコル解析を実装できます。

カラムタイプ コード

Column Type Codeは、行変更イベントの列データ型を表します。

タイプコード出力例説明
TINYINT/ブール値1{"t":1,"v":1}
SMALLINT2{"t":2,"v":1}
INT3{"t":3,"v":123}
浮く4{"t":4,"v":153.123}
ダブル5{"t":5,"v":153.123}
ヌル6{"t":6,"v":null}
タイムスタンプ7{"t":7,"v":"1973-12-30 15:30:00"}
BIGINT8{"t":8,"v":123}
ミディアムミント9{"t":9,"v":123}
日にち10/14{"t":10,"v":"2000-01-01"}
時間11{"t":11,"v":"23:59:59"}
日付時刻12{"t":12,"v":"2015-12-20 23:58:58"}
13{"t":13,"v":1970}
VARCHAR/VARBINARY15/253{"t":15,"v":"test"} / {"t":15,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。アップストリーム タイプが VARBINARY の場合、非表示の文字はエスケープされます。
少し16{"t":16,"v":81}
JSON245{"t":245,"v":"{\"key1\": \"value1\"}"}
小数246{"t":246,"v":"129012.1230000"}
列挙型247{"t":247,"v":1}
設定248{"t":248,"v":3}
TINYTEXT/TINYBLOB249{"t":249,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
MEDIUMTEXT/MEDIUMBLOB250{"t":250,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
LONGTEXT/LONGBLOB251{"t":251,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
テキスト/BLOB252{"t":252,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
文字/バイナリ254{"t":254,"v":"test"} / {"t":254,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。アップストリーム タイプが BINARY の場合、非表示の文字はエスケープされます。
ジオメトリ255サポートされていません

DDL タイプ コード

DDL Type Codeは、DDL イベントの DDL ステートメント タイプを表します。

タイプコード
スキーマの作成1
スキーマを削除2
テーブルの作成3
ドロップ テーブル4
カラムを追加5
カラムをドロップ6
インデックスを追加7
ドロップ インデックス8
外部キーを追加9
外部キーを削除10
テーブルの切り捨て11
カラムの変更12
自動 ID のリベース13
テーブル名の変更14
デフォルト値を設定15
シャード RowID16
テーブル コメントの変更17
インデックスの名前を変更18
テーブル パーティションの追加19
テーブル パーティションの削除20
ビューを作成21
表の文字セットを変更して照合する22
テーブル パーティションの切り捨て23
ビューをドロップ24
テーブルの回復25
スキーマの文字セットを変更して照合する26
テーブルのロック27
テーブルのロックを解除28
修理表29
TiFlash レプリカの設定30
TiFlash レプリカ ステータスの更新31
主キーを追加32
主キーを削除33
シーケンスを作成34
シーケンスの変更35
ドロップ シーケンス36

列のビット フラグ

ビット フラグは、列の特定の属性を表します。

少し価値名前説明
10x01BinaryFlag列がバイナリ エンコードされた列であるかどうか。
20x02ハンドルキーフラグ列がハンドル インデックス列であるかどうか。
30x04GeneratedColumnFlag列が生成列かどうか。
40x08PrimaryKeyFlag列が主キー列であるかどうか。
50x10ユニークキーフラグ列が一意のインデックス列であるかどうか。
60x20複数キーフラグ列が複合インデックス列であるかどうか。
70x40NullableFlag列が null 許容列であるかどうか。
80x80未署名フラグ列が署名されていない列かどうか。

例:

列フラグの値が85の場合、その列は null 許容列、一意のインデックス列、生成された列、およびバイナリ エンコードされた列です。

85 == 0b_101_0101 == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

列の値が46の場合、その列は複合インデックス列、主キー列、生成列、およびハンドル キー列です。

46 == 0b_010_1110 == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

ノート:

  • BinaryFlagは、列の型が BLOB/TEXT (TINYBLOB/TINYTEXT および BINARY/CHAR を含む) の場合にのみ意味があります。上流の列が BLOB 型の場合、 BinaryFlagの値は1に設定されます。上流列が TEXT タイプの場合、 BinaryFlagの値は0に設定されます。
  • アップストリームからテーブルを複製するために、TiCDC はハンドル インデックスとして有効なインデックスを選択します。ハンドル インデックス列のHandleKeyFlagの値は1に設定されます。