Using Kafka TimestampConverter with microseconds?
I'm new to Kafka/Avro. My downstream database (Postgres) has a timestamptz column. My upstream database (Materialize) produces the following Avro schema for the column:
"type": [
"null",
{
"logicalType": "timestamp-micros",
"type": "long"
}
]
This seems consistent with timestamptz, which also stores microseconds. However, I'm getting the following:
org.postgresql.util.PSQLException: ERROR: column "time" is of type timestamp with time zone but expression is of type bigint
Hint: You will need to rewrite or cast the expression.
Position: 207
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
It seems like the Kafka messages contain a bigint/long, and JdbcSinkConnector doesn't know how to convert it to a timestamp. I tried using Kafka's TimestampConverter, but it assumes the input timestamp is in milliseconds.
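For context, the attempt presumably looked something like the standard TimestampConverter SMT configuration below (the transform alias `ts` is arbitrary, and the field name `time` is taken from the error message above):

```properties
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=time
transforms.ts.target.type=Timestamp
```

With this configuration, TimestampConverter interprets the long as epoch milliseconds, so a timestamp-micros value comes out roughly 1,000x too far in the future.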
It doesn't look like TimestampConverter supports microseconds directly (https://issues.apache.org/jira/browse/KAFKA-10561).
Is there a way to convert microseconds to milliseconds in a Kafka connector? A hack is fine, as long as I don't need to change the Avro schema. If there's a transform that divides by 1000 or drops the last 3 digits, that should work. Is there a way to do this without a custom transform or ksqlDB?
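For reference, the conversion being asked about is plain truncating integer division: a timestamp-micros value divided by 1,000 is the corresponding timestamp-millis value. A minimal sketch (the sample value is hypothetical):

```java
public class MicrosToMillis {
    public static void main(String[] args) {
        // Hypothetical timestamp-micros value, as the Avro schema above would encode it
        long micros = 1700000000123456L;

        // Integer division by 1000 drops the last three digits (sub-millisecond precision)
        long millis = micros / 1000L;

        System.out.println(millis); // 1700000000123
    }
}
```

Note that this truncation discards the sub-millisecond part, which is the precision loss implied by "remove the last 3 digits".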