Flink Python DataStream API Kafka consumer - NoClassDefFoundError: ByteArrayDeserializer
I have an error on the Py4J side of PyFlink. The code is below:
import os

from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "flink-sql-connector-kafka-1.15.0.jar")

type_info = Types.ROW_NAMED(['id', 't', 'l', 't', 'm', 'a', 'e'],
                            [Types.STRING(), Types.LIST(Types.STRING()), Types.STRING(),
                             Types.FLOAT(), Types.STRING(), Types.FLOAT(), Types.STRING()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
kafka_props = {'bootstrap.servers': 'localhost:9092'}
kafka_consumer = FlinkKafkaConsumer("test", json_row_schema, kafka_props)
kafka_consumer.set_start_from_earliest()

## Stream
type_info = Types.ROW([Types.STRING(), Types.PICKLED_BYTE_ARRAY()])
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(type_info) \
    .build()
kafka_producer = FlinkKafkaProducer(
    topic='testmodels',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092'}
)
The error is:
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:323)
..more..
I also have a dummy map function, but it is not relevant here, because the error comes from PyFlink's internal _get_kafka_consumer helper (the j_consumer_clz constructor call is what throws):
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    if not isinstance(topics, list):
        topics = [topics]
    gateway = get_gateway()
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in properties.items():
        j_properties.setProperty(key, value)
    j_flink_kafka_consumer = j_consumer_clz(topics,
                                            deserialization_schema._j_deserialization_schema,
                                            j_properties)
    return j_flink_kafka_consumer
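
Since the failure happens on the JVM side of the Py4J bridge, one quick sanity check (hypothetical, not part of the original code) is to ask the gateway JVM for the missing class directly; this raises if kafka-clients is not on the JVM classpath:

from pyflink.java_gateway import get_gateway

gateway = get_gateway()
# Raises a Py4JJavaError wrapping ClassNotFoundException when kafka-clients is missing.
gateway.jvm.java.lang.Class.forName("org.apache.kafka.common.serialization.ByteArrayDeserializer")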
The things I tried are:

- Adding env.add_jars("file:/" + os.getcwd() + "/jar_files/" + "kafka-clients-3.2.0.jar") to the code.
- Adding another java_import, java_import(gateway.jvm, "org.apache.kafka.common.serialization.*"), in the import_flink_view method.
- Using flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar together.

None of these solved the issue.
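
For reference, a less error-prone way to build the file:// URLs that add_jars expects is pathlib's as_uri(), which always yields a well-formed absolute URL; a small sketch assuming the jars live in ./jar_files as above:

from pathlib import Path

jar_dir = Path.cwd() / "jar_files"
for name in ("flink-sql-connector-kafka-1.15.0.jar", "kafka-clients-3.2.0.jar"):
    # as_uri() produces "file:///abs/path/...", so no manual string concatenation.
    env.add_jars((jar_dir / name).as_uri())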
Comments (1)
PyFlink uses the .jar files under ~/miniconda3/envs/<env_name>/lib/<python_version>/site-packages/pyflink/lib/ when it links to the JVM. Moving flink-connector-kafka-1.15.0.jar and kafka-clients-3.2.0.jar into that folder may solve your issue.
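
A minimal sketch of that fix, assuming the jars sit in a local jar_files directory as in the question; it resolves pyflink's lib/ folder from the installed package, so the environment name and Python version need not be hard-coded:

import os
import shutil

import pyflink

# .../site-packages/pyflink/lib, resolved from wherever pyflink is installed.
pyflink_lib = os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib")
for jar in ("flink-connector-kafka-1.15.0.jar", "kafka-clients-3.2.0.jar"):
    shutil.copy(os.path.join(os.getcwd(), "jar_files", jar), pyflink_lib)

Jars placed in that folder are put on the classpath when PyFlink boots the JVM gateway, which is why this can work even when add_jars() does not.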