Flink Python DataStream API Kafka Consumer - NoClassDefFoundError ByteArrayDeserializer Error

Posted 2025-02-12 14:50:31

I have an error on the Py4J side of PyFlink. The code is below:

import os

from pyflink.common.serialization import (JsonRowDeserializationSchema,
                                          JsonRowSerializationSchema)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "flink-sql-connector-kafka-1.15.0.jar")

# Kafka source: JSON rows from the "test" topic
type_info = Types.ROW_NAMED(['id', 't', 'l', 't', 'm', 'a', 'e'],
                            [Types.STRING(), Types.LIST(Types.STRING()), Types.STRING(),
                             Types.FLOAT(), Types.STRING(), Types.FLOAT(),
                             Types.STRING()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092'}
kafka_consumer = FlinkKafkaConsumer("test", json_row_schema, kafka_props)
kafka_consumer.set_start_from_earliest()

## Stream
# Kafka sink: JSON rows to the "testmodels" topic
type_info = Types.ROW([Types.STRING(), Types.PICKLED_BYTE_ARRAY()])
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(type_info) \
    .build()
kafka_producer = FlinkKafkaProducer(
    topic='testmodels',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092'})

The error is:

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:323)
..more..
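
That NoClassDefFoundError means the JVM behind Py4J cannot find org.apache.kafka.common.serialization.ByteArrayDeserializer on its classpath at the moment the consumer is constructed. A quick way to check what a given jar actually bundles, using only the standard library (a sketch; the jar path is the one from the code above):

import zipfile

jar = "jar_files/flink-sql-connector-kafka-1.15.0.jar"
with zipfile.ZipFile(jar) as zf:
    # A jar is a zip archive; look for the missing class file by name
    hits = [n for n in zf.namelist() if n.endswith("ByteArrayDeserializer.class")]
print(hits or "ByteArrayDeserializer.class not found in this jar")

Note that the sql-connector jar shades its Kafka dependencies, so a match may appear under a relocated package rather than under org/apache/kafka/.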

I also have a dummy map function, but it is not relevant here, because the error comes from PyFlink's internal _get_kafka_consumer helper:

# PyFlink internal helper: builds the Java FlinkKafkaConsumer through the Py4J gateway
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    if not isinstance(topics, list):
        topics = [topics]
    gateway = get_gateway()
    # Copy the Python dict into a java.util.Properties instance
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in properties.items():
        j_properties.setProperty(key, value)

    # Invoking the Java constructor here is what raises the NoClassDefFoundError
    j_flink_kafka_consumer = j_consumer_clz(topics,
                                            deserialization_schema._j_deserialization_schema,
                                            j_properties)
    return j_flink_kafka_consumer

The things I tried are:

  • Adding env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "kafka-clients-3.2.0.jar") to the code.
  • Adding another java_import, java_import(gateway.jvm, "org.apache.kafka.common.serialization.*"), in the `import_flink_view` method.
  • Using flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar together.

They did not solve the issue.
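
As an aside, one more thing worth ruling out: "file:/" + os.getcwd() concatenates into file://<first path component>/..., and the part after the double slash may be parsed as a URL host rather than as part of the path. A sketch of building the URL with pathlib instead (env is the environment created above):

from pathlib import Path

# as_uri() always produces a well-formed file:///absolute/path URL
jar_url = (Path.cwd() / "jar_files" / "flink-sql-connector-kafka-1.15.0.jar").as_uri()
env.add_jars(jar_url)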

Comments (1)

ㄟ。诗瑗 2025-02-19 14:50:31

PyFlink uses the .jar files under ~/miniconda3/envs/<env_name>/lib/<python_version>/site-packages/pyflink/lib/ when linking to the JVM. Moving flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar into this folder may solve your issue.
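
A minimal sketch of doing that from Python, assuming the jars currently sit in ./jar_files next to the script:

import os
import shutil
import pyflink

# Locate the lib/ directory of the pyflink package installed in the active environment
pyflink_lib = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
for jar in ("flink-connector-kafka-1.15.0.jar", "kafka-clients-3.2.0.jar"):
    shutil.copy(os.path.join("jar_files", jar), pyflink_lib)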
