Get the value from the first leading row with a different value
I have a list of ids, a sequence number of messages (seq), and a value (e.g. timestamps). Multiple rows can have the same sequence number. There are some other columns with different values in every row, but I excluded them as they are not important.
Within all messages from a deviceId (=partitionBy), I need to sort by sequence_number (=orderBy) and add the 'ts' value of the next message with a different sequence_number to all messages of the current sequence_number.
I got as far as retrieving the value of the next row if that row has a different sequence number. But since the "next row with a different sequence number" could be x rows away, I would have to add a specific .when(condition, ...) block for each of the x rows ahead.
I was wondering if there is a better solution that works no matter how "far away" the next row with a different sequence number is. I tried .otherwise(lead(col("next_value"), 1)), but since I am just building the column, it doesn't work.
My Code & reproducible example:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when

spark = SparkSession.builder.getOrCreate()

data = [
(1, 1, "A"),
(2, 1, "G"),
(2, 2, "F"),
(3, 1, "A"),
(4, 1, "A"),
(4, 2, "B"),
(4, 3, "C"),
(4, 3, "C"),
(4, 3, "C"),
(4, 4, "D")
]
df = spark.createDataFrame(data=data, schema=["id", "seq", "ts"])
df.printSchema()
df.show(10, False)
window = Window \
.orderBy("id", "seq") \
.partitionBy("id")
# I could potentially do this 100x if the next lead-value is 100 rows away, but I wonder if there isn't a better solution.
is_different_seq1 = lead(col("seq"), 1).over(window) != col("seq")
is_different_seq2 = lead(col("seq"), 2).over(window) != col("seq")
df = df.withColumn("lead_value",
when(is_different_seq1,
lead(col("ts"), 1).over(window)
)
.when(is_different_seq2,
lead(col("ts"), 2).over(window)
)
)
df.printSchema()
df.show(10, False)
Ideal output in column "next_value" for id=4:
id | seq | ts | next_value |
---|---|---|---|
4 | 1 | A | B |
4 | 2 | B | C |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 3 | C | D |
4 | 4 | D | Null |
Answers (2)
I haven't tried the more complicated cases, so this might still need more adjustment, but I think you can combine it with the last function. With just the lead function, the 3rd and 4th rows still see a value from their own group; you want to overwrite the lead_value of the 3rd and 4th rows to be "D", which is the last value of lead_value in the same id & seq group.
I found a solution (horribly slow, however), so if someone comes up with a better one, please add your answer!
I get one row per "message" with a distinct, execute the lead(1) there, and join it back to the dataframe with the rest of the columns.