Spark结构化流式处理pyspark,applyInPandas被立即调用并且不等待窗口过期

发布于 2025-01-16 18:34:54 字数 2892 浏览 3 评论 0原文

Spark 结构化流不允许窗口函数执行滞后、超前操作。所以我尝试使用 applyInpandas 函数。

我有一个 5 分钟的翻滚窗口,水印在附加模式下设置为 1 分钟。我需要等到窗口到期并在其上应用我的 UDF 函数。所以我使用 applyInpandas。

问题是我的自定义 UDF 函数在数据到达后立即被调用,而不是等到窗口到期。

我的代码

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("y1", IntegerType(), True),
])
newschema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", DoubleType(), True),
    StructField("y1", DoubleType(), True),
    StructField("x2", DoubleType(), True),
    StructField("y2", DoubleType(), True),
    StructField("dis", DoubleType(), True)
])

spark = SparkSession.builder.appName("testing").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

truck_data = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", KAFKA_BROKER)\
        .option("subscribe", KAFKA_INPUT_TOPIC)\
        .option("startingOffsets", "latest") \
        .load()

truck_data = truck_data.withColumn("value", col("value").cast(StringType()))\
        .withColumn("jsonData", from_json(col("value"), schema)) \
        .select("JsonData.*")\

def f(pdf):
    print("inside applyinpandas")
    pdf = pdf.sort_values(by='date')
    pdf = pdf.assign(x2=pdf.x1.shift(1))
    pdf = pdf.assign(y2=pdf.y1.shift(1))
    pdf = pdf.assign(dis=np.sqrt((pdf.x1 - pdf.x2)**2 + (pdf.x1 - pdf.x2)**2))
    pdf = pdf.fillna(0)
    return pdf

truck_data = truck_data.withWatermark("date", "1 minute").\
    .groupBy(window("date", "5 minute"), "id")\
    .applyInPandas(f, schema=newschema)

query = truck_data\
#     .writeStream\
#     .format("console") \
#     .option("numRows", 300)\
#     .option("truncate", False)\
#     .start().awaitTermination()

示例输入数据

{"date":"2022-03-23 09:04:32.242637","id":"B","x1":3,"y1":3}

{“日期”:“2022-03-23 09:04:32.242737","id":"A","x1":2,"y1":2}

{"日期":"2022-03-23 09:04:29.242737","id":" A","x1":1,"y1":1}

{"日期":"2022-03-23 09:04:55.242737","id":"A","x1":6,"y1":6}

{"日期":"2022-03-23 09:04:40.242737","id":" B","x1":7,"y1":7}

{"日期":"2022-03-23 09:04:29.242737","id":"B","x1":1,"y1":1}

{"日期":"2022-03-23 09:04:44.242737","id":" A","x1":5,"y1":5}

{"日期":"2022-03-23 09:04:35.242737","id":"B","x1":5,"y1":5}

{"日期":"2022-03-23 09:04:35.242737","id":" A","x1":3,"y1":3}

{"日期":"2022-03-23 09:04:40.242737","id":"A","x1":4,"y1":4}

{"日期":"2022-03-23 09:04:44.242737","id":" B","x1":9,"y1":9}

{"日期":"2022-03-23 09:04:55.242737","id":"B","x1":11,"y1":11}

{"日期":"2022-03-23 09:06:55.242737","id":" B","x1":11,"y1":11}

需要输出 从 pycharm 输出图像

spark structured streaming is not allowing Window function to perform lag,lead operations. So I am trying to use applyInpandas function.

I have a tumbling window of 5 minutes with watermark set to 1 minute in append mode.I need to wait till window expires and apply my UDF function on it.So I am using applyInpandas.

The problem is my custom UDF function is being called immediately after data arrives and not waiting till the window expires.

My code

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("y1", IntegerType(), True),
])
newschema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", DoubleType(), True),
    StructField("y1", DoubleType(), True),
    StructField("x2", DoubleType(), True),
    StructField("y2", DoubleType(), True),
    StructField("dis", DoubleType(), True)
])

spark = SparkSession.builder.appName("testing").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

truck_data = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", KAFKA_BROKER)\
        .option("subscribe", KAFKA_INPUT_TOPIC)\
        .option("startingOffsets", "latest") \
        .load()

truck_data = truck_data.withColumn("value", col("value").cast(StringType()))\
        .withColumn("jsonData", from_json(col("value"), schema)) \
        .select("JsonData.*")\

def f(pdf):
    print("inside applyinpandas")
    pdf = pdf.sort_values(by='date')
    pdf = pdf.assign(x2=pdf.x1.shift(1))
    pdf = pdf.assign(y2=pdf.y1.shift(1))
    pdf = pdf.assign(dis=np.sqrt((pdf.x1 - pdf.x2)**2 + (pdf.x1 - pdf.x2)**2))
    pdf = pdf.fillna(0)
    return pdf

truck_data = truck_data.withWatermark("date", "1 minute").\
    .groupBy(window("date", "5 minute"), "id")\
    .applyInPandas(f, schema=newschema)

query = truck_data\
#     .writeStream\
#     .format("console") \
#     .option("numRows", 300)\
#     .option("truncate", False)\
#     .start().awaitTermination()

Sample input data

{"date":"2022-03-23 09:04:32.242637","id":"B","x1":3,"y1":3}

{"date":"2022-03-23 09:04:32.242737","id":"A","x1":2,"y1":2}

{"date":"2022-03-23 09:04:29.242737","id":"A","x1":1,"y1":1}

{"date":"2022-03-23 09:04:55.242737","id":"A","x1":6,"y1":6}

{"date":"2022-03-23 09:04:40.242737","id":"B","x1":7,"y1":7}

{"date":"2022-03-23 09:04:29.242737","id":"B","x1":1,"y1":1}

{"date":"2022-03-23 09:04:44.242737","id":"A","x1":5,"y1":5}

{"date":"2022-03-23 09:04:35.242737","id":"B","x1":5,"y1":5}

{"date":"2022-03-23 09:04:35.242737","id":"A","x1":3,"y1":3}

{"date":"2022-03-23 09:04:40.242737","id":"A","x1":4,"y1":4}

{"date":"2022-03-23 09:04:44.242737","id":"B","x1":9,"y1":9}

{"date":"2022-03-23 09:04:55.242737","id":"B","x1":11,"y1":11}

{"date":"2022-03-23 09:06:55.242737","id":"B","x1":11,"y1":11}

Output required
output image from pycharm

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文