Spark Structured Streaming (PySpark): applyInPandas is called immediately and does not wait for the window to expire
Spark Structured Streaming does not allow window functions to perform lag/lead operations, so I am trying to use the applyInPandas function.
I have a 5-minute tumbling window with the watermark set to 1 minute, running in append mode. I need to wait until the window expires and then apply my UDF to it, which is why I am using applyInPandas.
The problem is that my custom UDF is called immediately after data arrives, instead of waiting until the window expires.
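For reference, this is the behavior I expect, sketched in plain Python with no Spark (the helper names are my own): a tumbling window [start, start + 5 min) should only be finalized in append mode once the watermark — the maximum event time seen minus the 1-minute delay — passes the window end. Spark aligns tumbling windows to the Unix epoch by default, which the floor calculation below mimics.

```python
from datetime import datetime, timedelta

def window_bounds(ts, size=timedelta(minutes=5)):
    # Floor an event time to its tumbling window [start, end),
    # aligned to the Unix epoch like Spark's window() function.
    epoch = datetime(1970, 1, 1)
    start = ts - (ts - epoch) % size
    return start, start + size

def window_closed(window_end, max_event_time, delay=timedelta(minutes=1)):
    # In append mode, a window's result should be emitted only after the
    # watermark (max event time seen minus the delay) passes the window end.
    return max_event_time - delay >= window_end

start, end = window_bounds(datetime(2022, 3, 23, 9, 4, 32))
# window is 09:00:00 .. 09:05:00; an event at 09:06:55 moves the
# watermark to 09:05:55, which is past the window end, closing it
print(window_closed(end, datetime(2022, 3, 23, 9, 6, 55)))  # True
print(window_closed(end, datetime(2022, 3, 23, 9, 4, 55)))  # False
```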
My code
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               StringType, IntegerType, DoubleType)

schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("y1", IntegerType(), True),
])
newschema = StructType([
    StructField("date", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("x1", DoubleType(), True),
    StructField("y1", DoubleType(), True),
    StructField("x2", DoubleType(), True),
    StructField("y2", DoubleType(), True),
    StructField("dis", DoubleType(), True),
])
spark = SparkSession.builder.appName("testing").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
truck_data = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_INPUT_TOPIC) \
    .option("startingOffsets", "latest") \
    .load()
truck_data = truck_data.withColumn("value", col("value").cast(StringType())) \
    .withColumn("jsonData", from_json(col("value"), schema)) \
    .select("jsonData.*")
def f(pdf):
    print("inside applyInPandas")
    pdf = pdf.sort_values(by="date")
    pdf = pdf.assign(x2=pdf.x1.shift(1))
    pdf = pdf.assign(y2=pdf.y1.shift(1))
    pdf = pdf.assign(dis=np.sqrt((pdf.x1 - pdf.x2) ** 2 + (pdf.y1 - pdf.y2) ** 2))
    pdf = pdf.fillna(0)
    return pdf
truck_data = truck_data.withWatermark("date", "1 minute") \
    .groupBy(window("date", "5 minute"), "id") \
    .applyInPandas(f, schema=newschema)
query = truck_data \
    .writeStream \
    .format("console") \
    .option("numRows", 300) \
    .option("truncate", False) \
    .start()
query.awaitTermination()
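Independently of the streaming question, the group function can be sanity-checked outside Spark on a plain pandas DataFrame (a made-up three-row group for a single id):

```python
import numpy as np
import pandas as pd

def f(pdf):
    # same logic as the streaming UDF: lag x1/y1 by one row within the
    # group, then take the Euclidean distance between consecutive points
    pdf = pdf.sort_values(by="date")
    pdf = pdf.assign(x2=pdf.x1.shift(1), y2=pdf.y1.shift(1))
    pdf = pdf.assign(dis=np.sqrt((pdf.x1 - pdf.x2) ** 2 + (pdf.y1 - pdf.y2) ** 2))
    return pdf.fillna(0)

pdf = pd.DataFrame({
    "date": pd.to_datetime(["2022-03-23 09:04:32",
                            "2022-03-23 09:04:29",
                            "2022-03-23 09:04:44"]),
    "id": ["A", "A", "A"],
    "x1": [2, 1, 5],
    "y1": [2, 1, 5],
})
print(f(pdf))  # after the fill: x2 is 0, 1, 2 and dis is 0, sqrt(2), sqrt(18)
```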
Sample input data
{"date":"2022-03-23 09:04:32.242637","id":"B","x1":3,"y1":3}
{"date":"2022-03-23 09:04:32.242737","id":"A","x1":2,"y1":2}
{"date":"2022-03-23 09:04:29.242737","id":"A","x1":1,"y1":1}
{"date":"2022-03-23 09:04:55.242737","id":"A","x1":6,"y1":6}
{"date":"2022-03-23 09:04:40.242737","id":"B","x1":7,"y1":7}
{"date":"2022-03-23 09:04:29.242737","id":"B","x1":1,"y1":1}
{"date":"2022-03-23 09:04:44.242737","id":"A","x1":5,"y1":5}
{"date":"2022-03-23 09:04:35.242737","id":"B","x1":5,"y1":5}
{"date":"2022-03-23 09:04:35.242737","id":"A","x1":3,"y1":3}
{"date":"2022-03-23 09:04:40.242737","id":"A","x1":4,"y1":4}
{"date":"2022-03-23 09:04:44.242737","id":"B","x1":9,"y1":9}
{"date":"2022-03-23 09:04:55.242737","id":"B","x1":11,"y1":11}
{"date":"2022-03-23 09:06:55.242737","id":"B","x1":11,"y1":11}
Output required
(the required output was shown as an image captured from the PyCharm console; not reproduced here)
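Since the screenshot is not available, here is a pandas-only reconstruction of what the per-window logic above would produce for the sample rows, grouping by id and a 5-minute tumbling window (the `dt.floor("5min")` window assignment is my own stand-in for Spark's `window()`):

```python
import numpy as np
import pandas as pd

rows = [
    ("2022-03-23 09:04:32.242637", "B", 3, 3),
    ("2022-03-23 09:04:32.242737", "A", 2, 2),
    ("2022-03-23 09:04:29.242737", "A", 1, 1),
    ("2022-03-23 09:04:55.242737", "A", 6, 6),
    ("2022-03-23 09:04:40.242737", "B", 7, 7),
    ("2022-03-23 09:04:29.242737", "B", 1, 1),
    ("2022-03-23 09:04:44.242737", "A", 5, 5),
    ("2022-03-23 09:04:35.242737", "B", 5, 5),
    ("2022-03-23 09:04:35.242737", "A", 3, 3),
    ("2022-03-23 09:04:40.242737", "A", 4, 4),
    ("2022-03-23 09:04:44.242737", "B", 9, 9),
    ("2022-03-23 09:04:55.242737", "B", 11, 11),
    ("2022-03-23 09:06:55.242737", "B", 11, 11),
]
df = pd.DataFrame(rows, columns=["date", "id", "x1", "y1"])
df["date"] = pd.to_datetime(df["date"])

def f(pdf):
    # per-group lag and consecutive-point distance, as in the streaming UDF
    pdf = pdf.sort_values(by="date")
    pdf = pdf.assign(x2=pdf.x1.shift(1), y2=pdf.y1.shift(1))
    pdf = pdf.assign(dis=np.sqrt((pdf.x1 - pdf.x2) ** 2 + (pdf.y1 - pdf.y2) ** 2))
    return pdf.fillna(0)

# stand-in for Spark's 5-minute tumbling window, aligned to the epoch
win = df["date"].dt.floor("5min")
out = pd.concat([f(g) for _, g in df.groupby([win, "id"])], ignore_index=True)
print(out)
```

All but the last sample row fall into the 09:00-09:05 window; the 09:06:55 event for id B lands alone in the 09:05-09:10 window, so its lag columns and distance are filled with 0.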