PySpark UDF:fir 变换示例

发布于 2025-01-15 16:40:35 字数 1397 浏览 3 评论 0原文

我对 PySpark 很陌生,正在尝试将一些 python 代码翻译成 pyspark。 我从 panda 开始,转换为文档项矩阵,然后应用 PCA。

UDF:


    class MultiLabelCounter():
        def __init__(self, classes=None):
            self.classes_ = classes

        def fit(self,y):
            self.classes_ = 
    sorted(set(itertools.chain.from_iterable(y)))
            self.mapping = dict(zip(self.classes_,
                                         
    range(len(self.classes_))))
            return self

    def transform(self,y):
        yt = []
        for labels in y:
            data = [0]*len(self.classes_)
            for label in labels:
                data[self.mapping[label]] +=1
            yt.append(data)
        return yt

    def fit_transform(self,y):
        return self.fit(y).transform(y)

    mlb = MultiLabelCounter()
    df_grouped = 
    df_grouped.withColumnRenamed("collect_list(full)","full")

    udf_mlb = udf(lambda x: mlb.fit_transform(x),IntegerType())

    mlb_fitted = df_grouped.withColumn('full',udf_mlb(col("full")))

我当然得到了 NULL 结果。

我使用的是spark 2.4.4版本。

编辑

根据请求添加示例输入和输出

输入:

|id|val|
|--|---|
|1|[hello,world]|
|2|[goodbye, world]|
|3|[hello,hello]|

输出:

|id|hello|goodbye|world|
|--|-----|-------|-----|
|1|1|0|1|
|2|0|1|1|
|3|2|0|0|

I am really new to PySpark and am trying to translate some python code into pyspark.
I start with a panda, convert to a document - term matrix and then apply PCA.

The UDF:


    class MultiLabelCounter():
        def __init__(self, classes=None):
            self.classes_ = classes

        def fit(self,y):
            self.classes_ = 
    sorted(set(itertools.chain.from_iterable(y)))
            self.mapping = dict(zip(self.classes_,
                                         
    range(len(self.classes_))))
            return self

    def transform(self,y):
        yt = []
        for labels in y:
            data = [0]*len(self.classes_)
            for label in labels:
                data[self.mapping[label]] +=1
            yt.append(data)
        return yt

    def fit_transform(self,y):
        return self.fit(y).transform(y)

    mlb = MultiLabelCounter()
    df_grouped = 
    df_grouped.withColumnRenamed("collect_list(full)","full")

    udf_mlb = udf(lambda x: mlb.fit_transform(x),IntegerType())

    mlb_fitted = df_grouped.withColumn('full',udf_mlb(col("full")))

I am of course getting NULL results.

I am using spark 2.4.4 version.

EDIT

Adding sample input and output as per request

Input:

|id|val|
|--|---|
|1|[hello,world]|
|2|[goodbye, world]|
|3|[hello,hello]|

Output:

|id|hello|goodbye|world|
|--|-----|-------|-----|
|1|1|0|1|
|2|0|1|1|
|3|2|0|0|

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

萌梦深 2025-01-22 16:40:35

根据共享的输入数据,我尝试复制您的输出并且它有效。请参阅下文 -

输入数据

df = spark.createDataFrame(data=[(1, ['hello', 'world']), (2, ['goodbye', 'world']), (3, ['hello', 'hello'])], schema=['id', 'vals'])

df.show()

+---+----------------+
| id|            vals|
+---+----------------+
|  1|  [hello, world]|
|  2|[goodbye, world]|
|  3|  [hello, hello]|
+---+----------------+

现在,使用 explodevals 列表项创建单独的行。此后,使用pivotcount将计算频率。最后,使用 fillna(0)null 值替换为 0。见下文 -

from pyspark.sql.functions import *

df1 = df.select(['id', explode(col('vals'))]).groupBy("id").pivot("col").agg(count(col("col")))

df1.fillna(0).orderBy("id").show()

输出

+---+-------+-----+-----+
| id|goodbye|hello|world|
+---+-------+-----+-----+
|  1|      0|    1|    1|
|  2|      1|    0|    1|
|  3|      0|    2|    0|
+---+-------+-----+-----+

Based upon input data shared, I tried replicating your output and it works. Please see below -

Input Data

df = spark.createDataFrame(data=[(1, ['hello', 'world']), (2, ['goodbye', 'world']), (3, ['hello', 'hello'])], schema=['id', 'vals'])

df.show()

+---+----------------+
| id|            vals|
+---+----------------+
|  1|  [hello, world]|
|  2|[goodbye, world]|
|  3|  [hello, hello]|
+---+----------------+

Now, using explode to create separate rows out of vals list items. Thereafter, using pivot and count will calculate the frequency. Finally, replacing null values with 0 using fillna(0). See below -

from pyspark.sql.functions import *

df1 = df.select(['id', explode(col('vals'))]).groupBy("id").pivot("col").agg(count(col("col")))

df1.fillna(0).orderBy("id").show()

Output

+---+-------+-----+-----+
| id|goodbye|hello|world|
+---+-------+-----+-----+
|  1|      0|    1|    1|
|  2|      1|    0|    1|
|  3|      0|    2|    0|
+---+-------+-----+-----+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文