- 关于 TiDB
- 快速上手
- 部署集群
- 数据迁移
- 数据迁移概述
- 从 MySQL 迁移至 TiDB
- 从 CSV 文件迁移至 TiDB
- 运维操作
- 监控与告警
- 故障诊断
- 性能调优
- 系统调优
- 软件调优
- SQL 性能调优
- 教程
- TiDB 生态工具
- TiDB 生态工具功能概览
- TiDB 生态工具适用场景
- TiDB 工具下载
- Backup & Restore (BR)
- TiDB Binlog
- TiDB Lightning
- TiCDC 简介
- Dumpling 使用文档
- sync-diff-inspector
- Loader 使用文档
- Mydumper 使用文档
- Syncer 使用文档
- TiSpark
- 参考指南
- 架构
- 监控指标
- 安全加固
- 权限
- SQL
- SQL 语言结构和语法
- 属性
- 字面值
- Schema 对象名
- 关键字
- 用户自定义变量
- 表达式语法
- 注释语法
- SQL 语句
- ADD COLUMN
- ADD INDEX
- ADMIN
- ALTER DATABASE
- ALTER INSTANCE
- ALTER TABLE
- ALTER USER
- ANALYZE
- BACKUP
- BEGIN
- CHANGE COLUMN
- CHANGE DRAINER
- CHANGE PUMP
- COMMIT
- CREATE [GLOBAL|SESSION] BINDING
- CREATE DATABASE
- CREATE INDEX
- CREATE ROLE
- CREATE SEQUENCE
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE USER
- CREATE VIEW
- DEALLOCATE
- DELETE
- DESC
- DESCRIBE
- DO
- DROP [GLOBAL|SESSION] BINDING
- DROP COLUMN
- DROP DATABASE
- DROP INDEX
- DROP ROLE
- DROP SEQUENCE
- DROP STATS
- DROP TABLE
- DROP USER
- DROP VIEW
- EXECUTE
- EXPLAIN ANALYZE
- EXPLAIN
- FLASHBACK TABLE
- FLUSH PRIVILEGES
- FLUSH STATUS
- FLUSH TABLES
- GRANT
- GRANT
- INSERT
- KILL [TIDB]
- LOAD STATS
- MODIFY COLUMN
- PREPARE
- RECOVER TABLE
- RENAME INDEX
- RENAME TABLE
- REPLACE
- RESTORE
- REVOKE
- REVOKE
- ROLLBACK
- SELECT
- SET DEFAULT ROLE
- SET [NAMES|CHARACTER SET]
- SET PASSWORD
- SET ROLE
- SET TRANSACTION
- SET [GLOBAL|SESSION]
- SHOW [BACKUPS|RESTORES]
- SHOW ANALYZE STATUS
- SHOW [GLOBAL|SESSION] BINDINGS
- SHOW BUILTINS
- SHOW CHARACTER SET
- SHOW COLLATION
- SHOW [FULL] COLUMNS FROM
- SHOW CONFIG
- SHOW CREATE SEQUENCE
- SHOW CREATE TABLE
- SHOW CREATE USER
- SHOW DATABASES
- SHOW DRAINER STATUS
- SHOW ENGINES
- SHOW ERRORS
- SHOW [FULL] FIELDS FROM
- SHOW GRANTS
- SHOW INDEX [FROM|IN]
- SHOW INDEXES [FROM|IN]
- SHOW KEYS [FROM|IN]
- SHOW MASTER STATUS
- SHOW PLUGINS
- SHOW PRIVILEGES
- SHOW [FULL] PROCESSLIST
- SHOW PROFILES
- SHOW PUMP STATUS
- SHOW SCHEMAS
- SHOW STATS_HEALTHY
- SHOW STATS_HISTOGRAMS
- SHOW STATS_META
- SHOW [GLOBAL|SESSION] STATUS
- SHOW TABLE NEXTROWID
- SHOW TABLE REGIONS
- SHOW TABLE STATUS
- SHOW [FULL] TABLES
- SHOW [GLOBAL|SESSION] VARIABLES
- SHOW WARNINGS
- SHUTDOWN
- Split Region 使用文档
- START TRANSACTION
- TRACE
- TRUNCATE
- UPDATE
- USE
- 数据类型
- 函数与操作符
- 约束
- 生成列
- SQL 模式
- 事务
- 垃圾回收 (GC)
- 视图
- 分区表
- 字符集和排序规则
- 系统表
- TiDB 系统表
- INFORMATION_SCHEMA
- TiDB 简介
- ANALYZE_STATUS
- CHARACTER_SETS
- CLUSTER_CONFIG
- CLUSTER_HARDWARE
- CLUSTER_INFO
- CLUSTER_LOAD
- CLUSTER_LOG
- CLUSTER_SYSTEMINFO
- COLLATIONS
- COLLATIONCHARACTERSET_APPLICABILITY
- COLUMNS
- DDL_JOBS
- ENGINES
- INSPECTION_RESULT
- INSPECTION_RULES
- INSPECTION_SUMMARY
- KEYCOLUMNUSAGE
- METRICS_SUMMARY
- METRICS_TABLES
- PARTITIONS
- PROCESSLIST
- SCHEMATA
- SEQUENCES
- SESSION_VARIABLES
- SLOW_QUERY
- STATISTICS
- TABLES
- TABLE_CONSTRAINTS
- TABLESTORAGESTATS
- TIDBHOTREGIONS
- TIDB_INDEXES
- TIDBSERVERSINFO
- TIFLASH_REPLICA
- TIKVREGIONPEERS
- TIKVREGIONSTATUS
- TIKVSTORESTATUS
- USER_PRIVILEGES
- VIEWS
- Metrics Schema
- SQL 语言结构和语法
- UI
- CLI
- 命令行参数
- 配置文件参数
- 系统变量
- 存储引擎
- TiUP
- 遥测
- 错误码与故障诊断
- TiCDC Open Protocol
- 通过拓扑 label 进行副本调度
- 常见问题解答 (FAQ)
- 术语表
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
Binlog Consumer Client 用户文档
目前 Drainer 提供了多种输出方式,包括 MySQL、TiDB、file 等。但是用户往往有一些自定义的需求,比如输出到 Elasticsearch、Hive 等,这些需求 Drainer 现在还没有实现,因此 Drainer 增加了输出到 Kafka 的功能,将 binlog 数据解析后按一定的格式再输出到 Kafka 中,用户编写代码从 Kafka 中读出数据再进行处理。
配置 Kafka Drainer
修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:
[syncer]
db-type = "kafka"
[syncer.to]
# Kafka 地址
kafka-addrs = "127.0.0.1:9092"
# Kafka 版本号
kafka-version = "0.8.2.0"
自定义开发
数据格式
首先需要了解 Drainer 写入到 Kafka 中的数据格式:
// Column 保存列的数据,针对数据的类型,保存在对应的变量中
message Column {
// 数据是否为 null
optional bool is_null = 1 [ default = false ];
// 保存 int 类型的数据
optional int64 int64_value = 2;
// 保存 uint、enum, set 类型的数据
optional uint64 uint64_value = 3;
// 保存 float、double 类型的数据
optional double double_value = 4;
// 保存 bit、blob、binary、json 类型的数据
optional bytes bytes_value = 5;
// 保存 date、time、decimal、text、char 类型的数据
optional string string_value = 6;
}
// ColumnInfo 保存列的信息,包括列名、类型、是否为主键
message ColumnInfo {
optional string name = 1 [ (gogoproto.nullable) = false ];
// MySQL 中小写的列字段类型
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
// numeric 类型:int bigint smallint tinyint float double decimal bit
// string 类型:text longtext mediumtext char tinytext varchar
// blob longblob mediumblob binary tinyblob varbinary
// enum set
// json 类型:json
optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
}
// Row 保存一行的具体数据
message Row { repeated Column columns = 1; }
// MutationType 表示 DML 的类型
enum MutationType {
Insert = 0;
Update = 1;
Delete = 2;
}
// Table 包含一个表的数据变更
message Table {
optional string schema_name = 1;
optional string table_name = 2;
repeated ColumnInfo column_info = 3;
repeated TableMutation mutations = 4;
}
// TableMutation 保存一行数据的变更
message TableMutation {
required MutationType type = 1;
// 修改后的数据
required Row row = 2;
// 修改前的数据,只对 Update MutationType 有效
optional Row change_row = 3;
}
// DMLData 保存一个事务所有的 DML 造成的数据变更
message DMLData {
// `tables` 包含事务中所有表的数据变更
repeated Table tables = 1;
}
// DDLData 保存 DDL 的信息
message DDLData {
// 当前使用的数据库
optional string schema_name = 1;
// 相关表
optional string table_name = 2;
// `ddl_query` 是原始的 DDL 语句 query
optional bytes ddl_query = 3;
}
// BinlogType 为 Binlog 的类型,分为 DML 和 DDL
enum BinlogType {
DML = 0; // Has `dml_data`
DDL = 1; // Has `ddl_query`
}
// Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
message Binlog {
optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
optional DMLData dml_data = 3;
optional DDLData ddl_data = 4;
}
查看数据格式的具体定义,参见 binlog.proto。
Driver
TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:
- 读取 Kafka 的数据
- 根据 commit ts 查找 binlog 在 kafka 中的储存位置
使用该 Driver 时,用户需要配置如下信息:
- KafkaAddr:Kafka 集群的地址
- CommitTS:从哪个 commit ts 开始读取 binlog
- Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
- ClusterID:TiDB 集群的 cluster ID
- Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用 drainer
<ClusterID>_obinlog
中的默认名称
用户以包的形式引用 Driver 的代码即可使用,可以参考 Driver 中提供的示例代码来学习如何使用 Driver 以及 binlog 数据的解析,目前提供了两个例子:
- 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
- 使用该 Driver 将数据打印出来
Driver 项目地址:TiDB Binlog Driver。
注意:
- 示例代码仅仅用于示范如何使用 Driver,如果需要用于生产环境需要优化代码。
- 目前仅提供了 golang 版本的 Driver 以及示例代码。如果需要使用其他语言,用户需要根据 binlog 的 proto 文件生成相应语言的代码文件,并自行开发程序读取 Kafka 中的 binlog 数据、解析数据、输出到下游。也欢迎用户优化 example 代码,以及提交其他语言的示例代码到 TiDB-Tools。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论