Why do I get 0 results from a pySpark group by over a window-generated column?
I'm analyzing train passages in pySpark. I'm wondering why I get no results from a simple group by over a column that just contains a number generated by a window.
My pipeline code is as follows; I've simplified it to the relevant columns.
from pyspark.sql import functions as f
from pyspark.sql import window as w

df_messages = (
    spark.read.table("xxx.some_table")
    .withColumn('sourceTimeDT', f.from_unixtime(f.col('sourceTime')/1000, 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('ms', f.col('sourceTime') % 1000)
    .withColumn("rank", f.row_number().over(w.Window.orderBy(f.col('partId').asc(),
                                                             f.col('move_startsYMDHMS').asc(),
                                                             f.col('ms').asc(),
                                                             f.col('msg_order').asc())))
)
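(A quick aside on the time columns, since everything below orders on them: sourceTime is epoch milliseconds, and from_unixtime takes seconds and drops the sub-second part, so I keep the remainder in a separate ms column. A minimal standalone check of that conversion, on a single made-up value:)

from pyspark.sql import functions as f

# Hypothetical epoch-millisecond value, purely for illustration.
demo = spark.createDataFrame([(1647534245123,)], ["sourceTime"])
(demo
 .withColumn("sourceTimeDT", f.from_unixtime(f.col("sourceTime") / 1000, "yyyy-MM-dd HH:mm:ss"))
 .withColumn("ms", f.col("sourceTime") % 1000)
 .show(truncate=False))
# sourceTimeDT is second-resolution only; ms keeps the 123 remainder,
# which is why every ordering below uses (sourceTimeDT, ms) together.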
location1_messages = (
df_messages.filter("partId IN ('BRFHB', 'BROUB', 'BRZAB', 'BRBEB')")
.orderBy("sourceTimeDT", "ms")
)
# processing of the messages in a few steps:
location1_messages_proc = (
location1_messages
# rank is a proxy in this case to order only on date time, not on location name as well:
.withColumn("location1_rank",
f.row_number().over(w.Window.orderBy(f.col('sourceTimeDT').asc(),
f.col('ms').asc())))
# We detect the start of a train passage by a change of the S1 status
# (we'll consider all messages between two S1 statuses as one potential train passage sequence):
.withColumn('prev_message',
f.lead(f.col('message_value'), -1, 'NA').over(w.Window.orderBy(f.col('location1_rank'))))
# ... (detection of message sequences)
)
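(Side note: f.lead with offset -1 is the same as f.lag with offset 1; I only use lead here to stay symmetric with the next1_* columns further down. A toy check on made-up rows:)

from pyspark.sql import functions as f
from pyspark.sql import window as w

# lead(col, -1) and lag(col, 1) both yield the previous row's value.
toy = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")],
                            ["location1_rank", "message_value"])
order = w.Window.orderBy("location1_rank")
(toy
 .withColumn("prev_via_lead", f.lead("message_value", -1, "NA").over(order))
 .withColumn("prev_via_lag", f.lag("message_value", 1, "NA").over(order))
 .show())
# Both columns read: NA, a, b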
location1_messages_proc2 = (
location1_messages_proc.withColumn('next1_partId',
f.lead(f.col('partId'), 1, 'NA').over(w.Window.orderBy(f.col('location1_rank'))))
# ... extra logic for detecting sequences ...
)
location1_messages_proc3 = (
location1_messages_proc2.withColumn("is_passage_start",
f.when(f.col("no_longer_free_on_other_bridge")=="start", 1).otherwise(0))
# The train passage ID, on which we'll do a group by later:
.withColumn("passage_id",
f.sum("is_passage_start").over(w.Window.orderBy(f.col("location1_rank"))))
)
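(For clarity: passage_id is the usual cumulative-sum trick, a running sum of the 0/1 start flag over an ordered window, so all rows between two starts share one ID. A standalone sketch on made-up flags:)

from pyspark.sql import functions as f
from pyspark.sql import window as w

# Made-up data: new passages start at ranks 1 and 4.
toy = spark.createDataFrame(
    [(1, 1), (2, 0), (3, 0), (4, 1), (5, 0)],
    ["location1_rank", "is_passage_start"])
toy = toy.withColumn(
    "passage_id",
    f.sum("is_passage_start").over(w.Window.orderBy("location1_rank")))
toy.show()
# passage_id comes out as 1, 1, 1, 2, 2: one ID per passage,
# which is exactly what I group on below.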
display(location1_messages_proc3)
shows the following:
partId | sourceTimeDT | ... | passage_id |
---|---|---|---|
... | ... | ... | ... |
BRBEB | 2022-03-17 16:24 | ... | 3724 |
BRBEB | ... | ... | 3724 |
BRBEB | 2022-03-17 16:28 | ... | 3724 |
... | ... | ... | ... |
But the following group-by code returns no results (??):
train_passage_first_and_last_message_times = (
location1_messages_proc3.groupby("passage_id")
.agg(f.min(f.col("sourceTimeDT")).alias("first_message_time"),
f.max(f.col("sourceTimeDT")).alias("last_message_time"))
)
display(train_passage_first_and_last_message_times)
I'd expect 1 result line, with first_message_time = 2022-03-17 16:24 and last_message_time = 2022-03-17 16:28, but I get 0 results back.
Does anyone understand why this doesn't happen?
I do get results with this though:
display(location1_messages_proc3.filter("passage_id == 3724"))
But not with this:
display(
location1_messages_proc3.filter("passage_id == 3724")
.groupby("passage_id")
.agg(f.min(f.col("sourceTimeDT")).alias("first_message_time"),
f.max(f.col("sourceTimeDT")).alias("last_message_time"))
)
So it must be due to the grouping, or to the way the passage_id column is created.
(Or, as a last resort, a bug in pySpark.)
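(One thing I still want to rule out myself: all my windows are ordered but unpartitioned, so Spark recomputes them for every action, and any ties in (sourceTimeDT, ms) would make row_number, and everything derived from it, non-deterministic between runs. A diagnostic sketch, not yet verified, that pins the frame down before aggregating:)

# Sketch only: persist so display() and the groupby see the same materialized rows.
cached = location1_messages_proc3.persist()
cached.count()  # force materialization
display(
    cached.groupby("passage_id")
    .agg(f.min("sourceTimeDT").alias("first_message_time"),
         f.max("sourceTimeDT").alias("last_message_time"))
)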
Can anyone give me a hint?