Kafka Connect ProtobufConverter could not be found

Posted 2025-02-11 12:08:25

I have a Kafka S3 sink connector that I'm producing messages to. However, when I send messages to the topic, I notice that the logs of my sink connector are throwing this error:

Invalid value io.confluent.connect.protobuf.ProtobufConverter for configuration value.converter: Class io.confluent.connect.protobuf.ProtobufConverter could not be found.

Here's my sink:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: test-s3-sink
  labels:
    # The strimzi.io/cluster label identifies the KafkaConnect instance
    # in which to create this connector. That KafkaConnect instance
    # must have the strimzi.io/use-connector-resources annotation
    # set to true.
    strimzi.io/cluster: test-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 6
  config:
    name: test-s3-sink
    topics: testing-stack-overflow-topic
    flush.size: 100
    s3.bucket.name: test-bucket
    s3.region: us-east-1
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.avro.AvroFormat
    store.kafka.keys: true
    keys.format.class: io.confluent.connect.s3.format.avro.AvroFormat
    key.converter: "io.confluent.connect.protobuf.ProtobufConverter"
    key.converter.schema.registry.url: "http://localhost:8081"
    key.converter.schemas.enable: true
    value.converter: "io.confluent.connect.protobuf.ProtobufConverter"
    value.converter.schema.registry.url: "http://localhost:8081"
    value.converter.schemas.enable: true
    rotate.interval.ms: 3600000
    timezone: UTC
    behavior.on.null.values: ignore
    partitioner.class: "io.confluent.connect.storage.partitioner.HourlyPartitioner"
    locale: "en-US"

Here's my producer:

package kafka;

import com.github.javafaker.Faker;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.CardData;
import java.util.Properties;

public class SendKafkaProto {
  public static void main(String[] args) {
    // Setup Producer Properties
    String bootstrapServers = "127.0.0.1:9092";
    var properties = new Properties();
    properties.setProperty("bootstrap.servers", bootstrapServers);
    properties.setProperty("schema.registry.url", "http://localhost:8081");
    properties.setProperty("key.serializer", StringSerializer.class.getName());
    properties.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());

    KafkaProducer<String, CardData.CreditCard> producer = new KafkaProducer<>(properties);
    // Specify Topic Name
    var topic = "protos_topic_cards";

    // Loop to Produce Fake Data
    for (int i = 0; i < 15; i++) {
      // creating Random object
      Random rd = new Random();
      Faker faker = new Faker();
      String name = faker.name().fullName();
      String countryCode = faker.address().countryCode();
      String cardNumber = faker.business().creditCardNumber();
      Integer typeValue = rd.nextInt(3);
      String currencyCode = faker.country().currencyCode();

      // Serializing to Protobuf based on CreditCard.proto Schema
      var cardData = CardData.CreditCard.newBuilder()
          .setName(name)
          .setCountry(countryCode)
          .setCurrency(currencyCode)
          .setTypeValue(typeValue)
          .setBlocked(false)
          .setCardNumber(cardNumber)
          .build();

      var record = new ProducerRecord<String, CardData.CreditCard>(topic, "Credit Card", cardData);
      // Send to Producer
      producer.send(record);
    }
    producer.flush();
    producer.close();
    // Log success message
    System.out.println("Sent Data Successfully");
  }
}

What could be amiss here? Do I need to add the Protobuf converter to wherever my Kafka image is mounted? Do I need to use a string converter for the key serializer? There aren't many solutions for this online. Thanks!

2 Answers

愛上了 2025-02-18 12:08:25

The Strimzi containers don't contain any converters outside the ones included by Apache Kafka.

You need to install it yourself, which potentially means building your own Docker image: https://www.confluent.io/hub/confluentinc/kafka-connect-protobuf-converter
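
One way to do that with Strimzi is the KafkaConnect build mechanism, which downloads plugin artifacts and bakes them into a new image for the Connect cluster. A rough sketch is below, assuming a Connect cluster named test-cluster to match the strimzi.io/cluster label in your connector; the bootstrap address, the output image, and the artifact URLs are placeholders you would replace with your own values and the download links from Confluent Hub:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: test-cluster
  annotations:
    # Needed so KafkaConnector resources like the one above are picked up
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  # Placeholder: the bootstrap address of your Kafka cluster
  bootstrapServers: my-kafka-kafka-bootstrap:9092
  build:
    output:
      type: docker
      # Placeholder: a repository Strimzi can push the built image to
      image: my-registry.example.com/test-cluster-connect:latest
    plugins:
      - name: kafka-connect-s3
        artifacts:
          - type: zip
            # Placeholder: download URL for the S3 sink connector from Confluent Hub
            url: <confluent-hub-s3-sink-zip-url>
      - name: protobuf-converter
        artifacts:
          - type: zip
            # Placeholder: download URL for kafka-connect-protobuf-converter from Confluent Hub
            url: <confluent-hub-protobuf-converter-zip-url>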

Also, you cannot use localhost as the Schema Registry URL in the Strimzi config; it needs to be an address that is reachable from inside the Connect containers, e.g. http://schema-registry.<namespace>.svc.cluster.local:8081 (or simply http://schema-registry:8081 from the same namespace) if you have a schema-registry Service in the cluster.

And your producer uses String keys, so key.converter shouldn't be Protobuf; strings don't have schemas, so there is nothing to enable. Protobuf, on the other hand, always has a schema, so setting schemas.enable does nothing either.
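
Putting those two points together, the converter section of the KafkaConnector config above might look roughly like this; the registry hostname is a placeholder for whatever address your Schema Registry Service is actually reachable at inside the cluster:

    # Keys are plain strings, so use the StringConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    # Values are Protobuf, so keep the ProtobufConverter (once it is installed)
    value.converter: io.confluent.connect.protobuf.ProtobufConverter
    # Placeholder: the in-cluster address of your Schema Registry Service
    value.converter.schema.registry.url: "http://schema-registry.<namespace>.svc.cluster.local:8081"
    # key/value.converter.schemas.enable only apply to the JsonConverter, so they are dropped here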

If you want to store Avro in S3, using an Avro Producer might make more sense. Again, install the necessary Converter.

残花月 2025-02-18 12:08:25

Looks like io.confluent.connect.protobuf.ProtobufConverter is not present on your application's classpath. Look through the JARs you are deploying, check whether any of them actually provides that class, and make sure it ends up on your path.
