How to use unboundedPreceding, unboundedFollowing and currentRow with rowsBetween in PySpark

Posted 2025-02-07 15:00:15

I am a little confused about the method pyspark.sql.Window.rowsBetween that accepts Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow objects as start and end arguments. Could you please explain how the function works and how to use Window objects correctly, with some examples? Thank you!

Comments (1)

梦明 2025-02-14 15:00:16

rowsBetween / rangeBetween, as the names suggest, help limit which rows are considered inside a window frame.

Let us take a simple example.

Starting with data:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

dfw = (
    spark
    .createDataFrame(
        [
            ("abc", 1, 100),
            ("abc", 2, 200),
            ("abc", 3, 300),
            ("abc", 4, 200),
            ("abc", 5, 100),
        ],
        "name string,id int,price int",
    )
)
dfw.show()
# output
+----+---+-----+
|name| id|price|
+----+---+-----+
| abc|  1|  100|
| abc|  2|  200|
| abc|  3|  300|
| abc|  4|  200|
| abc|  5|  100|
+----+---+-----+

Now, over this data, let's try to find the running max, i.e. the max seen so far for each row:

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(Window.partitionBy("name").orderBy("id"))
    )
    .show()
)

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|100|
| abc|  2|  200|200|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+

So, as expected, it looked at each price from top to bottom, one by one, and populated the maximum value seen so far. This behaviour corresponds to a frame of start = Window.unboundedPreceding and end = Window.currentRow.
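
For clarity, here is the same running max with that frame spelled out explicitly (a minimal sketch reusing dfw, F and Window from the setup block above; strictly speaking, with an orderBy and no explicit frame Spark defaults to a range-based frame from unbounded preceding to the current row, which gives the same result on this data):

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            # everything from the start of the partition up to and including
            # the current row -- i.e. a running max
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
    )
    .show()
)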

Now, changing the rows-between bounds to start = Window.unboundedPreceding and end = Window.unboundedFollowing, we get the following:

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )
    )
    .show()
)

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+

Now, as you can see, within the same window it looks at all values, including those below the current row, when taking the max, instead of stopping the frame at the current row.

The third case will be start = Window.currentRow and end = Window.unboundedFollowing:

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            .rowsBetween(Window.currentRow, Window.unboundedFollowing)
        )
    )
    .show()
)

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|200|
| abc|  5|  100|100|
+----+---+-----+---+

Now it only looks downwards for the max, over a frame that starts at the current row and runs to the end of the partition.
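
The same forward-looking frame is handy for things like a "remaining total". A small sketch using F.sum instead of F.max on the same dfw (the values in the trailing comment are just the by-hand arithmetic for this sample data):

(
    dfw
    .withColumn(
        "remaining",
        F.sum("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            # sum of the current row and everything after it in the partition
            .rowsBetween(Window.currentRow, Window.unboundedFollowing)
        )
    )
    .show()
)
# for ids 1..5 this should give remaining = 900, 800, 600, 300, 100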

Also, you are not limited to these three constants: you can, for example, use start = Window.currentRow - 1 and end = Window.currentRow + 1, so instead of looking at all values above or below, the frame covers only 1 row above and 1 row below.

Like this:

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            .rowsBetween(Window.currentRow-1, Window.currentRow+1)
        )
    )
    .show()
)

# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|200|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|200|
+----+---+-----+---+

So you can think of it as a frame inside the window that moves along with the current row being processed.
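
Finally, rangeBetween (mentioned at the top) works on the values of the orderBy column rather than on physical row positions. A minimal sketch on the same dfw; since the id values here are consecutive it happens to match rowsBetween(-1, 1), but it would differ if id had gaps:

(
    dfw
    .withColumn(
        "rm",
        F.max("price").over(
            Window
            .partitionBy("name")
            .orderBy("id")
            # frame = rows whose id value lies in [current id - 1, current id + 1]
            .rangeBetween(-1, 1)
        )
    )
    .show()
)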
