PySpark Kafka ReadStream
I'm reading data from a Kafka topic using the code below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 pyspark-shell'
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)
      .option("subscribe", topic_name_read)
      .option("includeHeaders", "true")
      .option("startingOffsets", "earliest")
      .load())
But it throws the below error.
File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 210, in loadreturn self._df(self._jreader.load())File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1304, in callFile "C:\spark\spark-3.1.3-bin-hadoop2.7\python\pyspark\sql\utils.py", line 111, in decoreturn f(*a, **kw)File "C:\spark\spark-3.1.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 326, in get_return_valuepy4j.protocol.Py4JJavaError: An error occurred while calling o37.load.: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializerat org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:556)at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:336)at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:127)at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)at scala.Option.getOrElse(Option.scala:189)at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at py4j.Gateway.invoke(Gateway.java:282)at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at py4j.commands.CallCommand.execute(CallCommand.java:79)at py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:750)Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializerat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 21 more
If I use .load instead of .load(), it is unable to identify the withColumn function.
df1 = df.withColumn("items", F.explode(F.col("items")))
AttributeError: 'function' object has no attribute 'withColumn'
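This second error can be reproduced without Spark: writing .load with no parentheses returns the method object itself rather than the DataFrame it would produce, and a method object has no withColumn attribute. A minimal, Spark-free sketch (the Reader class here is a stand-in for PySpark's DataStreamReader, not a real API):

```python
class Reader:
    """Illustrative stand-in for pyspark's DataStreamReader (not a PySpark class)."""

    def load(self):
        # In real PySpark this would return a DataFrame.
        return "a DataFrame-like result"


r = Reader()

bound = r.load      # no parentheses: this is the method object, not its result
called = r.load()   # parentheses: this actually invokes load

# The method object carries no DataFrame attributes, hence the AttributeError
# when code later tries bound.withColumn(...).
print(hasattr(bound, "withColumn"))
print(called)
```

So the fix for that part is simply to keep the parentheses on .load(); the NoClassDefFoundError is a separate dependency problem.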
Thank you
Environment:
Python - 3.9.9
PySpark - 3.1.3
kafka-python - 2.0.2
Spark - 3.1.3 (built for Hadoop 2.7)
JARs used:
spark-streaming-kafka-0-10_2.12-3.1.3,
spark-sql-kafka-0-10_2.12-3.1.3
Comments (1)
.load() is correct. Your real error: you need to include the kafka-clients dependency in your --packages argument. Also, you are only using spark-sql-kafka here, so you should remove the other one.
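Following that suggestion, the submit arguments might look like the sketch below. This is an unverified sketch: the kafka-clients version 2.6.0 is assumed here because it is the Kafka version Spark 3.1.x builds against, but check it against your own Spark distribution.

```python
import os

# Sketch per the answer above: drop spark-streaming-kafka, keep spark-sql-kafka,
# and add kafka-clients explicitly. The 2.6.0 version is an assumption based on
# Spark 3.1.x's Kafka dependency -- verify against your Spark build.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3,'
    'org.apache.kafka:kafka-clients:2.6.0 '
    'pyspark-shell'
)

print(os.environ['PYSPARK_SUBMIT_ARGS'])
```

Note that this must be set before the SparkSession (and the JVM behind it) is created, or the packages will not be picked up.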