Kafka Streams Avro consumer cannot retrieve the Avro schema for ID 100007
I am running a Kafka Streams consumer that tries to consume Avro records from Confluent Cloud. I keep hitting the errors Error retrieving Avro unknown schema for id 100007 and Unauthorized; error code: 401. I have a streams.properties file in src/main/resources/.

Here is my error message:
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100007
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:341)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:113)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:303)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:836)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:809)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:277)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:409)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:330)
... 16 more
Here is the Kafka Streams code:
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class streams {
    public static void main(String[] args) throws IOException {
        StreamsBuilder builder = new StreamsBuilder();

        Properties properties = new Properties();
        InputStream in = streams.class.getClassLoader().getResourceAsStream("streams.properties");
        properties.load(in);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-2396y.us-east-1.aws.confluent.cloud:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");

        final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
        final Serde<String> stringSerde = Serdes.String();
        final Serde<POCEntity> valueAvroSerde = new SpecificAvroSerde<>();
        valueAvroSerde.configure(serdeConfig, false);

        KStream<String, POCEntity> firstStream = builder.stream(inputTopic, Consumed.with(stringSerde, valueAvroSerde));
        firstStream.peek((key, value) -> System.out.println("key " + key + "value " + value));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();
    }
}
I don't understand what I'm doing wrong. I followed https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html and https://www.youtube.com/watch?v=lxxxexi1mpko, among others.

Completely at a loss here.
Comments (1)
As the error says: Unauthorized. You have given no authentication settings to your Avro Serde config.

Notice from the docs that you need basic.auth.credentials.source plus schema.registry.basic.auth.user.info. The rest of the links you've given seem to be "local development / getting started" material and don't cover security configuration.
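For illustration, a minimal sketch of how your serdeConfig could carry those settings (the key and secret are placeholders for the Schema Registry API credentials from your Confluent Cloud console; Collections.singletonMap is swapped for a HashMap since there are now several entries):

import java.util.HashMap;
import java.util.Map;

final Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put("schema.registry.url", "https://psrc-4xgzx.us-east-2.aws.confluent.cloud");
// Tell the registry client to take credentials from the user-info property below.
serdeConfig.put("basic.auth.credentials.source", "USER_INFO");
// Placeholder credentials in <api-key>:<api-secret> form.
serdeConfig.put("schema.registry.basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");
valueAvroSerde.configure(serdeConfig, false);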
Similarly, you need SASL properties in your properties variable to connect to the actual broker, assuming those are not already part of the streams.properties file... Confluent Cloud requires authentication, and the values for these settings are shown in your cluster dashboard.
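For example, a sketch of the SASL settings a Confluent Cloud broker expects from a Java client (the cluster API key and secret are placeholders, and are distinct from the schema registry credentials above):

properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// Placeholder cluster API key/secret from the Confluent Cloud dashboard.
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");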
If there were no authentication, anyone would be able to copy the code in your question and start sending/consuming random data ;)