How to use the foreachBatch() method correctly in PySpark?

Published on 2025-02-01 13:00:46

I am trying to sink results processed by Structured Streaming API in Spark to PostgreSQL. I tried the following approach (somehow simplified, but hope it's clear):

import os

from pyspark.sql import SparkSession


class Processor:

    def __init__(self, args):
        self.spark_session = SparkSession \
                             .builder \
                             .appName("processor") \
                             .config("spark.sql.shuffle.partitions", 4) \
                             .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint") \
                             .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1") \
                             .config("spark.driver.extraClassPath", os.path.join(os.path.split(os.path.abspath(__file__))[0], "postgresql-42.3.6.jar")) \
                             .config("spark.jars", os.path.join(os.path.split(os.path.abspath(__file__))[0], "postgresql-42.3.6.jar")) \
                             .getOrCreate()
        self.db_url = args.db_url
        self.db_user = args.db_user
        self.db_password = args.db_password

    def get_data_to_publish(self):
        return self.spark_session.readStream # processing of data goes here ...

    def write_output_data(self, data):
        return data \
                .writeStream \
                .outputMode("append") \
                .foreachBatch(self.update_table) \
                .start()

    def update_table(self, data, batch_id):
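        # foreachBatch hands each micro-batch to this method as a regular
        # (non-streaming) DataFrame together with its batch id, so the
        # ordinary batch writer can be used; mode="overwrite" below
        # replaces output_table on every micro-batch.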
        data \
        .write \
        .jdbc(url=self.db_url, table="output_table", mode="overwrite", properties={"user": self.db_user, "password": self.db_password})
               
    def process(self):
        result = self.get_data_to_publish()

        q = self.write_output_data(result)
        q.awaitTermination()

if __name__ == "__main__":
    args = ...  # ... parsing args ...
    processor = Processor(args)
    processor.process()

I run this in standalone mode, with the Spark engine deployed as a Docker container. Unfortunately, I get the following error (I left out the whole Java stack trace, as it is quite long):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3) (192.168.0.2 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.ExpandExec.projection of type scala.Function1 in instance of org.apache.spark.sql.execution.ExpandExec

What am I doing wrong? Is there anything I could do to fix this error?
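
For reference, here is a stripped-down, self-contained sketch of the foreachBatch-to-JDBC pattern I am aiming for. It uses the built-in rate source instead of my real input, and the connection details (host, database, credentials) are placeholders, so it only illustrates the shape of the job, not my actual setup:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("foreach-batch-sketch")
    .getOrCreate()
)

def write_batch(batch_df, batch_id):
    # foreachBatch passes each micro-batch as a regular (non-streaming)
    # DataFrame together with its batch id, so the ordinary batch JDBC
    # writer can be used here. All connection values are placeholders.
    batch_df.write.jdbc(
        url="jdbc:postgresql://db-host:5432/mydb",
        table="output_table",
        mode="overwrite",  # same mode as update_table above
        properties={
            "user": "db_user",
            "password": "db_password",
            "driver": "org.postgresql.Driver",
        },
    )

# The built-in "rate" source generates (timestamp, value) rows and is a
# convenient stand-in for the real input when testing the sink.
stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

query = (
    stream_df.writeStream
    .outputMode("append")
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/sketch-checkpoint")
    .start()
)
query.awaitTermination()

This sketch still assumes the PostgreSQL JDBC driver jar is on the classpath, as in the session configuration above. The class-based version is what I actually need, since the connection settings come from command-line arguments, but the intended behaviour is the same.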
