TiCDC Simple Protocol
TiCDC Simple protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between heterogeneous databases. This document describes how to use the TiCDC Simple protocol and the data format implementation.
Use the TiCDC Simple protocol
When you use Kafka as the downstream, specify protocol
as "simple"
in the changefeed configuration. Then TiCDC encodes each row change or DDL event as a message, and sends the data change event to the downstream.
The configuration example for using the Simple protocol is as follows:
sink-uri
configuration:
--sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"
Changefeed configuration:
[sink]
protocol = "simple"
# The following configuration parameters control the sending behavior of bootstrap messages.
# send-bootstrap-interval-in-sec controls the time interval for sending bootstrap messages, in seconds.
# The default value is 120 seconds, which means that a bootstrap message is sent every 120 seconds for each table.
send-bootstrap-interval-in-sec = 120
# send-bootstrap-in-msg-count controls the message interval for sending bootstrap, in message count.
# The default value is 10000, which means that a bootstrap message is sent every 10000 row changed messages for each table.
send-bootstrap-in-msg-count = 10000
# Note: If you want to disable the sending of bootstrap messages, set both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count to 0.
# send-bootstrap-to-all-partition controls whether to send bootstrap messages to all partitions.
# The default value is true, which means that bootstrap messages are sent to all partitions of the corresponding table topic.
# Setting it to false means bootstrap messages are sent to only the first partition of the corresponding table topic.
send-bootstrap-to-all-partition = true
[sink.kafka-config.codec-config]
# encoding-format controls the encoding format of the Simple protocol messages. Currently, the Simple protocol message supports "json" and "avro" encoding formats.
# The default value is "json".
encoding-format = "json"
Message types
The TiCDC Simple protocol has the following message types.
DDL:
CREATE
: the creating table event.RENAME
: the renaming table event.CINDEX
: the creating index event.DINDEX
: the deleting index event.ERASE
: the deleting table event.TRUNCATE
: the truncating table event.ALTER
: the altering table event, including adding columns, dropping columns, modifying column types, and otherALTER TABLE
statements supported by TiCDC.QUERY
: other DDL events.
DML:
INSERT
: the inserting event.UPDATE
: the updating event.DELETE
: the deleting event.
Other:
WATERMARK
: containing a TSO (that is, a 64-bit timestamp) of the upstream TiDB cluster, which marks the table replication progress. All events earlier than the watermark have been sent to the downstream.BOOTSTRAP
: containing the schema information of a table, used to build the table schema for the downstream.
Message format
In the Simple protocol, each message contains only one event. The Simple protocol supports encoding messages in JSON and Avro formats. This document uses JSON format as an example. For Avro format messages, their fields and meanings are the same as those in JSON format messages, but the encoding format is different. For details about the Avro format, see Simple Protocol Avro Schema.
DDL
TiCDC encodes a DDL event in the following JSON format:
{
"version":1,
"type":"ALTER",
"sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP",
"commitTs":447987408682614795,
"buildTs":1708936343598,
"tableSchema":{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447987408682614791,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
},
{
"name":"createTime",
"dataType":{
"mysqlType":"timestamp",
"charset":"binary",
"collate":"binary",
"length":19
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
},
"preTableSchema":{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
type | String | The DDL event type, including CREATE , RENAME , CINDEX , DINDEX , ERASE , TRUNCATE , ALTER , and QUERY . |
sql | String | The DDL statement. |
commitTs | Number | The commit timestamp when the DDL statement execution is completed in the upstream. |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
tableSchema | Object | The current schema information of the table. For more information, see TableSchema definition. |
preTableSchema | Object | The schema information of the table before the DDL statement is executed. All DDL events, except the CREATE type of DDL event, have this field. |
DML
INSERT
TiCEC encodes an INSERT
event in the following JSON format:
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"INSERT",
"commitTs":447984084414103554,
"buildTs":1708923662983,
"schemaVersion":447984074911121426,
"data":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"90.5"
}
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
database | String | The name of the database. |
table | String | The name of the table. |
tableID | Number | The ID of the table. |
type | String | The DML event type, including INSERT , UPDATE , and DELETE . |
commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
data | Object | The inserted data, where the field name is the column name and the field value is the column value. |
The INSERT
event contains the data
field, and does not contain the old
field.
UPDATE
TiCDC encodes an UPDATE
event in the following JSON format:
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"UPDATE",
"commitTs":447984099186180098,
"buildTs":1708923719184,
"schemaVersion":447984074911121426,
"data":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"95"
},
"old":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"90.5"
}
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
database | String | The name of the database. |
table | String | The name of the table. |
tableID | Number | The ID of the table. |
type | String | The DML event type, including INSERT , UPDATE , and DELETE . |
commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
data | Object | The data after updating, where the field name is the column name and the field value is the column value. |
old | Object | The data before updating, where the field name is the column name and the field value is the column value. |
The UPDATE
event contains both the data
and old
fields, which represent the data after and before updating respectively.
DELETE
TiCDC encodes a DELETE
event in the following JSON format:
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"DELETE",
"commitTs":447984114259722243,
"buildTs":1708923776484,
"schemaVersion":447984074911121426,
"old":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"95"
}
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
database | String | The name of the database. |
table | String | The name of the table. |
tableID | Number | The ID of the table. |
type | String | The DML event type, including INSERT , UPDATE , and DELETE . |
commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
old | Object | The deleted data, where the field name is the column name and the field value is the column value. |
The DELETE
event contains the old
field, and does not contain the data
field.
WATERMARK
TiCDC encodes a WATERMARK
event in the following JSON format:
{
"version":1,
"type":"WATERMARK",
"commitTs":447984124732375041,
"buildTs":1708923816911
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
type | String | The WATERMARK event type. |
commitTs | Number | The commit timestamp of the WATERMARK . |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
BOOTSTRAP
TiCDC encodes a BOOTSTRAP
event in the following JSON format:
{
"version":1,
"type":"BOOTSTRAP",
"commitTs":0,
"buildTs":1708924603278,
"tableSchema":{
"schema":"simple",
"table":"new_user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
}
The fields in the preceding JSON data are explained as follows:
Field | Type | Description |
---|---|---|
version | Number | The version number of the protocol, which is currently 1 . |
type | String | The BOOTSTRAP event type. |
commitTs | Number | The commitTs of the BOOTSTRAP is 0 . Because it is generated internally by TiCDC, its commitTs is meaningless. |
buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
tableSchema | Object | The schema information of the table. For more information, see TableSchema definition. |
Message generation and sending rules
DDL
- Generation time: TiCDC sends a DDL event after all transactions before this DDL event have been sent.
- Destination: TiCDC sends DDL events to all partitions of the corresponding topic.
DML
- Generation time: TiCDC sends DML events in the order of the
commitTs
of the transaction. - Destination: TiCDC sends DDL events to the corresponding partition of the corresponding topic according to the user-configured dispatch rules.
WATERMARK
- Generation time: TiCDC sends
WATERMARK
events periodically to mark the replication progress of a changefeed. The current interval is 1 second. - Destination: TiCDC sends
WATERMARK
events to all partitions of the corresponding topic.
BOOTSTRAP
- Generation time:
- After creating a new changefeed, before the first DML event of a table is sent, TiCDC sends a
BOOTSTRAP
event to the downstream to build the table schema. - Additionally, TiCDC sends
BOOTSTRAP
events periodically to allow newly joined consumers to build the table schema. The default interval is 120 seconds or every 10000 messages. You can adjust the sending interval by configuring thesend-bootstrap-interval-in-sec
andsend-bootstrap-in-msg-count
parameters in thesink
configuration. - If a table does not receive any new DML messages within 30 minutes, the table is considered inactive. TiCDC stops sending
BOOTSTRAP
events for the table until new DML events are received.
- After creating a new changefeed, before the first DML event of a table is sent, TiCDC sends a
- Destination: By default, TiCDC sends
BOOTSTRAP
events to all partitions of the corresponding topic. You can adjust the sending strategy by configuring thesend-bootstrap-to-all-partition
parameter in the sink configuration.
Message consumption methods
Because the TiCDC Simple protocol does not include the schema information of the table when sending a DML message, the downstream needs to receive the DDL or BOOTSTRAP message and cache the schema information of the table before consuming a DML message. When receiving a DML message, the downstream obtains the corresponding table schema information from the cache by searching the table
name and schemaVersion
fields of the DML message, and then correctly consumes the DML message.
The following describes how the downstream consumes DML messages based on DDL or BOOTSTRAP messages. According to preceding descriptions, the following information is known:
- Each DML message contains a
schemaVersion
field to mark the schema version number of the table corresponding to the DML message. - Each DDL message contains a
tableSchema
andpreTableSchema
field to mark the schema information of the table before and after the DDL event. - Each BOOTSTRAP message contains a
tableSchema
field to mark the schema information of the table corresponding to the BOOTSTRAP message.
The consumption methods are introduced in the following two scenarios.
Scenario 1: The consumer starts consuming from the beginning
In this scenario, the consumer starts consuming from the creation of a table, so the consumer can receive all DDL and BOOTSTRAP messages of the table. In this case, the consumer can obtain the schema information of the table through the table
name and schemaVersion
field of the DML message. The detailed process is as follows:
Scenario 2: The consumer starts consuming from the middle
When a new consumer joins the consumer group, it might start consuming from the middle, so it might miss earlier DDL and BOOTSTRAP messages of the table. In this case, the consumer might receive some DML messages before obtaining the schema information of the table. Therefore, the consumer needs to wait for a period of time until it receives the DDL or BOOTSTRAP message to obtain the schema information of the table. Because TiCDC sends BOOTSTRAP messages periodically, the consumer can always obtain the schema information of the table within a period of time. The detailed process is as follows:
Reference
TableSchema definition
TableSchema is a JSON object that contains the schema information of the table, including the table name, table ID, table version number, column information, and index information. The JSON message format is as follows:
{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
The preceding JSON data is explained as follows:
Field | Type | Description |
---|---|---|
schema | String | The name of the database. |
table | String | The name of the table. |
tableID | Number | The ID of the table. |
version | Number | The schema version number of the table. |
columns | Array | The column information, including the column name, data type, whether it can be null, and the default value. |
indexes | Array | The index information, including the index name, whether it is unique, whether it is a primary key, and the index columns. |
You can uniquely identify the schema information of a table by the table name and the schema version number.
Column definition
Column is a JSON object that contains the schema information of the column, including the column name, data type, whether it can be null, and the default value.
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
}
The preceding JSON data is explained as follows:
Field | Type | Description |
---|---|---|
name | String | The name of the column. |
dataType | Object | The data type information, including the MySQL data type, character set, collation, and field length. |
nullable | Boolean | Whether the column can be null. |
default | String | The default value of the column. |
Index definition
Index is a JSON object that contains the schema information of the index, including the index name, whether it is unique, whether it is a primary key, and the index column.
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
The preceding JSON data is explained as follows:
Field | Type | Description |
---|---|---|
name | String | The name of the index. |
unique | Boolean | Whether the index is unique. |
primary | Boolean | Whether the index is a primary key. |
nullable | Boolean | Whether the index can be null. |
columns | Array | The column names included in the index. |
mysqlType reference table
The following table describes the value range of the mysqlType
field in the TiCDC Simple protocol and its type in TiDB (Golang) and Avro (Java). When you need to parse DML messages, you can correctly parse the data according to this table and the mysqlType
field in the DML message, depending on the protocol and language you use.
TiDB type (Golang) represents the type of the corresponding mysqlType
when it is processed in TiDB and TiCDC (Golang). Avro type (Java) represents the type of the corresponding mysqlType
when it is encoded into Avro format messages.
mysqlType | Value range | TiDB type (Golang) | Avro type (Java) |
---|---|---|---|
tinyint | [-128, 127] | int64 | long |
tinyint unsigned | [0, 255] | uint64 | long |
smallint | [-32768, 32767] | int64 | long |
smallint unsigned | [0, 65535] | uint64 | long |
mediumint | [-8388608, 8388607] | int64 | long |
mediumint unsigned | [0, 16777215] | uint64 | long |
int | [-2147483648, 2147483647] | int64 | long |
int unsigned | [0, 4294967295] | uint64 | long |
bigint | [-9223372036854775808, 9223372036854775807] | int64 | long |
bigint unsigned | [0, 9223372036854775807] | uint64 | long |
bigint unsigned | [9223372036854775808, 18446744073709551615] | uint64 | string |
float | / | float32 | float |
double | / | float64 | double |
decimal | / | string | string |
varchar | / | []uint8 | string |
char | / | []uint8 | string |
varbinary | / | []uint8 | bytes |
binary | / | []uint8 | bytes |
tinytext | / | []uint8 | string |
text | / | []uint8 | string |
mediumtext | / | []uint8 | string |
longtext | / | []uint8 | string |
tinyblob | / | []uint8 | bytes |
blob | / | []uint8 | bytes |
mediumblob | / | []uint8 | bytes |
longblob | / | []uint8 | bytes |
date | / | string | string |
datetime | / | string | string |
timestamp | / | string | string |
time | / | string | string |
year | / | int64 | long |
enum | / | uint64 | long |
set | / | uint64 | long |
bit | / | uint64 | long |
json | / | string | string |
bool | / | int64 | long |
Avro schema definition
The Simple protocol supports outputting messages in Avro format. For details about the Avro format, see Simple Protocol Avro Schema.