TiCDC OpenAPI v2
TiCDC 提供 OpenAPI 功能,你可以通过 OpenAPI v2 对 TiCDC 集群进行查询和运维操作。OpenAPI 的功能是 cdc cli 工具的一个子集。
你可以通过 OpenAPI 完成 TiCDC 集群的如下运维操作:
- 获取 TiCDC 节点状态信息
- 检查 TiCDC 集群的健康状态
- 创建同步任务
- 删除同步任务
- 更新同步任务配置
- 查询同步任务列表
- 查询特定同步任务
- 暂停同步任务
- 恢复同步任务
- 查询同步子任务列表
- 查询特定同步子任务
- 查询 TiCDC 服务进程列表
- 驱逐 owner 节点
- 动态调整 TiCDC Server 日志级别
所有 API 的请求体与返回值统一使用 JSON 格式数据。请求如果成功,则统一返回 200 OK。本文档以下部分描述当前提供的 API 的具体使用方法。
在下文的示例描述中,假设 TiCDC server 的监听 IP 地址为 127.0.0.1,端口为 8300。在启动 TiCDC server 时,可以通过 --addr=ip:port 指定 TiCDC 绑定的 IP 地址和端口。
API 统一错误格式
对 API 发起请求后,如发生错误,返回错误信息的格式如下所示:
{
"error_msg": "",
"error_code": ""
}
如上所示,error_msg 描述错误信息,error_code 则是对应的错误码。
API List 接口统一返回格式
一个 API 请求如果返回是一个资源列表(例如,返回所有的服务进程 Captures),TiCDC 统一的返回格式如下:
{
"total": 2,
"items": [
{
"id": "d2912e63-3349-447c-90ba-wwww",
"is_owner": true,
"address": "127.0.0.1:8300"
},
{
"id": "d2912e63-3349-447c-90ba-xxxx",
"is_owner": false,
"address": "127.0.0.1:8302"
}
]
}
如上所示:
total: 表示有一共有多少个资源。items: 当次请求返回的资源在这个数组里,数组所有的元素都是同种资源。
获取 TiCDC 节点状态信息
该接口是一个同步接口,请求成功会返回对应节点的状态信息。
请求 URI
GET /api/v2/status
使用样例
以下请求会获取 IP 地址为 127.0.0.1,端口号为 8300 的 TiCDC 节点的状态信息。
curl -X GET http://127.0.0.1:8300/api/v2/status
{
"version": "v7.2.0",
"git_hash": "10413bded1bdb2850aa6d7b94eb375102e9c44dc",
"id": "d2912e63-3349-447c-90ba-72a4e04b5e9e",
"pid": 1447,
"is_owner": true,
"liveness": 0
}
以上返回信息的字段解释如下:
version:当前 TiCDC 版本号。git_hash:Git 哈希值。id:该节点的 capture ID。pid:该节点 capture 进程的 PID。is_owner:表示该节点是否是 owner。liveness: 该节点是否在线。0表示正常,1表示处于graceful shutdown状态。
检查 TiCDC 集群的健康状态
该接口是一个同步接口,在集群健康的时候会返回 200 OK。
请求 URI
GET /api/v2/health
使用样例
curl -X GET http://127.0.0.1:8300/api/v2/health
如果集群健康,则返回 200 OK 和一个空的 json {}:
{}
如果集群不健康,则返回错误信息。
创建同步任务
该接口用于向 TiCDC 提交一个同步任务,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
请求 URI
POST /api/v2/changefeeds
参数说明
{
"changefeed_id": "string",
"replica_config": {
"bdr_mode": true,
"case_sensitive": true,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_old_value": true,
"enable_sync_point": true,
"filter": {
"do_dbs": [
"string"
],
"do_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_dbs": [
"string"
],
"ignore_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"sink_uri": "string",
"start_ts": 0,
"target_ts": 0
}
参数说明如下:
changefeed_id、start_ts、target_ts、sink_uri 的含义和格式与使用 cli 创建同步任务中所作的解释相同,具体解释请参见该文档。需要注意,当在 sink_uri 中指定证书的路径时,须确保已将对应证书上传到对应的 TiCDC server 上。
replica_config 参数说明如下:
consistent 参数说明如下:
filter 参数说明如下:
filter.event_filters 参数说明如下,可参考日志过滤器。
mounter 参数说明如下:
sink 参数说明如下:
sink.column_selectors 是一个数组,元素参数说明如下:
sink.csv 参数说明如下:
sink.dispatchers:对于 MQ 类的 Sink,可以通过该参数配置 event 分发器,支持以下分发器:default、ts、rowid、table 。分发规则如下:
default:有多个唯一索引(包括主键)时按照 table 模式分发;只有一个唯一索引(或主键)时按照 rowid 模式分发;如果开启了 old value 特性,按照 table 模式分发。ts:以行变更的 commitTs 做 Hash 计算并进行 event 分发。rowid:以所选的 HandleKey 列名和列值做 Hash 计算并进行 event 分发。table:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发。
sink.dispatchers 是一个数组,元素参数说明如下:
使用样例
以下请求会创建一个 ID 为 test5,sink_uri 为 blackhole:// 的同步任务。
curl -X POST -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/changefeeds -d '{"changefeed_id":"test5","sink_uri":"blackhole://"}'
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。
响应体格式
{
"admin_job_type": 0,
"checkpoint_time": "string",
"checkpoint_ts": 0,
"config": {
"bdr_mode": true,
"case_sensitive": true,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_old_value": true,
"enable_sync_point": true,
"filter": {
"do_dbs": [
"string"
],
"do_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_dbs": [
"string"
],
"ignore_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"create_time": "string",
"creator_version": "string",
"error": {
"addr": "string",
"code": "string",
"message": "string"
},
"id": "string",
"resolved_ts": 0,
"sink_uri": "string",
"start_ts": 0,
"state": "string",
"target_ts": 0,
"task_status": [
{
"capture_id": "string",
"table_ids": [
0
]
}
]
}
参数说明如下:
task_status 参数说明如下:
error 参数说明如下:
删除同步任务
该接口是幂等的(即其任意多次执行所产生的影响均与一次执行的影响相同),用于删除一个 changefeed 同步任务,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
请求 URI
DELETE /api/v2/changefeeds/{changefeed_id}
参数说明
路径参数
使用样例
以下请求会删除 ID 为 test1 的同步任务。
curl -X DELETE http://127.0.0.1:8300/api/v2/changefeeds/test1
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。
更新同步任务配置
该接口用于更新一个同步任务,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
修改 changefeed 配置需要按照暂停任务 -> 修改配置 -> 恢复任务的流程。
请求 URI
PUT /api/v2/changefeeds/{changefeed_id}
参数说明
路径参数
请求体参数
{
"replica_config": {
"bdr_mode": true,
"case_sensitive": true,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_old_value": true,
"enable_sync_point": true,
"filter": {
"do_dbs": [
"string"
],
"do_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_dbs": [
"string"
],
"ignore_tables": [
{
"database_name": "string",
"table_name": "string"
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"sink_uri": "string",
"target_ts": 0
}
目前仅支持通过 API 修改同步任务的如下配置。
以上参数含义与创建同步任务中的参数相同,此处不再赘述。
使用样例
以下请求会更新 ID 为 test1 的同步任务的 target_ts 为 32。
curl -X PUT -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/changefeeds/test1 -d '{"target_ts":32}'
若是请求成功,则返回 200 OK,若请求失败,则返回错误信息和错误码。响应的 JSON 格式以及字段含义与创建同步任务中的响应参数相同,此处不再赘述。
查询同步任务列表
该接口是一个同步接口,请求成功会返回 TiCDC 集群中所有同步任务 (changefeed) 的基本信息。
请求 URI
GET /api/v2/changefeeds
参数说明
查询参数
state 可选值为 all、normal、stopped、error、failed、finished。
若不指定该参数,则默认返回处于 normal、stopped、failed 状态的同步任务基本信息。
使用样例
以下请求查询所有状态 (state) 为 normal 的同步任务的基本信息。
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds?state=normal
{
"total": 2,
"items": [
{
"id": "test",
"state": "normal",
"checkpoint_tso": 439749918821711874,
"checkpoint_time": "2023-02-27 23:46:52.888",
"error": null
},
{
"id": "test2",
"state": "normal",
"checkpoint_tso": 439749918821711874,
"checkpoint_time": "2023-02-27 23:46:52.888",
"error": null
}
]
}
以上返回的信息的说明如下:
id:同步任务的 ID。state:同步任务当前所处的状态。checkpoint_tso:同步任务当前 checkpoint 的 TSO 表示。checkpoint_time:同步任务当前 checkpoint 的格式化时间表示。error:同步任务的错误信息。
查询特定同步任务
该接口是一个同步接口,请求成功会返回指定同步任务 (changefeed) 的详细信息。
请求 URI
GET /api/v2/changefeeds/{changefeed_id}
参数说明
路径参数
使用样例
以下请求会查询 ID 为 test1 的同步任务的详细信息。
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test1
响应的 JSON 格式以及字段含义与创建同步任务中的响应参数相同,此处不再赘述。
暂停同步任务
该接口暂停一个同步任务,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
请求 URI
POST /api/v2/changefeeds/{changefeed_id}/pause
参数说明
路径参数
使用样例
以下请求会暂停 ID 为 test1 的同步任务。
curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/pause
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。
恢复同步任务
该接口恢复一个同步任务,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
请求 URI
POST /api/v2/changefeeds/{changefeed_id}/resume
参数说明
路径参数
请求体参数
{
"overwrite_checkpoint_ts": 0
}
使用样例
以下请求会恢复 ID 为 test1 的同步任务。
curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/resume -d '{}'
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。
查询同步子任务列表
该接口是一个同步接口,请求成功会返回当前 TiCDC 集群中的所有同步子任务 (processor) 的基本信息。
请求 URI
GET /api/v2/processors
使用样例
curl -X GET http://127.0.0.1:8300/api/v2/processors
{
"total": 3,
"items": [
{
"changefeed_id": "test2",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
},
{
"changefeed_id": "test1",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
},
{
"changefeed_id": "test",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
}
]
}
以上返回的信息的说明如下:
changefeed_id:同步任务的 ID。capture_id:Capture的 ID。
查询特定同步子任务
该接口是一个同步接口,请求成功会返回指定同步子任务 (processor) 的详细信息。
请求 URI
GET /api/v2/processors/{changefeed_id}/{capture_id}
参数说明
路径参数
使用样例
以下请求查询 changefeed_id 为 test、capture_id 为 561c3784-77f0-4863-ad52-65a3436db6af 的同步子任务。一个同步子任务通过 changefeed_id 和 capture_id 来标识。
curl -X GET http://127.0.0.1:8300/api/v2/processors/test/561c3784-77f0-4863-ad52-65a3436db6af
{
"table_ids": [
80
]
}
以上返回的信息的说明如下:
table_ids:在这个 capture 上同步的 table 的 ID。
查询 TiCDC 服务进程列表
该接口是一个同步接口,请求成功会返回当前 TiCDC 集群中的所有服务进程 (capture) 的基本信息。
请求 URI
GET /api/v2/captures
使用样例
curl -X GET http://127.0.0.1:8300/api/v2/captures
{
"total": 1,
"items": [
{
"id": "d2912e63-3349-447c-90ba-72a4e04b5e9e",
"is_owner": true,
"address": "127.0.0.1:8300"
}
]
}
以上返回的信息的说明如下:
id:capture的 ID。is_owner:该capture是否是 owner 。address:该capture的地址。
驱逐 owner 节点
该接口是一个异步的请求,请求成功会返回 200 OK。该返回结果表示服务器收到了执行命令指示,并不代表命令被成功执行。
请求 URI
POST /api/v2/owner/resign
使用样例
以下请求会驱逐 TiCDC 当前的 owner 节点,并会触发新一轮的选举,产生新的 owner 节点。
curl -X POST http://127.0.0.1:8300/api/v2/owner/resign
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。
动态调整 TiCDC Server 日志级别
该接口是一个同步接口,请求成功会返回 200 OK。
请求 URI
POST /api/v2/log
请求参数
请求体参数
log_level 支持 zap 提供的日志级别:"debug"、"info"、"warn"、"error"、"dpanic"、"panic"、"fatal"。
使用样例
curl -X POST -H "'Content-type':'application/json'" http://127.0.0.1:8300/api/v2/log -d '{"log_level":"debug"}'
如果请求成功,则返回 200 OK。如果请求失败,则返回错误信息和错误码。