How to use the foreachBatch() method correctly in PySpark?
I am trying to sink results processed by the Structured Streaming API in Spark to PostgreSQL. I tried the following approach (somewhat simplified, but I hope it's clear):
import os

from pyspark.sql import SparkSession


class Processor:
    def __init__(self, args):
        # Path of the PostgreSQL JDBC driver shipped next to this script.
        jdbc_driver = os.path.join(os.path.split(os.path.abspath(__file__))[0], "postgresql-42.3.6.jar")
        self.spark_session = SparkSession \
            .builder \
            .appName("processor") \
            .config("spark.sql.shuffle.partitions", 4) \
            .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint") \
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1") \
            .config("spark.driver.extraClassPath", jdbc_driver) \
            .config("spark.jars", jdbc_driver) \
            .getOrCreate()
        self.db_url = args.db_url
        self.db_user = args.db_user
        self.db_password = args.db_password

    def get_data_to_publish(self):
        return self.spark_session.readStream  # processing of data goes here ...

    def write_output_data(self, data):
        return data \
            .writeStream \
            .outputMode("append") \
            .foreachBatch(self.update_table) \
            .start()

    def update_table(self, data, batch_id):
        data \
            .write \
            .jdbc(url=self.db_url, table="output_table", mode="overwrite",
                  properties={"user": self.db_user, "password": self.db_password})

    def process(self):
        result = self.get_data_to_publish()
        q = self.write_output_data(result)
        q.awaitTermination()


if __name__ == "__main__":
    args = ...  # ... parsing args ...
    processor = Processor(args)
    processor.process()
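The argument parsing is omitted above for brevity; it is essentially a standard argparse setup, roughly like this (flag names simplified):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--db-url", dest="db_url")
parser.add_argument("--db-user", dest="db_user")
parser.add_argument("--db-password", dest="db_password")
args = parser.parse_args()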
I run this in standalone mode, with the Spark engine deployed as a Docker container. Unfortunately, I get the following error (I left out the full Java stack trace, as it is quite long):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3) (192.168.0.2 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.ExpandExec.projection of type scala.Function1 in instance of org.apache.spark.sql.execution.ExpandExec
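In case it is relevant, I submit the job to the standalone master roughly like this (the script name, host name, and connection values are placeholders):

spark-submit \
    --master spark://spark-master:7077 \
    processor.py \
    --db-url jdbc:postgresql://... --db-user ... --db-password ...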
What am I doing wrong? Is there anything I could do to fix this error?