TiCDC Changefeed 命令行参数和配置参数

TiCDC Changefeed 命令行参数

本章节将以创建同步任务为例,介绍 TiCDC Changefeed 的命令行参数:

cdc cli changefeed create --server=http://10.0.10.25:8300 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --changefeed-id="simple-replication-task"
Create changefeed successfully! ID: simple-replication-task Info: {"upstream_id":7178706266519722477,"namespace":"default","id":"simple-replication-task","sink_uri":"mysql://root:xxxxx@127.0.0.1:4000/?time-zone=","create_time":"2022-12-19T15:05:46.679218+08:00","start_ts":438156275634929669,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":true,"bdr_mode":false,"sync_point_interval":30000000000,"sync_point_retention":3600000000000,"filter":{"rules":["test.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0"}
  • --changefeed-id:同步任务的 ID,格式需要符合正则表达式 ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$。如果不指定该 ID,TiCDC 会自动生成一个 UUID(version 4 格式)作为 ID。

  • --sink-uri:同步任务下游的地址,需要按照以下格式进行配置,目前 scheme 支持 mysqltidbkafka

    [scheme]://[userinfo@][host]:[port][/path]?[query_parameters]

    URI 中包含特殊字符时,如 ! * ' ( ) ; : @ & = + $ , / ? % # [ ],需要对 URI 特殊字符进行转义处理。你可以在 URI Encoder 中对 URI 进行转义。

  • --start-ts:指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。

  • --target-ts:指定 changefeed 的目标 TSO。TiCDC 集群拉取数据直到这个 TSO 停止。默认为空,即 TiCDC 不会自动停止。

  • --config:指定 changefeed 配置文件。

TiCDC Changefeed 配置文件说明

本章节详细介绍了同步任务的配置。

# 指定该 Changefeed 在 Capture Server 中内存配额的上限。对于超额使用部分, # 会在运行中被 Go runtime 优先回收。默认值为 `1073741824`,即 1 GB。 # memory-quota = 1073741824 # 指定配置文件中涉及的库名、表名是否为大小写敏感 # 该配置会同时影响 filter 和 sink 相关配置,默认为 true case-sensitive = true # 是否开启 Syncpoint 功能,从 v6.3.0 开始支持,该功能默认关闭。 # 从 v6.4.0 开始,使用 Syncpoint 功能需要同步任务拥有下游集群的 SYSTEM_VARIABLES_ADMIN 或者 SUPER 权限。 # 注意:该参数只有当下游为 TiDB 时,才会生效。 # enable-sync-point = false # Syncpoint 功能对齐上下游 snapshot 的时间间隔 # 配置格式为 h m s,例如 "1h30m30s" # 默认值为 10m,最小值为 30s # 注意:该参数只有当下游为 TiDB 时,才会生效。 # sync-point-interval = "5m" # Syncpoint 功能在下游表中保存的数据的时长,超过这个时间的数据会被清理 # 配置格式为 h m s,例如 "24h30m30s" # 默认值为 24h # 注意:该参数只有当下游为 TiDB 时,才会生效。 # sync-point-retention = "1h" [mounter] # mounter 解码 KV 数据的线程数,默认值为 16 # worker-num = 16 [filter] # 忽略指定 start_ts 的事务 # ignore-txn-start-ts = [1, 2] # 过滤器规则 # 过滤规则语法:https://docs.pingcap.com/zh/tidb/stable/table-filter#表库过滤语法 rules = ['*.*', '!test.*'] # 事件过滤器规则 # 事件过滤器的详细配置规则可参考:https://docs.pingcap.com/zh/tidb/stable/ticdc-filter # 第一个事件过滤器规则 # [[filter.event-filters]] # matcher = ["test.worker"] # matcher 是一个白名单,表示该过滤规则只应用于 test 库中的 worker 表 # ignore-event = ["insert"] # 过滤掉 insert 事件 # ignore-sql = ["^drop", "add column"] # 过滤掉以 "drop" 开头或者包含 "add column" 的 DDL # ignore-delete-value-expr = "name = 'john'" # 过滤掉包含 name = 'john' 条件的 delete DML # ignore-insert-value-expr = "id >= 100" # 过滤掉包含 id >= 100 条件的 insert DML # ignore-update-old-value-expr = "age < 18" # 过滤掉旧值 age < 18 的 update DML # ignore-update-new-value-expr = "gender = 'male'" # 过滤掉新值 gender = 'male' 的 update DML # 第二个事件过滤器规则 # [[filter.event-filters]] # matcher = ["test.fruit"] # 该事件过滤器只应用于 test.fruit 表 # ignore-event = ["drop table", "delete"] # 忽略 drop table 的 DDL 事件和 delete 类型的 DML 事件 # ignore-sql = ["^drop table", "alter table"] # 忽略以 drop table 开头的,或者包含 alter table 的 DDL 语句 # ignore-insert-value-expr = "price > 1000 and origin = 'no where'" # 忽略包含 price > 1000 和 origin = 'no where' 条件的 insert DML [scheduler] # 将表以 Region 为单位分配给多个 TiCDC 节点进行同步。 # 注意:该功能只在 Kafka changefeed 上生效,暂不支持 MySQL changefeed。 # 默认为 "false"。设置为 "true" 以打开该功能。 enable-table-across-nodes = false # enable-table-across-nodes 开启后,有两种分配模式 # 1. 按 Region 的数量分配,即每个 CDC 节点处理 region 的个数基本相等。当某个表 Region 个数大于 `region-threshold` 值时,会将表分配到多个节点处理。`region-threshold` 默认值为 10000。 # region-threshold = 10000 # 2. 按写入的流量分配,即每个 CDC 节点处理 region 总修改行数基本相当。只有当表中每分钟修改行数超过 `write-key-threshold` 值时,该表才会生效。 # write-key-threshold = 30000 # 注意: # `write-key-threshold` 参数默认值为 0,代表默认不会采用流量的分配模式。 # 两种方式配置一种即可生效,当 `region-threshold` 和 `write-key-threshold` 同时配置时,TiCDC 将优先采用按流量分配的模式,即 `write-key-threshold`。 [sink] ############ 以下是 MQ 类型 sink 配置 ############ # 对于 MQ 类的 Sink,可以通过 dispatchers 配置 event 分发器 # 支持 partition 及 topic(从 v6.1 开始支持)两种 event 分发器。二者的详细说明见下一节。 # matcher 的匹配语法和过滤器规则语法相同,matcher 匹配规则的详细说明见下一节。 # 注意:该参数只有当下游为消息队列时,才会生效。 # 注意:当下游 MQ 为 Pulsar 时,如果 partition 的路由规则未指定为 'ts', 'index-value', 'table', 'default' 中的任意一个,那么将会使用你设置的字符串作为每一条 Pulsar message 的 key 进行路由。例如,如果你指定的路由规则为 'code' 字符串,那么符合该 matcher 的所有 Pulsar message 都将会以 'code' 作为 key 进行路由。 # dispatchers = [ # {matcher = ['test1.*', 'test2.*'], topic = "Topic 表达式 1", partition = "ts" }, # {matcher = ['test3.*', 'test4.*'], topic = "Topic 表达式 2", partition = "index-value" }, # {matcher = ['test1.*', 'test5.*'], topic = "Topic 表达式 3", partition = "table"}, # {matcher = ['test6.*'], partition = "ts"} # ] # protocol 用于指定编码消息时使用的格式协议 # 当下游类型是 Kafka 时,支持 canal-json、avro 和 open-protocol。 # 当下游类型是 Pulsar 时,仅支持 canal-json 协议。 # 当下游类型是存储服务时,目前仅支持 canal-json、csv 两种协议。 # 注意:该参数只有当下游为 Kafka、Pulsar,或存储服务时,才会生效。 # protocol = "canal-json" # delete-only-output-handle-key-columns 用于指定 Delete 事件的输出内容,只对 canal-json 和 open-protocol 协议有效。从 v7.2.0 开始引入。 # 该参数和 `force-replicate` 参数不兼容,如果同时将该参数和 `force-replicate` 设置为 true,创建 changefeed 会报错。 # 默认值为 false,即输出所有列的内容。当设置为 true 时,只输出主键列,或唯一索引列的内容。 # Avro 协议不受该参数控制,总是只输出主键列,或唯一索引列的内容。 # CSV 协议不受该参数控制,总是输出所有列的内容。 delete-only-output-handle-key-columns = false # Schema 注册表的 URL。 # 注意:该参数只有当下游为消息队列时,才会生效。 # schema-registry = "http://localhost:80801/subjects/{subject-name}/versions/{version-number}/schema" # 编码数据时所用编码器的线程数。 # 默认值为 32。 # 注意:该参数只有当下游为消息队列时,才会生效。 # encoder-concurrency = 32 # 是否开启 Kafka Sink V2。Kafka Sink V2 内部使用 kafka-go 实现。 # 默认值为 false。 # 注意:该参数只有当下游为消息队列时,才会生效。 # enable-kafka-sink-v2 = false # 是否只向下游同步有内容更新的列。从 v7.1.0 开始支持。 # 默认值为 false。 # 注意:该参数只有当下游为消息队列,并且使用 Open Protocol 或 Canal-JSON 时,才会生效。 # only-output-updated-columns = false ############ 以下是存储服务类型 sink 配置 ############ # 以下三个配置项仅在同步到存储服务的 sink 中使用,在 MQ 和 MySQL 类 sink 中无需设置。 # 换行符,用来分隔两个数据变更事件。默认值为空,表示使用 "\r\n" 作为换行符。 # terminator = '' # 文件路径的日期分隔类型。可选类型有 `none`、`year`、`month` 和 `day`。默认值为 `day`,即按天分隔。详见 <https://docs.pingcap.com/zh/tidb/stable/ticdc-sink-to-cloud-storage#数据变更记录>。 # 注意:该参数只有当下游为存储服务时,才会生效。 date-separator = 'day' # 是否使用 partition 作为分隔字符串。默认值为 true,即一张表中各个 partition 的数据会分不同的目录来存储。建议保持该配置项为 true 以避免下游分区表可能丢数据的问题 <https://github.com/pingcap/tiflow/issues/8581>。使用示例详见 <https://docs.pingcap.com/zh/tidb/dev/ticdc-sink-to-cloud-storage#数据变更记录>。 # 注意:该参数只有当下游为存储服务时,才会生效。 enable-partition-separator = true # 从 v6.5.0 开始,TiCDC 支持以 CSV 格式将数据变更记录保存至存储服务中,在 MQ 和 MySQL 类 sink 中无需设置。 # [sink.csv] # 字段之间的分隔符。必须为 ASCII 字符,默认值为 `,`。 # delimiter = ',' # 用于包裹字段的引号字符。空值代表不使用引号字符。默认值为 `"`。 # quote = '"' # CSV 中列为 NULL 时将以什么字符来表示。默认值为 `\N`。 # null = '\N' # 是否在 CSV 行中包含 commit-ts。默认值为 false。 # include-commit-ts = false # 二进制类型数据的编码方式,可选 'base64' 或 'hex'。默认值为 'base64'。 # binary-encoding-method = 'base64' # consistent 中的字段用于配置 Changefeed 的数据一致性。详细的信息,请参考 <https://docs.pingcap.com/tidb/stable/ticdc-sink-to-mysql#eventually-consistent-replication-in-disaster-scenarios>。 # 注意:一致性相关参数只有当下游为数据库并且开启 redo log 功能时,才会生效。 [consistent] # 数据一致性级别。默认值为 "none",可选值为 "none" 和 "eventual"。 # 设置为 "none" 时将关闭 redo log。 level = "none" # redo log 的最大日志大小,单位为 MB。默认值为 64。 max-log-size = 64 # 两次 redo log 刷新的时间间隔,单位为毫秒。默认值为 2000。 flush-interval = 2000 # redo log 使用存储服务的 URI。默认值为空。 storage = "" # 是否将 redo log 存储到文件中。默认值为 false。 use-file-backend = false [integrity] # 是否开启单行数据的 Checksum 校验功能,默认值为 "none",即不开启。可选值为 "none" 和 "correctness"。 integrity-check-level = "none" # 当单行数据的 Checksum 校验失败时,Changefeed 打印错误行数据相关日志的级别。默认值为 "warn",可选值为 "warn" 和 "error"。 corruption-handle-level = "warn" # 以下参数仅在下游为 Kafka 时生效。 [sink.kafka-config] # Kafka SASL 认证机制。该参数默认值为空,表示不使用 SASL 认证。 sasl-mechanism = "OAUTHBEARER" # Kafka SASL OAUTHBEARER 认证机制中的 client-id。默认值为空。在使用该认证机制时,该参数必填。 sasl-oauth-client-id = "producer-kafka" # Kafka SASL OAUTHBEARER 认证机制中的 client-secret。默认值为空。需要 Base64 编码。在使用该认证机制时,该参数必填。 sasl-oauth-client-secret = "cHJvZHVjZXIta2Fma2E=" # Kafka SASL OAUTHBEARER 认证机制中的 token-url 用于获取 token。默认值为空。在使用该认证机制时,该参数必填。 sasl-oauth-token-url = "http://127.0.0.1:4444/oauth2/token" # Kafka SASL OAUTHBEARER 认证机制中的 scopes。默认值为空。在使用该认证机制时,该参数可选填。 sasl-oauth-scopes = ["producer.kafka", "consumer.kafka"] # Kafka SASL OAUTHBEARER 认证机制中的 grant-type。默认值为 "client_credentials"。在使用该认证机制时,该参数可选填。 sasl-oauth-grant-type = "client_credentials" # Kafka SASL OAUTHBEARER 认证机制中的 audience。默认值为空。在使用该认证机制时,该参数可选填。 sasl-oauth-audience="kafka" # 以下配置仅在选用 avro 作为协议,并且使用 AWS Glue Schema Registry 时需要配置 # 请参考 "同步数据到 Kafka" 这一文档中 "使用 AWS Glue Schema Registry" 这一节内容:https://docs.pingcap.com/zh/tidb/dev/ticdc-sink-to-kafka#ticdc-集成-aws-glue-schema-registry # [sink.kafka-config.glue-schema-registry-config] # region="us-west-1" # registry-name="ticdc-test" # access-key="xxxx" # secret-access-key="xxxx" # token="xxxx" # 以下参数仅在下游为 Pulsar 时生效。 [sink.pulsar-config] # 使用 token 进行 Pulsar 服务端的认证,此处为 token 的值。 authentication-token = "xxxxxxxxxxxxx" # 指定使用 token 进行 Pulsar 服务端的认证,此处为 token 所在文件的路径。 token-from-file="/data/pulsar/token-file.txt" # Pulsar 使用 basic 账号密码验证身份。 basic-user-name="root" # Pulsar 使用 basic 账号密码验证身份,此处为密码。 basic-password="password" # Pulsar TLS 加密认证证书路径。 auth-tls-certificate-path="/data/pulsar/certificate" # Pulsar TLS 加密认证私钥路径。 auth-tls-private-key-path="/data/pulsar/certificate.key" # Pulsar TLS 加密可信证书文件路径。 tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" # Pulsar oauth2 issuer-url 更多详细配置请看 Pulsar 官方介绍:https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#tls-encryption-and-authentication oauth2.oauth2-issuer-url="https://xxxx.auth0.com" # Pulsar oauth2 audience oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" # Pulsar oauth2 private-key oauth2.oauth2-private-key="/data/pulsar/privateKey" # Pulsar oauth2 client-id oauth2.oauth2-client-id="0Xx...Yyxeny" # Pulsar oauth2 oauth2-scope oauth2.oauth2-scope="xxxx" # TiCDC 中缓存 Pulsar Producer 的个数,默认上限为 10240 个。每个 Pulsar Producer 对应一个 topic,如果你需要同步的 topic 数量大于默认值,则需要调大该数量。 pulsar-producer-cache-size=10240 # Pulsar 数据压缩方式,默认不压缩,可选 "lz4"、"zlib"、"zstd"。 compression-type="" # Pulsar 客户端与服务端建立 TCP 连接的超时时间,默认 5 秒。 connection-timeout=5 # Pulsar 客户端发起创建、订阅等操作的超时时间,默认为 30 秒。 operation-timeout=30 # Pulsar Producer 发送消息时的单个 batch 内的消息数量上限,默认值为 1000。 batching-max-messages=1000 # Pulsar Producer 消息攒批的时间间隔,默认 10 毫秒。 batching-max-publish-delay=10 # Pulsar Producer 发送消息的超时时间,默认 30 秒。 send-timeout=30