Get the value from the first following row that has a different value

Posted on 2025-01-18 17:00:12

I have a DataFrame with an id, a message sequence number (seq), and a value (ts, e.g. a timestamp). Multiple rows can share the same sequence number. Every row also has other columns with differing values, but I have left them out because they are not relevant here.

Within all messages from one device (the id column, = partitionBy), I need to sort by seq (= orderBy) and, for every message, add the ts value of the next message that has a different seq. All messages sharing the current seq should get the same value.
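To pin down the intended semantics independently of Spark, here is a minimal plain-Python sketch of the rule described above. The function name add_next_value is hypothetical, and the sketch assumes that all rows sharing an (id, seq) pair carry the same ts (as in the sample data); if they did not, it would simply take the first ts it encounters for that seq.

```python
from itertools import groupby

data = [
    (1, 1, "A"), (2, 1, "G"), (2, 2, "F"), (3, 1, "A"),
    (4, 1, "A"), (4, 2, "B"), (4, 3, "C"), (4, 3, "C"), (4, 3, "C"), (4, 4, "D"),
]


def add_next_value(rows):
    """Return (id, seq, ts, next_value) tuples for (id, seq, ts) input rows.

    next_value is the ts of the next higher seq within the same id,
    or None when no higher seq exists.
    """
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    result = []
    for _, id_rows in groupby(ordered, key=lambda r: r[0]):
        id_rows = list(id_rows)
        # First ts seen for each distinct seq; insertion order is ascending seq
        # because the rows were sorted above.
        first_ts = {}
        for _, seq, ts in id_rows:
            first_ts.setdefault(seq, ts)
        seqs = list(first_ts)
        # Map each seq to the ts of the seq that follows it.
        next_ts = {cur: first_ts[nxt] for cur, nxt in zip(seqs, seqs[1:])}
        for dev_id, seq, ts in id_rows:
            result.append((dev_id, seq, ts, next_ts.get(seq)))
    return result


for row in add_next_value(data):
    print(row)
```

This is only a reference for checking results on small samples; it does not scale the way a window function does.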

So far I can retrieve the value of the next row if that row has a different sequence number. But since the next row with a different sequence number could be x rows away, I would have to add a separate .when(condition, ...) block for every offset up to x.

I was wondering whether there is a better solution that works no matter how far away the next row with a different sequence number is. I tried .otherwise(lead(col("next_value"), 1)), but that does not work because the next_value column is still being built at that point and cannot be referenced yet.

My code and a reproducible example:

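from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when

spark = SparkSession.builder.getOrCreate()

data = [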
    (1, 1, "A"),
    (2, 1, "G"),
    (2, 2, "F"),
    (3, 1, "A"),
    (4, 1, "A"),
    (4, 2, "B"),
    (4, 3, "C"),
    (4, 3, "C"),
    (4, 3, "C"),
    (4, 4, "D")
]

df = spark.createDataFrame(data=data, schema=["id", "seq", "ts"])

df.printSchema()
df.show(10, False)


# Partition by device id and order by sequence number within each partition.
window = Window.partitionBy("id").orderBy("seq")

# This needs one condition per offset (up to 100 of them if the next different
# seq is 100 rows away), so I wonder whether there is a better solution.
is_different_seq1 = lead(col("seq"), 1).over(window) != col("seq")
is_different_seq2 = lead(col("seq"), 2).over(window) != col("seq")

df = df.withColumn(
    "lead_value",
    when(is_different_seq1, lead(col("ts"), 1).over(window))
    .when(is_different_seq2, lead(col("ts"), 2).over(window)),
)

df.printSchema()
df.show(10, False)

Ideal output in column "next_value" for id=4:

id  seq  ts  next_value
4   1    A   B
4   2    B   C
4   3    C   D
4   3    C   D
4   3    C   D
4   4    D   Null

残花月 2025-01-25 17:00:12

I haven't tried more complicated cases, so this might still need adjustment, but I think you can combine lead with the last function.

With just the lead function, the result looks like this:

id  seq  ts  lead_value
4   1    A   B
4   2    B   C
4   3    C   C
4   3    C   C
4   3    C   D
4   4    D   Null

You want to overwrite the lead_value of the 3rd and 4th rows with "D", which is the last lead_value within the same id and seq group.

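from pyspark.sql import Window
from pyspark.sql import functions as F

# Look ahead one row within each id, ordered by seq.
lead_window = (Window
    .partitionBy("id")
    .orderBy("seq"))

# From the current row to the end of the same (id, seq) group.
last_window = (Window
    .partitionBy("id", "seq")
    .rowsBetween(Window.currentRow, Window.unboundedFollowing))

df = df.withColumn("next_value", F.last(
        F.lead(F.col("ts")).over(lead_window)
    ).over(last_window))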

Result:

id  seq  ts  next_value
4   1    A   B
4   2    B   C
4   3    C   D
4   3    C   D
4   3    C   D
4   4    D   Null
绅刃 2025-01-25 17:00:12

I found a solution (although it is horribly slow), so if someone comes up with a better one, please add your answer!

I reduce the data to one row per message with distinct(), apply lead(1) there, and join the result back to the original DataFrame to recover the remaining columns.

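# One row per (id, seq, ts); this relies on ts being identical within each (id, seq) group.
df_filtered = df.select("id", "seq", "ts").distinct()
df_filtered = df_filtered.withColumn("lead_value", lead(col("ts"), 1).over(window))
# Join back to restore the duplicate rows and the remaining columns.
df = df.join(df_filtered, on=["id", "seq", "ts"])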