kafka connect protobuf转换器找不到
我有一个Kafka S3-sink连接器,我正在生产消息。但是,当我将消息发送到主题时,我注意到我的水槽连接器的日志给我带来了一个错误:
│ Invalid value io.confluent.connect.protobuf.ProtobufConverter for configuration value.converter: Class io.confluent.connect.protobuf.ProtobufConverter could not be found.
这是我的水槽:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: test-s3-sink
labels:
# The strimzi.io/cluster label identifies the KafkaConnect instance
# in which to create this connector. That KafkaConnect instance
# must have the strimzi.io/use-connector-resources annotation
# set to true.
strimzi.io/cluster: test-cluster
spec:
class: io.confluent.connect.s3.S3SinkConnector
tasksMax: 6
config:
name: test-s3-sink
topics: testing-stack-overflow-topic
flush.size: 100
s3.bucket.name: test-bucket
s3.region: us-east-1
storage.class: io.confluent.connect.s3.storage.S3Storage
format.class: io.confluent.connect.s3.format.avro.AvroFormat
store.kafka.keys: true
keys.format.class: io.confluent.connect.s3.format.avro.AvroFormat
key.converter: "io.confluent.connect.protobuf.ProtobufConverter"
key.converter.schema.registry.url: "http://localhost:8081"
key.converter.schemas.enable: true
value.converter: "io.confluent.connect.protobuf.ProtobufConverter"
value.converter.schema.registry.url: "http://localhost:8081"
value.converter.schemas.enable: true
rotate.interval.ms: 3600000
timezone: UTC
behavior.on.null.values: ignore
partitioner.class: "io.confluent.connect.storage.partitioner.HourlyPartitioner"
locale: "en-US"
这是我的制片人:
package kafka;
import com.github.javafaker.Faker;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.CardData;
import java.util.Properties;
public class SendKafkaProto {
public static void main(String[] args) {
// Setup Producer Properties
String bootstrapServers = "127.0.0.1:9092";
var properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapServers);
properties.setProperty("schema.registry.url", "http://localhost:8081");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());
KafkaProducer<String, CardData.CreditCard> producer = new KafkaProducer<>(properties);
// Specify Topic Name
var topic = "protos_topic_cards";
// Loop to Produce Fake Data
for (int i = 0; i < 15; i++) {
// creating Random object
Random rd = new Random();
Faker faker = new Faker();
String name = faker.name().fullName();
String countryCode = faker.address().countryCode();
String cardNumber = faker.business().creditCardNumber();
Integer typeValue = rd.nextInt(3);
String currencyCode = faker.country().currencyCode();
// Serializing to Protobuf based on CreditCard.proto Schema
var cardData = CardData.CreditCard.newBuilder()
.setName(name)
.setCountry(countryCode)
.setCurrency(currencyCode)
.setTypeValue(typeValue)
.setBlocked(false)
.setCardNumber(cardNumber)
.build();
var record = new ProducerRecord<String, CardData.CreditCard>(topic, "Credit Card", cardData);
// Send to Producer
producer.send(record);
}
producer.flush();
producer.close();
// Log success message
System.out.println("Sent Data Successfully");
}
}
这里可能不对?我需要将Protobuf转换器添加到安装Kafka图像的任何地方吗?我需要将字符串转换器用于关键序列化器吗?在线上没有很多解决方案。谢谢!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
strimzi容器不包含Apache Kafka包含的转换器。
您需要安装它,有可能需要自己的docker映像 -
,您也不能将
localhost
用作strimzi config的注册表url;它需要是连接容器环境之外的外部地址,例如http://schema-registry.svc.cluster.local.local:8081
如果您有schema-registry
在同一名称空间中提供服务。并且您的生产者使用字符串键,因此
key.converter
不应是ProtoBuf;字符串没有图式,因此没有什么可实现的。 Protobuf始终具有模式,因此具有schemas.enable
无能为力。如果您想将AVRO存储在S3中,则使用AVRO生产商可能会更有意义。同样,安装必要的转换器。
The Strimzi containers don't contain any converters outside the ones included by Apache Kafka.
You need to install it, potentially requiring your own Docker image - https://www.confluent.io/hub/confluentinc/kafka-connect-protobuf-converter
Also, you cannot use
localhost
as the Registry URL from the Strimzi config; it needs to be an external address outside the Connect container environment, e.g.http://schema-registry.svc.cluster.local:8081
if you had aschema-registry
service in the same namespace.And your producer uses String keys, so
key.converter
shouldn't be Protobuf ; Strings don't have schemas, so there is nothing to enable. And Protobuf always has a schema, so havingschemas.enable
doesn't do anything.If you want to store Avro in S3, using an Avro Producer might make more sense. Again, install the necessary Converter.
看起来像
io.confluent.connect.protobuf.protobufconverter
在您的应用程序类中不存在。看看您要执行的罐子,以及它们是否提供它,并确保它存在于您的路径中。Looks like
io.confluent.connect.protobuf.ProtobufConverter
is not present in your application classpath. Take a look across the JARs that you are executing and whether any of them delivers it and make sure it is present in your path.