Kafka Streams Avro consumer cannot retrieve Avro schema for id 100007


I am running a Kafka Streams consumer that is trying to consume Avro records from Confluent Cloud. I keep getting the error: Error retrieving Avro unknown schema for id 100007, caused by Unauthorized; error code: 401. I have a streams.properties file in src/main/resources/.

Here is my error message:

org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100007
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:341)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:113)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:836)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:809)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:277)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:409)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:330)
    ... 16 more

And here is the Kafka Streams code:

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;


public class streams {

    public static void main(String[] args) throws IOException {

        StreamsBuilder builder = new StreamsBuilder();

        Properties properties = new Properties();

        InputStream in = streams.class.getClassLoader().getResourceAsStream("streams.properties");
        properties.load(in);

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-2396y.us-east-1.aws.confluent.cloud:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");

        final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
            "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");

        final Serde<String> stringSerde = Serdes.String();
        final Serde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
        valueAvroSerde.configure(serdeConfig, false);


        KStream<String, POCEntity> firstStream = builder.stream(inputTopic, Consumed.with(stringSerde, valueAvroSerde));
        firstStream.peek((key, value) -> System.out.println("key " +key +"value " + value));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();

    }

}

I don't understand what I am doing wrong. I have followed instructions from https://docs.confluent.io/platform/current/streams/developer-guide/datatypes.html#avro, https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html, https://www.youtube.com/watch?v=LxxeXI1mPKo,
https://www.youtube.com/watch?v=DOBMB0L0oKQ&list=PLa7VYi0yPIH35IrbJ7Y0U2YLrR9u4QO-s&index=4, and
https://github.com/confluentinc/kafka-streams-examples/tree/7.1.1-post/src/main/java/io/confluent/examples/streams (I looked at the Avro examples there for guidance).

Completely at a loss here.


Answer by 傾旎, 2025-02-18 21:07:44:


As the error says, Unauthorized.

You have given no authentication settings in your Avro serde config.

Notice from the docs: you need basic.auth.credentials.source plus schema.registry.basic.auth.user.info. The rest of the links you've given appear to cover "local development / getting started" and don't touch on security configuration.
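
A minimal sketch of what that serde config could look like, assuming you use an API key and secret created for the Schema Registry in the Confluent Cloud console (the <SR_API_KEY>:<SR_API_SECRET> placeholder is hypothetical):

import java.util.HashMap;
import java.util.Map;

// Serde config with Schema Registry credentials. USER_INFO tells the
// client to read the key:secret pair from schema.registry.basic.auth.user.info.
final Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url",
        "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
serdeConfig.put("schema.registry.basic.auth.user.info",
        "<SR_API_KEY>:<SR_API_SECRET>"); // hypothetical placeholder

final Serde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
valueAvroSerde.configure(serdeConfig, false); // false = value serde, not key serde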

Similarly, you need SASL properties in your properties variable to connect to the actual broker, assuming those are not already part of your streams.properties file...
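
A sketch of the broker side, assuming SASL/PLAIN with a cluster API key and secret from the dashboard (<CLUSTER_API_KEY> and <CLUSTER_API_SECRET> are hypothetical placeholders):

// Confluent Cloud brokers require TLS plus SASL/PLAIN authentication.
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");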

Confluent Cloud requires authentication, and the values for these settings are shown in your cluster dashboard.
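
If you would rather keep the credentials out of the code, one possible streams.properties layout (all values hypothetical, copied from the dashboard) is:

# Broker connection
bootstrap.servers=pkc-2396y.us-east-1.aws.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";

# Schema Registry (Kafka Streams itself ignores these; read them back out of
# the loaded Properties when building serdeConfig)
schema.registry.url=https://psrc-4xgzx.us-east-2.aws.confluent.cloud
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>

# Topic the app reads from
producer.send.topic=<your-topic>

Since your code already calls properties.load(in), the SASL entries would be picked up automatically, and the schema.registry.* entries could be fetched with properties.getProperty(...) when building serdeConfig.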

If there were no authentication, anyone would be able to copy the code in your question and start sending/consuming random data ;)
