PySpark Streaming - java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer


I'm submitting a PySpark Streaming job in a Kubernetes environment. The job consumes data from Kafka and processes it with PySpark.

Spark version: 3.2.1
Apache Kafka version: 2.4

I submit the job with the spark-submit command below:

/opt/spark/bin/spark-submit \
     --master k8s://https://test.containers.cloud.ibm.com:35000 \
     --deploy-mode cluster \
     --name spark-streaming-su \
     --conf spark.kubernetes.driver.pod.name=spark-streaming-driver-su \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kubernetes.namespace=test-spark \
     --conf spark.kubernetes.file.upload.path=/code-volume/upload_path \
     --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
     --conf spark.kubernetes.authenticate.submission.caCertFile=/etc/spark-secret/ca.crt \
     --conf spark.kubernetes.authenticate.submission.oauthTokenFile=/etc/spark-secret/token \
     --conf spark.kubernetes.driver.limit.cores=2 \
     --conf spark.driver.memory=2g \
     --conf spark.sql.shuffle.partitions=4 \
     --conf spark.executor.instances=2 \
     --conf spark.executor.memory=2g \
     --conf spark.kubernetes.executor.limit.cores=1 \
     --conf spark.kubernetes.pyspark.pythonVersion=3 \
     --conf spark.kubernetes.container.image=us.ic.io/test/spark-test:v5 \
     --conf spark.kubernetes.container.image.pullSecrets=test-us-icr-io \
     --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
     --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.options.claimName=pvc-code \
     --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.mount.path=/code-volume \
     /code-volume/test/test_streaming.py
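
test_streaming.py itself is not shown in the post; the following is a minimal sketch of the kind of Structured Streaming read that would produce the o60.load call in the traceback below (the broker address and topic name are placeholders, not from the original):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spark-streaming-su").getOrCreate()

    # .format("kafka") resolves KafkaSourceProvider, whose companion object
    # references ByteArraySerializer from kafka-clients; this .load() is the
    # call that fails in the traceback.
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
          .option("subscribe", "test-topic")                 # placeholder
          .load())

    query = (df.selectExpr("CAST(value AS STRING) AS value")
             .writeStream
             .format("console")
             .start())
    query.awaitTermination()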

Error:

Error occurred due to: An error occurred while calling o60.load. :
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:236)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:34)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:167)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:143)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 22 more

I tried the following additions to the spark-submit command, but none of them worked.

Trial 1)

--jars "/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,/opt/spark/jars/kafka-clients-2.4.0.jar,/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar"

Trial 2)

 --conf "spark.driver.extraClassPath=/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar:/opt/spark/jars/kafka-clients-2.4.0.jar:/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar" \
 --conf "spark.executor.extraClassPath=/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar:/opt/spark/jars/kafka-clients-2.4.0.jar:/opt/spark/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar" \

Trial 3)

 --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1
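
One thing worth noting about Trial 3 (an observation added here, not in the original post): spark-streaming-kafka-0-10 is the DStream connector, while the "kafka" source used by readStream lives in spark-sql-kafka-0-10, which also pulls in kafka-clients (the jar that contains ByteArraySerializer) as a transitive dependency. A minimal sketch of requesting that connector from the session builder follows; this only takes effect when the script is started with plain python, so the config is set before the driver JVM launches, not when it is already running under spark-submit:

    from pyspark.sql import SparkSession

    # Hypothetical sketch: let Spark resolve the Structured Streaming Kafka
    # connector (and, transitively, kafka-clients) from Maven at startup.
    # The coordinates must match the Spark/Scala build, here 3.2.1 / 2.12.
    spark = (SparkSession.builder
             .appName("kafka-connector-sketch")
             .config("spark.jars.packages",
                     "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
             .getOrCreate())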


几味少女 2025-01-17 05:25:14


I downloaded elastic-jars and spark-sql-kafka-0-10_2.12-3.2.1.jar together with its dependencies, and made them available in "/code-volume/extrajars/".

Note: as per OneCricketeer's comment, the three settings below should be used together: --jars distributes the jars, while the extraClassPath entries put them on the driver and executor JVM classpaths.

 --jars "/code-volume/extrajars/*" \
 --conf spark.driver.extraClassPath=/code-volume/extrajars/* \
 --conf spark.executor.extraClassPath=/code-volume/extrajars/* \

Working spark-submit command:

/opt/spark/bin/spark-submit \
     --master k8s://https://test.containers.cloud.ibm.com:35000 \
     --deploy-mode cluster \
     --name spark-streaming-su \
     --conf spark.kubernetes.driver.pod.name=spark-streaming-driver-su \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kubernetes.namespace=test-spark \
     --conf spark.kubernetes.file.upload.path=/code-volume/upload_path \
     --jars "/code-volume/extrajars/*" \
     --conf spark.driver.extraClassPath=/code-volume/extrajars/* \
     --conf spark.executor.extraClassPath=/code-volume/extrajars/* \
     --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
     --conf spark.kubernetes.authenticate.submission.caCertFile=/etc/spark-secret/ca.crt \
     --conf spark.kubernetes.authenticate.submission.oauthTokenFile=/etc/spark-secret/token \
     --conf spark.kubernetes.driver.limit.cores=2 \
     --conf spark.driver.memory=2g \
     --conf spark.sql.shuffle.partitions=4 \
     --conf spark.executor.instances=2 \
     --conf spark.executor.memory=2g \
     --conf spark.kubernetes.executor.limit.cores=1 \
     --conf spark.kubernetes.pyspark.pythonVersion=3 \
     --conf spark.kubernetes.container.image=us.ic.io/test/spark-test:v5 \
     --conf spark.kubernetes.container.image.pullSecrets=test-us-icr-io \
     --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
     --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.options.claimName=pvc-code \
     --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.mount.path=/code-volume \
     --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.options.claimName=pvc-code \
     --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.pvc-567yun-4b67-389u-9cfg1-gtabd234567.mount.path=/code-volume \
     /code-volume/test/test_streaming.py
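
Not part of the original answer: after wiring up --jars and the extraClassPath settings, a quick driver-side sanity check is to look the failing class up through the py4j gateway. A minimal sketch:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("classpath-check").getOrCreate()

    # Ask the driver JVM for the class the traceback reported missing; if
    # the Kafka jars are on the driver classpath this succeeds, otherwise
    # py4j surfaces the ClassNotFoundException.
    klass = "org.apache.kafka.common.serialization.ByteArraySerializer"
    try:
        spark._jvm.java.lang.Class.forName(klass)
        print("driver classpath OK:", klass)
    except Exception as exc:
        print("still missing:", exc)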