基于 Avro 的 TiCDC 行数据 Checksum 校验

本文介绍如何使用 Golang 消费 TiCDC 发送到 Kafka、且由 Avro 协议编码的数据,以及如何基于单行数据 Checksum 功能进行数据校验。

本示例代码位于 avro-checksum-verification 目录下。

本文使用 kafka-go 实现一个简单的 Kafka Consumer 程序。该程序不断地从指定的 Topic 中读取数据、计算并校验 Checksum 值。

package main import ( "context" "encoding/binary" "encoding/json" "hash/crc32" "io" "math" "net/http" "strconv" "strings" "github.com/linkedin/goavro/v2" "github.com/pingcap/log" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tiflow/pkg/errors" "github.com/segmentio/kafka-go" "go.uber.org/zap" ) const ( // confluent avro wire format, the first byte is always 0 // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format magicByte = uint8(0) ) func main() { var ( kafkaAddr = "127.0.0.1:9092" schemaRegistryURL = "http://127.0.0.1:8081" topic = "avro-checksum-test" consumerGroupID = "avro-checksum-test" ) consumer := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{kafkaAddr}, GroupID: consumerGroupID, Topic: topic, MaxBytes: 10e6, // 10MB }) defer consumer.Close() ctx := context.Background() log.Info("start consuming ...", zap.String("kafka", kafkaAddr), zap.String("topic", topic), zap.String("groupID", consumerGroupID)) for { // 1. 获取 kafka 消息 message, err := consumer.FetchMessage(ctx) if err != nil { log.Error("read kafka message failed", zap.Error(err)) } value := message.Value if len(value) == 0 { log.Info("delete event does not have value, skip checksum verification", zap.String("topic", topic)) } // 2. 对 value 进行解码,得到对应的 value map 和 schema map valueMap, valueSchema, err := getValueMapAndSchema(value, schemaRegistryURL) if err != nil { log.Panic("decode kafka value failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err)) } // 3. 使用上一步得到的 value map 和 schema map,计算并且校验 checksum err = CalculateAndVerifyChecksum(valueMap, valueSchema) if err != nil { log.Panic("calculate checksum failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err)) } // 4. 数据消费成功,提交 offset if err := consumer.CommitMessages(ctx, message); err != nil { log.Error("commit kafka message failed", zap.Error(err)) break } } }

从上面的代码可以看出,getValueMapAndSchema()CalculateAndVerifyChecksum() 是计算 Checksum 的关键步骤,下面分别介绍这两个函数的实现。

解码数据以及获取相应的 Schema

getValueMapAndSchema() 方法的主要作用是解码数据以及获取相应的 schema,二者均以 map[string]interface{} 类型返回。

// data is received kafka message's key or value, url is the schema registry url. // return the decoded value and corresponding schema as map. func getValueMapAndSchema(data []byte, url string) (map[string]interface{}, map[string]interface{}, error) { schemaID, binary, err := extractSchemaIDAndBinaryData(data) if err != nil { return nil, nil, err } codec, err := GetSchema(url, schemaID) if err != nil { return nil, nil, err } native, _, err := codec.NativeFromBinary(binary) if err != nil { return nil, nil, err } result, ok := native.(map[string]interface{}) if !ok { return nil, nil, errors.New("raw avro message is not a map") } schema := make(map[string]interface{}) if err := json.Unmarshal([]byte(codec.Schema()), &schema); err != nil { return nil, nil, errors.Trace(err) } return result, schema, nil } // extractSchemaIDAndBinaryData func extractSchemaIDAndBinaryData(data []byte) (int, []byte, error) { if len(data) < 5 { return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs() } if data[0] != magicByte { return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs() } return int(binary.BigEndian.Uint32(data[1:5])), data[5:], nil } // GetSchema query the schema registry to fetch the schema by the schema id. // return the goavro.Codec which can be used to encode and decode the data. func GetSchema(url string, schemaID int) (*goavro.Codec, error) { requestURI := url + "/schemas/ids/" + strconv.Itoa(schemaID) req, err := http.NewRequest("GET", requestURI, nil) if err != nil { log.Error("Cannot create the request to look up the schema", zap.Error(err)) return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) } req.Header.Add( "Accept", "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, "+ "application/json", ) httpClient := &http.Client{} resp, err := httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { log.Error("Cannot parse the lookup schema response", zap.Error(err)) return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) } if resp.StatusCode == 404 { log.Warn("Specified schema not found in Registry", zap.String("requestURI", requestURI), zap.Int("schemaID", schemaID)) return nil, errors.ErrAvroSchemaAPIError.GenWithStackByArgs("Schema not found in Registry") } if resp.StatusCode != 200 { log.Error("Failed to query schema from the Registry, HTTP error", zap.Int("status", resp.StatusCode), zap.String("uri", requestURI), zap.ByteString("responseBody", body)) return nil, errors.ErrAvroSchemaAPIError.GenWithStack("Failed to query schema from the Registry, HTTP error") } var jsonResp lookupResponse err = json.Unmarshal(body, &jsonResp) if err != nil { log.Error("Failed to parse result from Registry", zap.Error(err)) return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) } codec, err := goavro.NewCodec(jsonResp.Schema) if err != nil { return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err) } return codec, nil } type lookupResponse struct { Name string `json:"name"` SchemaID int `json:"id"` Schema string `json:"schema"` }

计算并校验 Checksum

上一步获取的 valueMapvalueSchema 包含了所有用于 Checksum 计算和校验的元素。

在消费端计算和校验 Checksum 的过程包含以下几个步骤:

  1. 获取期望的 Checksum 值。
  2. 遍历每一列,根据列的数据值和对应的 MySQL Type,生成字节切片,不断更新 Checksum 值。
  3. 将上一步计算得到的 Checksum 和从收到的消息里提取出来的 Checksum 做比较。如果不一致,则说明 Checksum 校验失败,数据可能发生损坏。

示例代码如下:

func CalculateAndVerifyChecksum(valueMap, valueSchema map[string]interface{}) error { // fields 存放有数据变更事件的每一个列的类型信息,按照每一列的 ID 排序,该顺序和 Checksum 计算顺序相同 fields, ok := valueSchema["fields"].([]interface{}) if !ok { return errors.New("schema fields should be a map") } // 1. 从 valueMap 里面查找期望的 checksum 值,它被编码成 string 类型 // 如果找不到,说明 TiCDC 发送该条数据时,还没有开启 checksum 功能,直接返回即可 o, ok := valueMap["_tidb_row_level_checksum"] if !ok { return nil } expected := o.(string) if expected == "" { return nil } // expectedChecksum 即是从 TiCDC 传递而来的期望的 checksum 值 expectedChecksum, err := strconv.ParseUint(expected, 10, 64) if err != nil { return errors.Trace(err) } // 2. 遍历每一个 field,计算 checksum 值 var actualChecksum uint32 // buf 用来存储每次更新 checksum 时使用的字节切片 buf := make([]byte, 0) for _, item := range fields { field, ok := item.(map[string]interface{}) if !ok { return errors.New("schema field should be a map") } // `tidbOp` 及之后的列不参与到 checksum 计算中,因为它们是一些用于辅助数据消费的列,并非真实的 TiDB 列数据 colName := field["name"].(string) if colName == "_tidb_op" { break } // holder 存放有列类型信息 var holder map[string]interface{} switch ty := field["type"].(type) { case []interface{}: for _, item := range ty { if m, ok := item.(map[string]interface{}); ok { holder = m["connect.parameters"].(map[string]interface{}) break } } case map[string]interface{}: holder = ty["connect.parameters"].(map[string]interface{}) default: log.Panic("type info is anything else", zap.Any("typeInfo", field["type"])) } tidbType := holder["tidb_type"].(string) mysqlType := mysqlTypeFromTiDBType(tidbType) // 根据每一列的名字,从解码之后的 value map 里拿到该列的值 value, ok := valueMap[colName] if !ok { return errors.New("value not found") } value, err := getColumnValue(value, holder, mysqlType) if err != nil { return errors.Trace(err) } if len(buf) > 0 { buf = buf[:0] } // 根据每一列的 value 和 mysqlType,生成用于更新 checksum 的字节切片,然后更新 checksum buf, err = buildChecksumBytes(buf, value, mysqlType) if err != nil { return errors.Trace(err) } actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, buf) } if uint64(actualChecksum) != expectedChecksum { log.Error("checksum mismatch", zap.Uint64("expected", expectedChecksum), zap.Uint64("actual", uint64(actualChecksum))) return errors.New("checksum mismatch") } log.Info("checksum verified", zap.Uint64("checksum", uint64(actualChecksum))) return nil } func mysqlTypeFromTiDBType(tidbType string) byte { var result byte switch tidbType { case "INT", "INT UNSIGNED": result = mysql.TypeLong case "BIGINT", "BIGINT UNSIGNED": result = mysql.TypeLonglong case "FLOAT": result = mysql.TypeFloat case "DOUBLE": result = mysql.TypeDouble case "BIT": result = mysql.TypeBit case "DECIMAL": result = mysql.TypeNewDecimal case "TEXT": result = mysql.TypeVarchar case "BLOB": result = mysql.TypeLongBlob case "ENUM": result = mysql.TypeEnum case "SET": result = mysql.TypeSet case "JSON": result = mysql.TypeJSON case "DATE": result = mysql.TypeDate case "DATETIME": result = mysql.TypeDatetime case "TIMESTAMP": result = mysql.TypeTimestamp case "TIME": result = mysql.TypeDuration case "YEAR": result = mysql.TypeYear default: log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType)) } return result } // value 是一个 interface 类型的值,需要根据 holder 提供的类型信息,做一次转换处理 func getColumnValue(value interface{}, holder map[string]interface{}, mysqlType byte) (interface{}, error) { switch t := value.(type) { // nullable 的列,其值被编码成一个 map,只有一个键值对,键是类型,值是真实的值,此处只关心真实的值 case map[string]interface{}: for _, v := range t { value = v } } switch mysqlType { case mysql.TypeEnum: // Enum 被编码成了 string,此处转换为对应于 Enum 定义的 int 值 allowed := strings.Split(holder["allowed"].(string), ",") switch t := value.(type) { case string: enum, err := types.ParseEnum(allowed, t, "") if err != nil { return nil, errors.Trace(err) } value = enum.Value case nil: value = nil } case mysql.TypeSet: // Set 被编码成了 string,根据 set 定义的顺序,转换为对应的 int 值 elems := strings.Split(holder["allowed"].(string), ",") switch t := value.(type) { case string: s, err := types.ParseSet(elems, t, "") if err != nil { return nil, errors.Trace(err) } value = s.Value case nil: value = nil } } return value, nil } // buildChecksumBytes 生成用于更新 checksum 的字节切片, 参考 https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/util/rowcodec/common.go#L308 func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte, error) { if value == nil { return buf, nil } switch mysqlType { // TypeTiny, TypeShort, TypeInt32 被编码成 int32 // TypeLong 被编码成 int32 if signed, else int64 // TypeLongLong,如果是 signed,被编码成 int64,否则被编码成 uint64, // 开启 checksum 功能,bigintUnsignedHandlingMode 必须设置为 string,被编码成 string. case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear: switch a := value.(type) { case int32: buf = binary.LittleEndian.AppendUint64(buf, uint64(a)) case uint32: buf = binary.LittleEndian.AppendUint64(buf, uint64(a)) case int64: buf = binary.LittleEndian.AppendUint64(buf, uint64(a)) case uint64: buf = binary.LittleEndian.AppendUint64(buf, a) case string: v, err := strconv.ParseUint(a, 10, 64) if err != nil { return nil, errors.Trace(err) } buf = binary.LittleEndian.AppendUint64(buf, v) default: log.Panic("unknown golang type for the integral value", zap.Any("value", value), zap.Any("mysqlType", mysqlType)) } // Float 类型编码为 float32,Double 编码为 float64 case mysql.TypeFloat, mysql.TypeDouble: var v float64 switch a := value.(type) { case float32: v = float64(a) case float64: v = a } if math.IsInf(v, 0) || math.IsNaN(v) { v = 0 } buf = binary.LittleEndian.AppendUint64(buf, math.Float64bits(v)) // getColumnValue 将 Enum 和 Set 转换为了 uint64 类型 case mysql.TypeEnum, mysql.TypeSet: buf = binary.LittleEndian.AppendUint64(buf, value.(uint64)) case mysql.TypeBit: // bit 类型编码为 []bytes,需要进一步转换为 uint64 v, err := binaryLiteralToInt(value.([]byte)) if err != nil { return nil, errors.Trace(err) } buf = binary.LittleEndian.AppendUint64(buf, v) // 非二进制类型时,编码成 string, 反之则为 []byte case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: switch a := value.(type) { case string: buf = appendLengthValue(buf, []byte(a)) case []byte: buf = appendLengthValue(buf, a) default: log.Panic("unknown golang type for the string value", zap.Any("value", value), zap.Any("mysqlType", mysqlType)) } case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeDuration, mysql.TypeNewDate: v := value.(string) buf = appendLengthValue(buf, []byte(v)) // 开启 checksum 功能时,decimalHandlingMode 必须设置为 string case mysql.TypeNewDecimal: buf = appendLengthValue(buf, []byte(value.(string))) case mysql.TypeJSON: buf = appendLengthValue(buf, []byte(value.(string))) // Null 和 Geometry 不参与到 checksum 计算 case mysql.TypeNull, mysql.TypeGeometry: // do nothing default: return buf, errors.New("invalid type for the checksum calculation") } return buf, nil } func appendLengthValue(buf []byte, val []byte) []byte { buf = binary.LittleEndian.AppendUint32(buf, uint32(len(val))) buf = append(buf, val...) return buf } // 将 []byte 转换为 uint64,参考 https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105 func binaryLiteralToInt(bytes []byte) (uint64, error) { bytes = trimLeadingZeroBytes(bytes) length := len(bytes) if length > 8 { log.Error("invalid bit value found", zap.ByteString("value", bytes)) return math.MaxUint64, errors.New("invalid bit value") } if length == 0 { return 0, nil } val := uint64(bytes[0]) for i := 1; i < length; i++ { val = (val << 8) | uint64(bytes[i]) } return val, nil } func trimLeadingZeroBytes(bytes []byte) []byte { if len(bytes) == 0 { return bytes } pos, posMax := 0, len(bytes)-1 for ; pos < posMax; pos++ { if bytes[pos] != 0 { break } } return bytes[pos:] }