PySpark Kafka readStream

Published 2025-01-17 22:25:27


I'm reading data from a Kafka topic using the code below.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 pyspark-shell'

df = spark.readStream.format("kafka").option("kafka.bootstrap.server", kafkaServer).option("subscribe", topic_name_read).option("includeHeaders", "true").option("startingOffsets", "earliest").load()

But it throws the error below.

File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 210, in load
    return self._df(self._jreader.load())
File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in __call__
File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:556)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:336)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:127)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 21 more

If I use .load instead of .load(), it is unable to find the withColumn function.

df1 = df.withColumn("items", F.explode(F.col("items")))

AttributeError: 'function' object has no attribute 'withColumn'

Thank you

Environment:

Python - 3.9.9

PySpark - 3.1.3

kafka-python - 2.0.2

Spark - 3.1.3 (built for Hadoop 2.7)

JARs used

spark-streaming-kafka-0-10_2.12-3.1.3,

spark-sql-kafka-0-10_2.12-3.1.3


Answers (1)

2025-01-24 22:25:27


.load() is correct.
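Without the parentheses, the name is bound to the method object itself rather than to a DataFrame, which is exactly what the AttributeError says. A minimal plain-Python sketch of the same mistake (the `Reader` class is only a stand-in for Spark's DataStreamReader):

```python
class Reader:
    """Illustrative stand-in for a Spark DataStreamReader."""
    def load(self):
        return "a DataFrame"

r = Reader()

broken = r.load       # no parentheses: a bound method object, not the result
working = r.load()    # calling the method returns the actual value

print(type(broken).__name__)  # method
print(working)                # a DataFrame
```

Any attribute access on `broken` (such as `.withColumn`) fails, because the bound method has no such attribute.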

Your real error is:

ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

You need to include the kafka-clients dependency in your --packages argument.

Also, you are only using Structured Streaming (spark-sql-kafka) here, so you should remove the spark-streaming-kafka package.
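Putting both points together, the submit arguments could look like the sketch below. The kafka-clients version is an assumption (2.6.0 is the version the Spark 3.1.3 Kafka connector targets; verify against your environment), and `kafkaServer` / `topic_name_read` are the variables from the question:

```python
import os

# Keep only the Structured Streaming connector and add kafka-clients
# explicitly. kafka-clients 2.6.0 is an assumption here; check the
# version your Spark distribution actually expects.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,'
    'org.apache.kafka:kafka-clients:2.6.0 '
    'pyspark-shell'
)

# The reader from the question, with the option name corrected to the
# plural "kafka.bootstrap.servers" that the Kafka source expects
# (the question used the singular "kafka.bootstrap.server").
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", kafkaServer)
#       .option("subscribe", topic_name_read)
#       .option("includeHeaders", "true")
#       .option("startingOffsets", "earliest")
#       .load())
```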
