Kafka jdbc接收器连接器创建与原始数据类型不匹配的数据类型
我正在使用 Kafka 和 Kafka Connect 将 MS SQL Server 数据库复制到 MySQL,使用 debezium sql server CDC 源连接器和汇合 JDBC 接收器连接器。 “auto.create”设置为 true,接收器连接器确实创建了表,但某些数据类型不匹配。在 SQL Sever 中,我有,
CREATE TABLE employees (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
start_date DATE,
salary INT,
secret FLOAT,
create_time TIME
);
但在 MySQL 中,它创建了以下内容:
mysql> desc employees;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| id | int | NO | PRI | NULL | |
| first_name | text | NO | | NULL | |
| last_name | text | NO | | NULL | |
| email | text | NO | | NULL | |
| start_date | int | YES | | NULL | |
| salary | int | YES | | NULL | |
| secret | double | YES | | NULL | |
| create_time | bigint | YES | | NULL | |
| messageTS | datetime(3) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
忽略messgeTS,这是我在 SMT 中添加的额外字段。
名字、姓氏、电子邮件、开始日期和创建时间的数据类型均不匹配。它 将 VARCHAR(255) 转换为文本,将 DATE 转换为 int,将 TIME 转换为 bigint。
只是想知道是否有什么配置错误?
我正在使用 docker 运行 SQL Server 2019 和 MySQL 9.0.28。
我还尝试了禁用自动创建和自动演化并使用正确的数据类型预先创建表的建议。
mysql> desc employees;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| first_name | varchar(255) | NO | | NULL | |
| last_name | varchar(255) | NO | | NULL | |
| email | varchar(255) | NO | | NULL | |
| start_date | date | NO | | NULL | |
| salary | int | NO | | NULL | |
| secret | double | NO | | NULL | |
| create_time | datetime | NO | | NULL | |
| messageTS | datetime | NO | | NULL | |
+-------------+--------------+------+-----+---------+----------------+
但在尝试插入数据库时,它会出现以下异常:
kafka-connect | [2022-03-04 19:55:07,331] INFO Setting metadata for table "employees" to Table{name='"employees"', type=TABLE columns=[Column{'first_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'secret', isPrimaryKey=false, allowsNull=false, sqlType=DOUBLE}, Column{'salary', isPrimaryKey=false, allowsNull=false, sqlType=INT}, Column{'start_date', isPrimaryKey=false, allowsNull=false, sqlType=DATE}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'last_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'messageTS', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}, Column{'create_time', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}]} (io.confluent.connect.jdbc.util.TableDefinitions)
kafka-connect | [2022-03-04 19:55:07,382] WARN Write of 4 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
kafka-connect | java.sql.BatchUpdateException: Data truncation: Incorrect date value: '19055' for column 'start_date' at row 1
消息的值是
{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]","start_date":{"int":19055},"salary":{"int":100000},"secret":{"double":0.867153569942739},"create_time":{"long":1646421476477}}
start_date 字段的消息架构是
{
"name": "start_date",
"type": [
"null",
{
"type": "int",
"connect.version": 1,
"connect.name": "io.debezium.time.Date"
}
],
"default": null
}
它看起来不知道如何将 io.debezium.time.Date 转换为Date 并将其视为 int。
非常感谢对此的任何指示。
源配置:
{
"name": "SimpleSQLServerCDC",
"config":{
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":1,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"confluent.topic.bootstrap.servers":"kafka:29092",
"database.hostname" : "sqlserver",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "",
"database.dbname" : "testDB",
"database.server.name" : "corporation",
"database.history.kafka.topic": "dbhistory.corporation",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "delete"
}
}
接收器配置:
{
"name": "SimpleMySQLJDBC",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql:3306/sinkdb",
"connection.user": "user",
"connection.password": "",
"tasks.max": "2",
"topics.regex": "corporation.dbo.*",
"auto.create": "true",
"auto.evolve": "true",
"dialect.name": "MySqlDatabaseDialect",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields":"id",
"delete.enabled": "true",
"batch.size": 1,
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms":"unwrap,dropPrefix,insertTS",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"corporation.dbo.(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"drop",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq-mysql",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor":"1"
}
}
I am using Kafka and Kafka Connect to replicate MS SQL Server database to MySQL using debezium sql server CDC source connector and confluent JDBC sink connector. The "auto.create" is set to true and the sink connector did create the tables, but some of the data types do not match. In SQL Sever, I have
CREATE TABLE employees (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
start_date DATE,
salary INT,
secret FLOAT,
create_time TIME
);
but in MySQL, it created the following:
mysql> desc employees;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| id | int | NO | PRI | NULL | |
| first_name | text | NO | | NULL | |
| last_name | text | NO | | NULL | |
| email | text | NO | | NULL | |
| start_date | int | YES | | NULL | |
| salary | int | YES | | NULL | |
| secret | double | YES | | NULL | |
| create_time | bigint | YES | | NULL | |
| messageTS | datetime(3) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
ignore messgeTS, that's an extra field I added in the SMT.
The data types for first_name, last_name, email, start_date and create time all do not match. It
converts VARCHAR(255) to text, DATE to int, and TIME to bigint.
Just wondering if anything is misconfigured?
I'm running SQL Server 2019 and MySQL 9.0.28 using docker.
I've also tried the suggestion of disabling autocreate and autoevolve and pre-create the tables with the proper data types.
mysql> desc employees;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| first_name | varchar(255) | NO | | NULL | |
| last_name | varchar(255) | NO | | NULL | |
| email | varchar(255) | NO | | NULL | |
| start_date | date | NO | | NULL | |
| salary | int | NO | | NULL | |
| secret | double | NO | | NULL | |
| create_time | datetime | NO | | NULL | |
| messageTS | datetime | NO | | NULL | |
+-------------+--------------+------+-----+---------+----------------+
But it gives the following exceptions when trying to insert into the database:
kafka-connect | [2022-03-04 19:55:07,331] INFO Setting metadata for table "employees" to Table{name='"employees"', type=TABLE columns=[Column{'first_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'secret', isPrimaryKey=false, allowsNull=false, sqlType=DOUBLE}, Column{'salary', isPrimaryKey=false, allowsNull=false, sqlType=INT}, Column{'start_date', isPrimaryKey=false, allowsNull=false, sqlType=DATE}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'last_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'messageTS', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}, Column{'create_time', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}]} (io.confluent.connect.jdbc.util.TableDefinitions)
kafka-connect | [2022-03-04 19:55:07,382] WARN Write of 4 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
kafka-connect | java.sql.BatchUpdateException: Data truncation: Incorrect date value: '19055' for column 'start_date' at row 1
The value of the message is
{"id":1002,"first_name":"George","last_name":"Bailey","email":"[email protected]","start_date":{"int":19055},"salary":{"int":100000},"secret":{"double":0.867153569942739},"create_time":{"long":1646421476477}}
The schema of the message for the start_date field is
{
"name": "start_date",
"type": [
"null",
{
"type": "int",
"connect.version": 1,
"connect.name": "io.debezium.time.Date"
}
],
"default": null
}
It looks like it does not know how to convert an io.debezium.time.Date to a Date and treated it as an int instead.
Any pointers on this are greatly appreciated.
Source Config:
{
"name": "SimpleSQLServerCDC",
"config":{
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":1,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"confluent.topic.bootstrap.servers":"kafka:29092",
"database.hostname" : "sqlserver",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "",
"database.dbname" : "testDB",
"database.server.name" : "corporation",
"database.history.kafka.topic": "dbhistory.corporation",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "delete"
}
}
Sink Config:
{
"name": "SimpleMySQLJDBC",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql:3306/sinkdb",
"connection.user": "user",
"connection.password": "",
"tasks.max": "2",
"topics.regex": "corporation.dbo.*",
"auto.create": "true",
"auto.evolve": "true",
"dialect.name": "MySqlDatabaseDialect",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields":"id",
"delete.enabled": "true",
"batch.size": 1,
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms":"unwrap,dropPrefix,insertTS",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"corporation.dbo.(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"drop",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq-mysql",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor":"1"
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
Connect API 数据类型不支持字段的字符限制。任何类似字符串的数据都将成为
TEXT
列类型。我认为,默认情况下,日期时间值会转换为 Unix 纪元。您可以使用
TimestampConverter
转换转换为不同的格式总体而言,如果您想准确保留类型,请禁用从接收器连接器自动创建表并使用您想要的类型预先创建表。
The character limit of the fields is not carried through the Connect API datatypes. Any String-like data will become
TEXT
column types.I think, by default, datetime values are converted into Unix epoch. You can use the
TimestampConverter
transform to convert to a different formatOverall, if you want to accurately preserve types, disable the auto-creation of tables from the sink connector and pre-create tables with the types you want.
我刚刚制作了一个 SMT,将所有时间戳字段转换为字符串。希望它能有所帮助。
https://github.com/FX-HAO/kafka-connect-debezium-变换
I just made an SMT that converts all timestamp fields to strings. hopefully, it could help.
https://github.com/FX-HAO/kafka-connect-debezium-tranforms
您需要进行 2 项更改
在源连接器中添加
"time. precision.mode":"connect"
在水槽连接器中添加
You need to make 2 changes
In Source Connector add
"time.precision.mode":"connect"
In sink connector add