使用Python的Spark流:如何添加UUID列?

发布于 2025-02-04 09:50:04 字数 328 浏览 2 评论 0原文

我想在我的数据框架中添加带有生成ID的列。我尝试过:

uuidUdf = udf(lambda x: str(uuid.uuid4()), StringType())
df = df.withColumn("id", uuidUdf())

但是,当我这样做时,我的输出目录没有任何写作。当我删除这些行时,一切都可以正常工作,因此必须有一些错误,但我在控制台中没有看到任何内容。

我尝试使用单调的_increasing_id()而不是生成uuid,但是在我的测试中,这会产生许多重复。我需要一个唯一的标识符(不必具体是UUID)。

我该怎么做?

I would like to add a column with a generated id to my data frame. I have tried:

uuidUdf = udf(lambda x: str(uuid.uuid4()), StringType())
df = df.withColumn("id", uuidUdf())

however, when I do this, nothing is written to my output directory. When I remove these lines, everything works fine so there must be some error but I don't see anything in the console.

I have tried using monotonically_increasing_id() instead of generating a UUID but in my testing, this produces many duplicates. I need a unique identifier (does not have to be a UUID specifically).

How can I do this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

巴黎夜雨 2025-02-11 09:50:04

一种简单的方法:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)
df.withColumn("uuid", f.expr("uuid()")).show(truncate=False)

A simple way:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)
df.withColumn("uuid", f.expr("uuid()")).show(truncate=False)
玩世 2025-02-11 09:50:04

请尝试以下操作:

import uuid
from pyspark.sql.functions import udf

uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
Df1 = Df.withColumn("id",uuidUdf())

注意:添加新列后,您应该分配给新的DF。 ( df1 = df.withcolumn(....)

Please Try this:

import uuid
from pyspark.sql.functions import udf

uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
Df1 = Df.withColumn("id",uuidUdf())

Note: You should assign to new DF after adding new column. (Df1 = Df.withColumn(....)

扮仙女 2025-02-11 09:50:04

来自pyspark's function.py

注意:默认情况下,用户定义的功能被视为确定性。由于优化,可以消除重复的调用,或者可能比查询中存在的功能更多的次数。如果您的功能不是确定性的,请在用户定义的函数上调用asnonderministic。例如:

from pyspark.sql.types import IntegerType
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

因此,对于uuid来说,这将是:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import uuid
random_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

用法:

df = df.withColumn('id', random_udf())

From pyspark's functions.py:

note: The user-defined functions are considered deterministic by default. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. If your function is not deterministic, call asNondeterministic on the user defined function. E.g.:

from pyspark.sql.types import IntegerType
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

So for a UUID this would be:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import uuid
random_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

and the usage:

df = df.withColumn('id', random_udf())
粉红×色少女 2025-02-11 09:50:04

请使用lit函数,以便为所有记录生成相同的ID。
lit仅执行一次功能,并获取列值并将其添加到每个记录中。

>>> df.show(truncate=False)
+---+
|x  |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+
>>> import uuid
>>> id = str(uuid.uuid4())
>>> df = df.withColumn("id", lit(id))
>>> df.show(truncate=False)
+---+------------------------------------+
|x  |id                                  |
+---+------------------------------------+
|0  |923b69d6-4bee-423d-a892-79162df5684d|
|1  |923b69d6-4bee-423d-a892-79162df5684d|
|2  |923b69d6-4bee-423d-a892-79162df5684d|
|3  |923b69d6-4bee-423d-a892-79162df5684d|
|4  |923b69d6-4bee-423d-a892-79162df5684d|
|5  |923b69d6-4bee-423d-a892-79162df5684d|
|6  |923b69d6-4bee-423d-a892-79162df5684d|
|7  |923b69d6-4bee-423d-a892-79162df5684d|
|8  |923b69d6-4bee-423d-a892-79162df5684d|
|9  |923b69d6-4bee-423d-a892-79162df5684d|
+---+------------------------------------+

使用UDF不会在每行调用函数时求解该函数,并且我们最终会为每个调用获得新的UUID。

>>> df1 = df.withColumn("id",uuidUdf())
>>> uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
>>> df1 = df.withColumn("id",uuidUdf())
>>> df1.show(truncate=False)
+---+------------------------------------+
|x  |id                                  |
+---+------------------------------------+
|0  |6d051ec6-b91a-4c42-b37c-707a293f1dc8|
|1  |cd3c75b1-8a06-461b-82ae-51f4354296bd|
|2  |3996a022-de99-4403-9346-74e66210f9ef|
|3  |ad57a9c4-5c67-4545-bef6-77d89cff70d5|
|4  |5c9a82a1-323e-4ce0-9082-e36c5a6f61db|
|5  |7a64ee81-4c84-43d0-ab7d-0a79ed694950|
|6  |a0fb26e7-cf1a-445d-bd26-10dc453ddc1e|
|7  |435a7e6a-da22-4add-8953-b5c56b01c790|
|8  |fd3c5fd8-c9d5-4725-b32a-f3ce9386b9b8|
|9  |2291cc67-47cf-4921-80ec-b4180c73533c|
+---+------------------------------------+

Please use lit function so that you generate same id for all the records.
lit performs the function only once and gets the column value and adds it to every record.

>>> df.show(truncate=False)
+---+
|x  |
+---+
|0  |
|1  |
|2  |
|3  |
|4  |
|5  |
|6  |
|7  |
|8  |
|9  |
+---+
>>> import uuid
>>> id = str(uuid.uuid4())
>>> df = df.withColumn("id", lit(id))
>>> df.show(truncate=False)
+---+------------------------------------+
|x  |id                                  |
+---+------------------------------------+
|0  |923b69d6-4bee-423d-a892-79162df5684d|
|1  |923b69d6-4bee-423d-a892-79162df5684d|
|2  |923b69d6-4bee-423d-a892-79162df5684d|
|3  |923b69d6-4bee-423d-a892-79162df5684d|
|4  |923b69d6-4bee-423d-a892-79162df5684d|
|5  |923b69d6-4bee-423d-a892-79162df5684d|
|6  |923b69d6-4bee-423d-a892-79162df5684d|
|7  |923b69d6-4bee-423d-a892-79162df5684d|
|8  |923b69d6-4bee-423d-a892-79162df5684d|
|9  |923b69d6-4bee-423d-a892-79162df5684d|
+---+------------------------------------+

Using udf won't solve the function as it gets called for every row, and we end up getting new uuid's for each call.

>>> df1 = df.withColumn("id",uuidUdf())
>>> uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
>>> df1 = df.withColumn("id",uuidUdf())
>>> df1.show(truncate=False)
+---+------------------------------------+
|x  |id                                  |
+---+------------------------------------+
|0  |6d051ec6-b91a-4c42-b37c-707a293f1dc8|
|1  |cd3c75b1-8a06-461b-82ae-51f4354296bd|
|2  |3996a022-de99-4403-9346-74e66210f9ef|
|3  |ad57a9c4-5c67-4545-bef6-77d89cff70d5|
|4  |5c9a82a1-323e-4ce0-9082-e36c5a6f61db|
|5  |7a64ee81-4c84-43d0-ab7d-0a79ed694950|
|6  |a0fb26e7-cf1a-445d-bd26-10dc453ddc1e|
|7  |435a7e6a-da22-4add-8953-b5c56b01c790|
|8  |fd3c5fd8-c9d5-4725-b32a-f3ce9386b9b8|
|9  |2291cc67-47cf-4921-80ec-b4180c73533c|
+---+------------------------------------+
岁吢 2025-02-11 09:50:04

我正在使用pyspark =“ == 3.2.1”,您可以添加uuid 版本很像以下

import uuid
from pyspark.sql import functions as f

df.withColumn("uuid", f.lit(str(uuid.uuid4())))

更新

,似乎我最终使用了UDF函数

import uuid
from pyspark.sql import functions as f

# convert function to UDF
uuidUdf = f.udf(lambda: str(uuid.uuid4()), StringType())

df.withColumn("uuid", uuidUdf())

I'm using pyspark= "==3.2.1", you can add your uuid version easly like the following

import uuid
from pyspark.sql import functions as f

df.withColumn("uuid", f.lit(str(uuid.uuid4())))

Update

It seems that I ended up using an UDF function

import uuid
from pyspark.sql import functions as f

# convert function to UDF
uuidUdf = f.udf(lambda: str(uuid.uuid4()), StringType())

df.withColumn("uuid", uuidUdf())

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文