在 Spark 结构化流中计算滑动窗口中的多个聚合

发布于 2025-01-14 02:42:23 字数 498 浏览 1 评论 0原文

我有一个流源,它发送事件,其中每条记录由 3 个字段组成(CreationTime、FP、Detected) 这里,“FP”代表误报。 “FP”和“检测到”字段的值可以为 1 或 0。 我想通过滑动窗口计算以下值。 FPR1 = Count(FP) / Count(Detected) 和 FPR2 = Count(FP) / Count(窗口中的总记录)

我可以使用以下查询聚合 Count(FP)。我也想计算其他 2 个聚合。即 DetectedCount 和 TotalCount 并计算 FPR1 和 FPR2 并写入文件接收器。我该怎么做?提前致谢。

val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(col("FP"),
    window(col("CreatedTime"), "5 minute", "1 minute"))
  .agg(sum("FP").alias("FPCount"))

I have a streaming source which sends events where every record consiste of 3 fields (CreationTime, FP, Detected)
Here, 'FP' stands for false positive. 'FP' and 'Detected' fields can have values 1 or 0.
I want to calculate the following values over a sliding window.
FPR1 = Count(FP) / Count(Detected) and FPR2 = Count(FP) / Count(Total records in window)

I am able to aggregate Count(FP) using following query. I want to count the other 2 aggregates as well. ie DetectedCount and TotalCount and calculate FPR1 and FPR2 and write to a file sink. How do I do this? Thanks in advance.

val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(col("FP"),
    window(col("CreatedTime"), "5 minute", "1 minute"))
  .agg(sum("FP").alias("FPCount"))

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

櫻之舞 2025-01-21 02:42:23

终于想通了。我错误地使用了 groupby 。这是最终的查询。

    val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(window(col("CreatedTime"), "5 minute", "1 minute"))
.agg(sum("FP").alias("FPCount"),sum("Detected").alias("DetectedCount"),sum("Count").alias("TotalCount"))
  .withColumn("FPR", col("FPCount")/col("DetectedCount"))
  .withColumn("FPR2", col("DetectedCount")/col("TotalCount"))

Figured it out finally. I was using groupby wrongly. here is the final query.

    val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(window(col("CreatedTime"), "5 minute", "1 minute"))
.agg(sum("FP").alias("FPCount"),sum("Detected").alias("DetectedCount"),sum("Count").alias("TotalCount"))
  .withColumn("FPR", col("FPCount")/col("DetectedCount"))
  .withColumn("FPR2", col("DetectedCount")/col("TotalCount"))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文