Replicate Data to Storage Services
Starting from TiDB v6.5.0, TiCDC supports saving row change events to storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. This document describes how to create a changefeed that replicates incremental data to such storage services using TiCDC, and how data is stored. The organization of this document is as follows:
Replicate change data to storage services
Run the following command to create a changefeed task:
cdc cli changefeed create \
--server=http://10.0.10.25:8300 \
--sink-uri="s3://logbucket/storage_test?protocol=canal-json" \
--changefeed-id="simple-replication-task"
The output is as follows:
Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2022-11-29T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0-master-dirty"}
--server
: The address of any TiCDC server in the TiCDC cluster.--changefeed-id
: The ID of the changefeed. The format must match the^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$
regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID.--sink-uri
: The downstream address of the changefeed. For details, see Configure sink URI.--start-ts
: The starting TSO of the changefeed. TiCDC starts pulling data from this TSO. The default value is the current time.--target-ts
: The ending TSO of the changefeed. TiCDC stops pulling data until this TSO. The default value is empty, which means that TiCDC does not automatically stop pulling data.--config
: The configuration file of the changefeed. For details, see TiCDC changefeed configuration parameters.
Configure sink URI
This section describes how to configure sink URI for storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. Sink URI is used to specify the connection information of the TiCDC target system. The format is as follows:
[scheme]://[host]/[path]?[query_parameters]
For [query_parameters]
in the URI, the following parameters can be configured:
Parameter | Description | Default value | Value range |
---|---|---|---|
worker-count | Concurrency for saving data changes to cloud storage in the downstream. | 16 | [1, 512] |
flush-interval | Interval for saving data changes to cloud storage in the downstream. | 5s | [2s, 10m] |
file-size | A data change file is stored to cloud storage if the number of bytes exceeds the value of this parameter. | 67108864 | [1048576, 536870912] |
protocol | The protocol format of the messages sent to the downstream. | N/A | canal-json and csv |
enable-tidb-extension | When protocol is set to canal-json and enable-tidb-extension is set to true , TiCDC sends WATERMARK events and adds the TiDB extension field to Canal-JSON messages. | false | false and true |
Configure sink URI for external storage
The following is an example configuration for Amazon S3:
--sink-uri="s3://bucket/prefix?protocol=canal-json"
The following is an example configuration for GCS:
--sink-uri="gcs://bucket/prefix?protocol=canal-json"
The following is an example configuration for Azure Blob Storage:
--sink-uri="azure://bucket/prefix?protocol=canal-json"
Configure sink URI for NFS
The following is an example configuration for NFS:
--sink-uri="file:///my-directory/prefix?protocol=canal-json"
Storage path structure
This section describes the storage path structure of data change records, metadata, and DDL events.
Data change records
Data change records are saved to the following path:
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC{num}.{extension}
scheme
: specifies the storage type, for example,s3
,gcs
,azure
, orfile
.prefix
: specifies the user-defined parent directory, for example,s3://bucket/bbb/ccc
.schema
: specifies the schema name, for example,s3://bucket/bbb/ccc/test
.table
: specifies the table name, for example,s3://bucket/bbb/ccc/test/table1
.table-version-separator
: specifies the separator that separates the path by the table version, for example,s3://bucket/bbb/ccc/test/table1/9999
.partition-separator
: specifies the separator that separates the path by the table partition, for example,s3://bucket/bbb/ccc/test/table1/9999/20
.date-separator
: classifies the files by the transaction commit date. Value options are:none
: nodate-separator
. For example, all files withtest.table1
version being9999
are saved tos3://bucket/bbb/ccc/test/table1/9999
.year
: the separator is the year of the transaction commit date, for example,s3://bucket/bbb/ccc/test/table1/9999/2022
.month
: the separator is the year and month of the transaction commit date, for example,s3://bucket/bbb/ccc/test/table1/9999/2022-01
.day
: the separator is the year, month, and day of the transaction commit date, for example,s3://bucket/bbb/ccc/test/table1/9999/2022-01-02
.
num
: saves the serial number of the file that records the data change, for example,s3://bucket/bbb/ccc/test/table1/9999/2022-01-02/CDC000005.csv
.extension
: specifies the extension of the file. TiDB v6.5.0 supports the CSV and Canal-JSON formats.
Index files
An index file is used to prevent written data from being overwritten by mistake. It is stored in the same path as the data change records.
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC.index
The index file records the largest file name used in the current directory. For example:
CDC000005.csv
In this example, the files CDC000001.csv
through CDC000004.csv
in this directory are occupied. When a table scheduling or node restart occurs in the TiCDC cluster, the new node reads the index file and determines if CDC000005.csv
is occupied. If it is not occupied, the new node writes the file starting from CDC000005.csv
. If it is occupied, it starts writing from CDC000006.csv
, which prevents overwriting data written by other nodes.
Metadata
Metadata is saved in the following path:
{protocol}://{prefix}/metadata
Metadata is a JSON-formatted file, for example:
{
"checkpoint-ts":433305438660591626
}
checkpoint-ts
: Transactions withcommit-ts
smaller thancheckpoint-ts
are written to the target storage in the downstream.
DDL events
When DDL events cause the table version to change, TiCDC switches to a new path to write data change records. For example, when the version of test.table1
changes from 9999
to 10000
, data will be written to the path s3://bucket/bbb/ccc/test/table1/10000/2022-01-02/CDC000001.csv
. In addition, when DDL events occur, TiCDC generates a schema.json
file to save the table schema information.
Table schema information is saved in the following path:
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/schema.json
The following is a schema.json
file:
{
"Table":"table1",
"Schema":"test",
"Version":1,
"TableVersion":10000,
"Query": "ALTER TABLE test.table1 ADD OfficeLocation blob(20)",
"TableColumns":[
{
"ColumnName":"Id",
"ColumnType":"INT",
"ColumnNullable":"false",
"ColumnIsPk":"true"
},
{
"ColumnName":"LastName",
"ColumnType":"CHAR",
"ColumnLength":"20"
},
{
"ColumnName":"FirstName",
"ColumnType":"VARCHAR",
"ColumnLength":"30"
},
{
"ColumnName":"HireDate",
"ColumnType":"DATETIME"
},
{
"ColumnName":"OfficeLocation",
"ColumnType":"BLOB",
"ColumnLength":"20"
}
],
"TableColumnsTotal":"5"
}
Table
: Table name.Schema
: Schema name.Version
: Protocol version of the storage sink.TableVersion
: Table version.Query
:DDL statement.TableColumns
: An array of one or more maps, each of which describes a column in the source table.ColumnName
: Column name.ColumnType
: Column type. For details, see Data type.ColumnLength
: Column length. For details, see Data type.ColumnPrecision
: Column precision. For details, see Data type.ColumnScale
: The number of digits following the decimal point (the scale). For details, see Data type.ColumnNullable
: The column can be NULL when the value of this option istrue
.ColumnIsPk
: The column is part of the primary key when the value of this option istrue
.
TableColumnsTotal
: The size of theTableColumns
array.
Data type
This section describes the data types used in the schema.json
file. The data types are defined as T(M[, D])
. For details, see Data Types.
Integer types
Integer types in TiDB are defined as IT[(M)] [UNSIGNED]
, where
IT
is the integer type, which can beTINYINT
,SMALLINT
,MEDIUMINT
,INT
,BIGINT
, orBIT
.M
is the display width of the type.
Integer types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{IT} [UNSIGNED]",
"ColumnPrecision":"{M}"
}
Decimal types
Decimal types in TiDB are defined as DT[(M,D)][UNSIGNED]
, where
DT
is the floating-point type, which can beFLOAT
,DOUBLE
,DECIMAL
, orNUMERIC
.M
is the precision of the data type, or the total number of digits.D
is the number of digits following the decimal point.
Decimal types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{DT} [UNSIGNED]",
"ColumnPrecision":"{M}",
"ColumnScale":"{D}"
}
Date and time types
Date types in TiDB are defined as DT
, where
DT
is the date type, which can beDATE
orYEAR
.
The date types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{DT}"
}
The time types in TiDB are defined as TT[(M)]
, where
TT
is the time type, which can beTIME
,DATETIME
, orTIMESTAMP
.M
is the precision of seconds in the range from 0 to 6.
The time types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{TT}",
"ColumnScale":"{M}"
}
String types
The string types in TiDB are defined as ST[(M)]
, where
ST
is the string type, which can beCHAR
,VARCHAR
,TEXT
,BINARY
,BLOB
, orJSON
.M
is the maximum length of the string.
The string types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{ST}",
"ColumnLength":"{M}"
}
Enum and Set types
The Enum and Set types are defined as follows in schema.json
:
{
"ColumnName":"COL1",
"ColumnType":"{ENUM/SET}",
}