Kafka Connect - No converter present due to unexpected object type: java.lang.Double

I have a Kafka Streams application that does some calculations on values from a topic, using sliding windows. I read that the best practice for persisting the data is to push the values to another topic and use Kafka Connect to read from that topic and save the data to a database.

I downloaded the confluentinc/cp-kafka-connect image and extended it by installing the MongoDB Kafka connector via a Dockerfile:

FROM confluentinc/cp-kafka-connect:7.0.3
COPY target/components/packages/mongodb-kafka-connect-mongodb-1.7.0.zip /tmp/mongodb-kafka-connect-mongodb-1.7.0.zip

RUN confluent-hub install --no-prompt /tmp/mongodb-kafka-connect-mongodb-1.7.0.zip

I sent the connector configuration to the Kafka Connect REST API:

 curl -X PUT http://localhost:8083/connectors/sink-mongodb-users/config -H "Content-Type: application/json" -d ' {
    "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max":"1",
    "topics":"MOVINGAVG",
    "connection.uri":"mongodb://mongo:27017",
    "database":"Temperature",
    "collection":"MovingAverage",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable":true,
    "value.converter":"org.apache.kafka.connect.converters.DoubleConverter",
    "value.converter.schemas.enable":true
    }'
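
From what I understand, org.apache.kafka.connect.converters.DoubleConverter hands the sink a bare java.lang.Double rather than any structured record. A minimal sketch of the round trip against the standard Connect API (the topic name and value are taken from my setup; the class itself is only for illustration):

import org.apache.kafka.connect.converters.DoubleConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.util.Map;

public class DoubleConverterSketch {
    public static void main(String[] args) {
        DoubleConverter converter = new DoubleConverter();
        converter.configure(Map.of(), false); // false = configure as the value converter

        // Serialize a double the way the Streams app's Serdes.Double() does (8-byte big-endian)
        byte[] payload = converter.fromConnectData("MOVINGAVG", Schema.FLOAT64_SCHEMA, 2.1474836E7);

        // What the Connect worker hands to the MongoDB sink task
        SchemaAndValue connectData = converter.toConnectData("MOVINGAVG", payload);
        System.out.println(connectData.value().getClass()); // prints: class java.lang.Double
    }
}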

The Kafka Streams code that produces the records:

StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Double> kafkaStreams = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
Duration timeDifference = Duration.ofSeconds(30);

KTable table = kafkaStreams.groupByKey()
        .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
        .aggregate(
                () -> generateTuple(logger), // initializer
                (key, value, aggregate) -> tempAggregator(key, value, aggregate, logger)) // aggregator
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .mapValues((ValueMapper<AggregationClass, Object>) tuple2 -> tuple2.getAverage());
table.toStream()
        .peek((k, v) -> System.out.println("Value: " + v))
        .to(targetTopic, Produced.valueSerde(Serdes.Double()));
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.cleanUp();
streams.start();
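
For completeness, the `properties` object above is nothing special; it is roughly the following (the application id and bootstrap address here are placeholders, not my exact values):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "moving-average-app"); // placeholder id
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");      // placeholder address
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());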

The messages are written to the topic and the Mongo connector picks them up, but it throws an exception:

Caused by: org.apache.kafka.connect.errors.DataException: Could not convert value `2.1474836E7` into a BsonDocument.
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
    at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
    at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
    at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
    at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.lang.Double
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
    at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
    at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
    ... 27 more
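
The innermost "No converter present" is thrown from SinkConverter.getRecordConverter. As far as I can tell from the connector source, the dispatch there is roughly the following (my paraphrase of mongo-kafka 1.7.0, simplified into a standalone method, not the verbatim implementation):

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

// Paraphrase of com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter,
// reduced to the branch structure I can see in the source.
static String pickConverter(Object data, Schema schema) {
    if (schema != null && data instanceof Struct) {
        return "schemaful converter";  // e.g. Avro, or JSON with an embedded schema
    }
    if (data instanceof Map) {
        return "schemaless converter"; // structured JSON without a schema
    }
    if (data instanceof String) {
        return "raw JSON string converter";
    }
    // A bare java.lang.Double from DoubleConverter falls through to here:
    throw new DataException("No converter present due to unexpected object type: "
            + data.getClass().getName());
}

If that reading is right, the sink only accepts Struct, Map, or JSON-string values, and the Double produced by my value converter matches none of the branches.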

Why is this exception thrown?
