Why do I get 0 results from a pySpark groupby on a window-generated column?

I'm analyzing train passages in pySpark. I'm wondering why I get no results from a simple group by over a column that just contains a number generated by a window.

My pipeline code is as follows. I simplified to the relevant columns.

from pyspark.sql import functions as f
from pyspark.sql import window as w

df_messages = (
  spark.read.table("xxx.some_table")
    # sourceTime is in epoch milliseconds; derive a second-resolution timestamp string:
    .withColumn('sourceTimeDT', f.from_unixtime(f.col('sourceTime') / 1000, 'yyyy-MM-dd HH:mm:ss'))
    # keep the millisecond remainder for tie-breaking in the orderings below:
    .withColumn('ms', f.col('sourceTime') % 1000)
    .withColumn("rank", f.row_number().over(w.Window.orderBy(f.col('partId').asc(),
                                                             f.col('move_startsYMDHMS').asc(),
                                                             f.col('ms').asc(),
                                                             f.col('msg_order').asc())))
)
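For clarity on the two derived time columns: dividing the epoch-millisecond value by 1000 gives the seconds that from_unixtime expects, and the modulo keeps the sub-second part. A minimal sketch with a hypothetical timestamp (the rendered string depends on the session time zone):

spark.sql("SELECT from_unixtime(1647534240123 / 1000, 'yyyy-MM-dd HH:mm:ss')").show()
# -> 2022-03-17 16:24:00 in a UTC session time zone; 1647534240123 % 1000 -> 123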

location1_messages = (
  df_messages.filter("partId IN ('BRFHB', 'BROUB', 'BRZAB', 'BRBEB')")
             .orderBy("sourceTimeDT", "ms")
)

# processing of the messages in a few steps:

location1_messages_proc = (
  location1_messages
     # location1_rank serves to order on date/time only, not on the location name as well:
    .withColumn("location1_rank", 
                f.row_number().over(w.Window.orderBy(f.col('sourceTimeDT').asc(),
                                                     f.col('ms').asc())))
    # we detect the start of a train passage by a change of the S1 status (we'll consider all messages between two S1 statuses as one potential train passage sequence):
    .withColumn('prev_message', 
                f.lead(f.col('message_value'), -1, 'NA').over(w.Window.orderBy(f.col('location1_rank'))))
    # ... (detection of message sequences)
)
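A side note on the negative offset: f.lead(col, -1, 'NA') over an ordered window reads the previous row, i.e. it behaves like f.lag(col, 1, 'NA'). A minimal sketch on hypothetical toy data:

toy = spark.createDataFrame([(1, 'S1'), (2, 'S2'), (3, 'S1')],
                            ['location1_rank', 'message_value'])
order_win = w.Window.orderBy(f.col('location1_rank'))
# offset -1 looks one row back, so the first row falls back to the default 'NA':
toy.withColumn('prev_message',
               f.lead('message_value', -1, 'NA').over(order_win)).show()
# location1_rank | message_value | prev_message
#              1 | S1            | NA
#              2 | S2            | S1
#              3 | S1            | S2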

location1_messages_proc2 = (
  location1_messages_proc.withColumn('next1_partId', 
                                     f.lead(f.col('partId'), 1, 'NA').over(w.Window.orderBy(f.col('location1_rank'))))
  # ... extra logic for detecting sequences ...
)

location1_messages_proc3 = (
  location1_messages_proc2
    .withColumn("is_passage_start",
                f.when(f.col("no_longer_free_on_other_bridge") == "start", 1).otherwise(0))
    # The train passage ID, on which we'll do a group by later:
    .withColumn("passage_id",
                f.sum("is_passage_start").over(w.Window.orderBy(f.col("location1_rank"))))
)
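The running sum over the ordered window is the usual segmentation trick: every row where is_passage_start is 1 bumps the ID for itself and for all rows after it, so all messages between two starts share one passage_id. A minimal sketch on hypothetical toy data:

toy = spark.createDataFrame([(1, 1), (2, 0), (3, 0), (4, 1), (5, 0)],
                            ['location1_rank', 'is_passage_start'])
order_win = w.Window.orderBy(f.col('location1_rank'))
# the default frame of an ordered window runs from the partition start up to the
# current row, so sum() becomes a cumulative sum:
toy.withColumn('passage_id', f.sum('is_passage_start').over(order_win)).show()
# rows 1-3 get passage_id 1, rows 4-5 get passage_id 2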

display(location1_messages_proc3) shows the following:

partId | sourceTimeDT     | ... | passage_id
------ | ---------------- | --- | ----------
...    | ...              | ... | ...
BRBEB  | 2022-03-17 16:24 | ... | 3724
BRBEB  | ...              | ... | 3724
BRBEB  | 2022-03-17 16:28 | ... | 3724
...    | ...              | ... | ...

But the following group by code returns no results (??):

train_passage_first_and_last_message_times = (
  location1_messages_proc3.groupby("passage_id")
                      .agg(f.min(f.col("sourceTimeDT")).alias("first_message_time"), 
                           f.max(f.col("sourceTimeDT")).alias("last_message_time"))
)
display(train_passage_first_and_last_message_times)

I'd expect at least one result row for the passage shown above, with first_message_time = 2022-03-17 16:24 and last_message_time = 2022-03-17 16:28 for passage_id 3724, but I get 0 rows back.
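As a quick sanity check, one could compare the row count going into the aggregation with the row count coming out; a sketch:

# a non-zero first count combined with a zero second count would point at the
# aggregation step itself rather than at the upstream pipeline:
print(location1_messages_proc3.count())
print(train_passage_first_and_last_message_times.count())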

Does anyone understand why this doesn't happen?

I do get results with this though:

display(location1_messages_proc3.filter("passage_id == 3724"))

But not with this:

display(
  location1_messages_proc3.filter("passage_id == 3724")
                    .groupby("passage_id")
                    .agg(f.min(f.col("sourceTimeDT")).alias("first_message_time"), 
                         f.max(f.col("sourceTimeDT")).alias("last_message_time"))
)

So it must be due either to the grouping or to the way the passage_id column has been created (or, as a last resort, a bug in pySpark).
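One thing I still want to rule out (this is just a guess on my side): all the windows above use Window.orderBy without partitionBy, so Spark may recompute the whole intermediate plan, with possibly different tie-breaking, each time an action runs. Pinning the DataFrame first would take that out of the equation; a sketch ('pinned' is just a scratch name):

pinned = location1_messages_proc3.cache()
pinned.count()  # force materialization so later actions reuse the cached rows
display(
  pinned.filter("passage_id == 3724")
        .groupby("passage_id")
        .agg(f.min(f.col("sourceTimeDT")).alias("first_message_time"),
             f.max(f.col("sourceTimeDT")).alias("last_message_time"))
)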

Can anyone give me a hint?
