使用 Kafka TimestampConverter 微秒?

发布于 2025-01-10 18:32:31 字数 1211 浏览 0 评论 0原文

我是 Kafka/Avro 的新手。我的下游数据库 (Postgres) 有一个 timestamptz 列。我的上游数据库 (Materialize) 为该列生成以下 Avro 模式:

  "type": [
    "null",
    {
      "logicalType": "timestamp-micros",
      "type": "long"
    }
  ]

这似乎与 timestamptz 一致,它也存储微秒。但是,我得到以下信息:

org.postgresql.util.PSQLException: ERROR: column "time" is of type timestamp with time zone but expression is of type bigint 
  Hint: You will need to rewrite or cast the expression.
  Position: 207

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more

Kafka 消息似乎包含 bigint/long 并且 JdbcSinkConnector 不知道如何转换它到一个时间戳。我尝试使用 Kafka 的 TimestampConverter,但它假设输入时间戳以毫秒为单位。

看起来 TimestampConverter 不直接支持微秒 (https: //issues.apache.org/jira/browse/KAFKA-10561

有没有办法在 Kafka 连接器中将微秒转换为毫秒?这可能只是一个 hack,所以我不需要更改 Avro 架构。如果有一个除以 1000 或删除最后 3 位数字的转换,那么应该可以工作。有没有办法在没有自定义转换或 ksqlDB 的情况下做到这一点?

I'm new to Kafka/Avro. My downstream database (Postgres) has a timestamptz column. My upstream database (Materialize) produces the following Avro schema for the column:

  "type": [
    "null",
    {
      "logicalType": "timestamp-micros",
      "type": "long"
    }
  ]

This seems consistent with timestamptz, which also stores microseconds. However, I'm getting the following:

org.postgresql.util.PSQLException: ERROR: column "time" is of type timestamp with time zone but expression is of type bigint 
  Hint: You will need to rewrite or cast the expression.
  Position: 207

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more

It seems like the Kafka messages contain bigint/long and JdbcSinkConnector doesn't know how to convert it to a timestamp. I tried using Kafka's TimestampConverter, but it assumes the input timestamp is in milliseconds.

It doesn't look like TimestampConverter supports microseconds directly (https://issues.apache.org/jira/browse/KAFKA-10561)

Is there a way to convert microseconds to milliseconds in Kafka connectors? This can just be a hack, so I don't need to change the Avro schema. If there's a transform to divide by 1000 or remove the last 3 digits, that should work. Is there a way to do this without a custom transform or ksqlDB?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文