How to run Spark Structured Streaming with local JAR files
I'm using one of the Docker images of EMR on EKS (emr-6.5.0:20211119) and investigating how to work with Kafka using Spark Structured Streaming (pyspark). As per the integration guide, I run a Python script as follows.
$SPARK_HOME/bin/spark-submit \
--deploy-mode client \
--master local \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
<myscript>.py
It downloads the package from Maven Central, and I see some JAR files downloaded into ~/.ivy2/jars.
com.github.luben_zstd-jni-1.4.8-1.jar org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar org.slf4j_slf4j-api-1.7.30.jar
org.apache.commons_commons-pool2-2.6.2.jar org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar org.spark-project.spark_unused-1.0.0.jar
org.apache.kafka_kafka-clients-2.6.0.jar org.lz4_lz4-java-1.7.1.jar org.xerial.snappy_snappy-java-1.1.8.2.jar
However, I find that the main JAR file is already present in $SPARK_HOME/external/lib, and I wonder how to make use of it instead of downloading it.
spark-avro_2.12-3.1.2-amzn-1.jar spark-ganglia-lgpl.jar spark-streaming-kafka-0-10-assembly_2.12-3.1.2-amzn-1.jar spark-streaming-kinesis-asl-assembly.jar
spark-avro.jar spark-sql-kafka-0-10_2.12-3.1.2-amzn-1.jar spark-streaming-kafka-0-10-assembly.jar spark-token-provider-kafka-0-10_2.12-3.1.2-amzn-1.jar
spark-ganglia-lgpl_2.12-3.1.2-amzn-1.jar spark-sql-kafka-0-10.jar spark-streaming-kinesis-asl-assembly_2.12-3.1.2-amzn-1.jar spark-token-provider-kafka-0-10.jar
UPDATE 2022-03-09
I tried again after updating spark-defaults.conf as shown below, adding the external lib folder to the class path entries.
spark.driver.extraClassPath /usr/lib/spark/external/lib/*:...
spark.driver.extraLibraryPath ...
spark.executor.extraClassPath /usr/lib/spark/external/lib/*:...
spark.executor.extraLibraryPath ...
I can now run without --packages, but the job fails with the following error.
22/03/09 05:37:25 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<init>(KafkaDataConsumer.scala:623)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$.<clinit>(KafkaDataConsumer.scala)
at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 33 more
Adding --packages org.apache.commons:commons-pool2:2.6.2 doesn't help either (the attempted run is sketched below).
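For reference, a sketch of that attempted run, assuming the external-lib class-path entries above are still in spark-defaults.conf (this still fails as described):

$SPARK_HOME/bin/spark-submit \
--deploy-mode client \
--master local \
--packages org.apache.commons:commons-pool2:2.6.2 \
<myscript>.py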
2 Answers
You would use --jars to refer to the local filesystem in place of --packages; see the sketch below.
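For example, a minimal sketch using the JARs shipped under $SPARK_HOME/external/lib. Note that --jars, unlike --packages, does not resolve transitive dependencies, so JARs such as kafka-clients and commons-pool2 (placeholder paths below) would still need to be supplied separately:

# the kafka-clients and commons-pool2 paths below are placeholders; these JARs are not shipped in external/lib
$SPARK_HOME/bin/spark-submit \
--deploy-mode client \
--master local \
--jars "$SPARK_HOME/external/lib/spark-sql-kafka-0-10.jar,$SPARK_HOME/external/lib/spark-token-provider-kafka-0-10.jar,/path/to/kafka-clients.jar,/path/to/commons-pool2.jar" \
<myscript>.py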
Unfortunately I could not submit an app with only the JAR files in $SPARK_HOME/external/lib due to an error; the details of the error have been added to the question above. Instead I ended up pre-downloading the package JAR files and using those. I first ran with the following command, where foo.py is an empty file; the run downloads the package JAR files into /home/hadoop/.ivy2/jars.
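A sketch of that first run, reusing the package coordinates from the question (foo.py is just an empty placeholder script):

$SPARK_HOME/bin/spark-submit \
--deploy-mode client \
--master local \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
foo.py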
Then I updated spark-defaults.conf as shown below.
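A sketch of that change, assuming the Ivy cache directory is simply added in front of the existing class-path entries (the trailing ... stands for whatever was already configured, as in the question):

spark.driver.extraClassPath /home/hadoop/.ivy2/jars/*:...
spark.executor.extraClassPath /home/hadoop/.ivy2/jars/*:...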
After that, I ran the submit command without --packages and it worked without an error.
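A sketch of that final submit, i.e. the original command with the --packages option dropped:

$SPARK_HOME/bin/spark-submit \
--deploy-mode client \
--master local \
<myscript>.py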
This approach is likely to be useful when downloading the package JAR files takes a long time, since they can be fetched in advance. Note that EMR on EKS supports using a custom image from ECR.