PySpark Streaming - java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
I'm submitting a PySpark Streaming job in a Kubernetes environment. The job consumes data from Kafka and processes it using PySpark.
Spark version: 3.2.1
Apache Kafka version: 2.4
I submit using the below spark-submit command:
/opt/spark/bin/spark-submit \
--master k8s://https://test.containers.cloud.ibm.com:35000 \
--deploy-mode cluster \
--name spark-streaming-su \
--conf spark.kubernetes.driver.pod.name=spark-streaming-driver-su \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kubernetes.namespace=test-spark \
--conf spark.kubernetes.file.upload.path=/code-volume/upload_path \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.submission.caCertFile=/etc/spark-secret/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/etc/spark-secret/token \
--conf spark.kubernetes.driver.limit.cores=2 \
--conf spark.driver.memory=2g \
--conf spark.sql.shuffle.partitions=4 \
--conf spark.executor.instances=2 \
--conf spark.executor.memory=2g \
--conf spark.kubernetes.executor.limit.cores=1 \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
--conf spark.kubernetes.container.image=us.ic.io/test/spark-test:v5 \
--conf spark.kubernetes.container.image.pullSecrets=test-us-icr-io \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.options.claimName=pvc-code \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.mount.path=/code-volume \
/code-volume/test/test_streaming.py
Error:
Error occurred due to: An error occurred while calling o60.load. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
  at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:167)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:143)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  ... 22 more
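For context on where this blows up: the o60.load in the traceback is the .load() on DataStreamReader, i.e. the point where the job first touches the Kafka source and KafkaSourceProvider tries to load kafka-clients classes. A minimal sketch of the kind of read that triggers it (the broker and topic names are placeholders, not taken from the actual test_streaming.py):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-streaming-su").getOrCreate()

# .load() is where KafkaSourceProvider is initialized, which needs
# kafka-clients (ByteArraySerializer) on the driver classpath.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "my-broker:9092")  # placeholder
      .option("subscribe", "my-topic")                      # placeholder
      .load())

query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())
query.awaitTermination()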
I tried the following additions to the spark-submit command, but none of them worked.
Trial 1)
--jars "/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,/opt/spark/jars/kafka-clients-2.4.0.jar,/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar"
Trial 2)
--conf "spark.driver.extraClassPath=/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar:/opt/spark/jars/kafka-clients-2.4.0.jar:/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar" \
--conf "spark.executor.extraClassPath=/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar:/opt/spark/jars/kafka-clients-2.4.0.jar:/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar" \
Trial 3)
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1
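Two things stand out in these trials. --packages (Trial 3) needs the driver pod to be able to reach a Maven repository at startup, and it pulls spark-streaming-kafka-0-10, the DStream integration, whereas the stack trace goes through org.apache.spark.sql.kafka010, i.e. Structured Streaming, whose package is spark-sql-kafka-0-10. The classpath entries in Trials 1 and 2 only take effect if the jars actually exist at those paths inside the driver and executor containers, which can be checked against the running driver pod (name taken from spark.kubernetes.driver.pod.name above):

kubectl exec -n test-spark spark-streaming-driver-su -- ls /opt/spark/jars | grep -i kafka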
1 Answer:
I downloaded elastic-jars, spark-sql-kafka-0-10_2.12-3.2.1.jar, and their dependencies, and made them available in "/code-volume/extrajars/". Note: as per OneCricketeer's comment, the options below should be used together.
Working spark-submit command:
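A sketch of the combined command, under the assumptions described above: the original submit command from the question, with --jars and the extraClassPath options all pointed at /code-volume/extrajars/, and the PVC additionally mounted on the executor pods so they see the same path. The executor volume confs and the glob patterns here are assumptions, not a verbatim command:

/opt/spark/bin/spark-submit \
--master k8s://https://test.containers.cloud.ibm.com:35000 \
--deploy-mode cluster \
--name spark-streaming-su \
... (same --conf options as the command in the question) ... \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.options.claimName=pvc-code \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.mount.path=/code-volume \
--jars "/code-volume/extrajars/*" \
--conf spark.driver.extraClassPath="/code-volume/extrajars/*" \
--conf spark.executor.extraClassPath="/code-volume/extrajars/*" \
/code-volume/test/test_streaming.py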