Kafka to InfluxDB

I'm trying to get data from my Kafka topic into InfluxDB using the Confluent/Kafka stack. At the moment, the messages in the topic have the form {"tag1":"123","tag2":"456"} (I have fairly good control over the message format; I chose the JSON above and could include a timestamp etc. if necessary).
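(For reference, when testing I put messages of this shape onto the topic by hand with Kafka's console producer, roughly as below; the broker address is a placeholder for my setup:)

# each line typed at the prompt is sent to the topic as one message
kafka-console-producer --bootstrap-server localhost:9092 --topic KAFKATOPIC1
{"tag1":"123","tag2":"456"}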

Ideally, I would like to be able to add many tags in the future without needing to specify a schema or column names.

I followed https://docs.confluent.io/kafka-connect-influxdb/current/influx-db-sink-connector/index.html (the "Schemaless JSON tags example"), as it matches my use case quite closely. The "key" of each message is currently just the MQTT topic name (the topic's source is an MQTT connector), so I set "key.converter" to StringConverter instead of the JsonConverter used in the example.

Other examples I've seen online seem to suggest that a schema needs to be set, which I'd like to avoid. I'm using InfluxDB v1.8, with everything running on Docker and maintained through Portainer.

I cannot get the connector to start, and no data ever moves across.
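(To dig into why, I've been checking the connector's state through the Kafka Connect REST API, something like the following, assuming the worker listens on the default port 8083:)

# overall connector state, plus per-task state and any error stack trace
curl -s http://localhost:8083/connectors/InfluxDBSinkKafka/status
# the connector config as the worker actually sees it
curl -s http://localhost:8083/connectors/InfluxDBSinkKafka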

Below is the config for my InfluxDBSink Connector:

{
  "name": "InfluxDBSinkKafka",
  "config": {
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "name": "InfluxDBSinkKafka",
    "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "KAFKATOPIC1",
    "influxdb.url": "http://URL:PORT",
    "influxdb.db": "tagdata",
    "measurement.name.format": "${topic}"
  }
}
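(I submit this to the Connect worker over its REST API, roughly as follows; the worker address is a placeholder, and influxdb-sink.json is just the JSON above saved to a file:)

# register the connector; Connect responds with the stored config or an error
curl -s -X POST -H "Content-Type: application/json" \
  --data @influxdb-sink.json \
  http://localhost:8083/connectors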

The connector fails, and each time I click "start" (the play button), the following pops up in the Connect container's logs:

[2022-03-22 15:46:52,562] INFO [Worker clientId=connect-1, groupId=compose-connect-group]
Connector InfluxDBSinkKafka target state change (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2022-03-22 15:46:52,562] INFO Setting connector InfluxDBSinkKafka state to STARTED (org.apache.kafka.connect.runtime.Worker)
[2022-03-22 15:46:52,562] INFO SinkConnectorConfig values: 
    config.action.reload = restart
    connector.class = io.confluent.influxdb.InfluxDBSinkConnector
    errors.deadletterqueue.context.headers.enable = false
    errors.deadletterqueue.topic.name = 
    errors.deadletterqueue.topic.replication.factor = 3
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    name = InfluxDBSinkKafka
    predicates = []
    tasks.max = 1
    topics = [KAFKATOPIC1]
    topics.regex = 
    transforms = []
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.SinkConnectorConfig)
[2022-03-22 15:46:52,563] INFO EnrichedConnectorConfig values: 
    config.action.reload = restart
    connector.class = io.confluent.influxdb.InfluxDBSinkConnector
    errors.deadletterqueue.context.headers.enable = false
    errors.deadletterqueue.topic.name = 
    errors.deadletterqueue.topic.replication.factor = 3
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    name = InfluxDBSinkKafka
    predicates = []
    tasks.max = 1
    topics = [KAFKATOPIC1]
    topics.regex = 
    transforms = []
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)

I am feeling a little out of my depth and would appreciate any and all help.

Comment from 南笙:

The trick here is getting the data into Kafka in the right format in the first place. My MQTT source connector needed its value converter set to ByteArrayConverter, with a schema URL and schema = true. The Influx sink then started working once I used the JsonConverter with schema = false. This was deceptive because the messages in the topic look the same regardless of which value converter the MQTT source connector uses, so it took a while to figure out that this was the problem.
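For illustration, a rough sketch of what the source side looks like, assuming the Confluent MQTT source connector; the broker URI, MQTT topics, and names here are placeholders rather than my exact config:

{
  "name": "MQTTSource",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "mqtt.server.uri": "tcp://BROKER:1883",
    "mqtt.topics": "mqtt/topic/#",
    "kafka.topic": "KAFKATOPIC1",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}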

After getting this working, and realising that the Confluent stack was perhaps overkill for this task, I went with the (much) easier route of pushing MQTT directly to Telegraf and having Telegraf write into InfluxDB. I would recommend this.
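A minimal sketch of that Telegraf route, using Telegraf's built-in mqtt_consumer input and influxdb output; the broker, topics, and InfluxDB URL are placeholders:

# telegraf.conf (excerpt): consume JSON from MQTT, write to InfluxDB v1.x
[[inputs.mqtt_consumer]]
  servers = ["tcp://BROKER:1883"]
  topics = ["mqtt/topic/#"]
  data_format = "json"
  # the JSON parser keeps numeric fields only by default; string values
  # like "123" have to be listed explicitly
  json_string_fields = ["tag1", "tag2"]

[[outputs.influxdb]]
  urls = ["http://URL:PORT"]
  database = "tagdata"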
