Count of unique elements in an array column

Posted 2025-02-07 17:35:14


I have a dataset with a column of array type. From this column, we need to create another column that contains the list of unique elements and their counts.

For example, [a, b, e, b] should produce [[b, a, e], [2, 1, 1]]. The data should be sorted by count. A key-value result where the value is the count would also work. I created a UDF for this purpose (see below), but it is very slow, so I need to do this with PySpark built-in functions.

id | col_a | collected_col_a
1  | a     | [a, b, e, b]
1  | b     | [a, b, e, b]
import numpy as np

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

# output schema: parallel arrays of unique elements and their counts
struct_schema1 = StructType([
    StructField('elements', ArrayType(StringType()), nullable=True),
    StructField('count', ArrayType(IntegerType()), nullable=True)
])

# udf: returns the (up to) `top` most frequent elements and their counts,
# sorted by count in descending order
@udf(returnType=struct_schema1)
def func1(x, top=10):
    y, z = np.unique(x, return_counts=True)
    z_y = zip(z.tolist(), y.tolist())
    y = [i for _, i in sorted(z_y, reverse=True)]
    z = sorted(z.tolist(), reverse=True)
    if len(y) > top:
        return {'elements': y[:top], 'count': z[:top]}
    else:
        return {'elements': y, 'count': z}
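
For context, here is a minimal sketch of how this UDF would be applied (assuming df is a DataFrame with the id, col_a and collected_col_a columns from the table above); the row-by-row serialization between the JVM and Python workers is the usual reason a UDF like this is much slower than built-in functions:

# hypothetical usage: `df` is assumed to hold the columns shown in the table above
df_out = df.withColumn('unique_counts', func1('collected_col_a'))
df_out.select('id', 'col_a', 'unique_counts.elements', 'unique_counts.count').show(truncate=False)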


2 Answers

阪姬 2025-02-14 17:35:14


You can use a combination of the transform and filter functions along with array_distinct and size to get the desired output. Here's an example:

from pyspark.sql import functions as F

# example of input dataframe
df = spark.createDataFrame([(1, ["a", "b", "e", "b"]), (2, ["a", "a", "c", "b"])], ["id", "arrayCol"])

# for each distinct element, count its occurrences in the original array
df1 = df.withColumn(
    "uniqueCount",
    F.transform(
        F.array_distinct("arrayCol"),
        lambda x: F.struct(
            x.alias("value"),
            F.size(F.filter("arrayCol", lambda y: x == y)).alias("count")
        )
    )
)
df1.show(truncate=False)
#+---+------------+------------------------+
#|id |arrayCol    |uniqueCount             |
#+---+------------+------------------------+
#|1  |[a, b, e, b]|[{a, 1}, {b, 2}, {e, 1}]|
#|2  |[a, a, c, b]|[{a, 2}, {c, 1}, {b, 1}]|
#+---+------------+------------------------+
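
If the result also needs to be sorted by count and split into the two parallel arrays asked for in the question, one possible follow-up (a sketch that is not part of the original answer, assuming Spark 3.1+ for the Python higher-order functions) is to put the count first in the struct so that sort_array orders by it:

# build (count, value) structs so sort_array orders by count descending,
# then split the sorted structs into the [elements, counts] shape from the question
df2 = df.withColumn(
    "uniqueCount",
    F.sort_array(
        F.transform(
            F.array_distinct("arrayCol"),
            lambda x: F.struct(
                F.size(F.filter("arrayCol", lambda y: x == y)).alias("count"),
                x.alias("value")
            )
        ),
        asc=False
    )
).withColumn(
    "elements", F.transform("uniqueCount", lambda s: s["value"])
).withColumn(
    "counts", F.transform("uniqueCount", lambda s: s["count"])
)
df2.select("id", "elements", "counts").show(truncate=False)
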
把人绕傻吧 2025-02-14 17:35:14


An approach that creates a map, using aggregate and map_zip_with. The other approach seems clearer, though.

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 'a', ['a', 'b', 'e', 'b']),
     (1, 'b', ['a', 'b', 'e', 'b'])],
    ['id', 'col_a', 'collected_col_a']
)

# fold over the array: start from a null map and, for each element x, merge the
# accumulator with the single-entry map {x: 1}, summing counts for matching keys
df = df.withColumn('elem_count',
    F.aggregate(
        'collected_col_a',
        F.lit(None).cast('map<string,int>'),
        lambda m, x: F.map_zip_with(
            F.coalesce(m, F.create_map(x, F.lit(0))),
            F.create_map(x, F.lit(1)),
            lambda k, v1, v2: F.coalesce(v1, F.lit(0)) + F.coalesce(v2, F.lit(0))
        )
    )
)
df.show(truncate=0)
# +---+-----+---------------+------------------------+
# |id |col_a|collected_col_a|elem_count              |
# +---+-----+---------------+------------------------+
# |1  |a    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# |1  |b    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# +---+-----+---------------+------------------------+

Sorry, I couldn't figure out how to sort based on map values.
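
For completeness, one way this could be done (a sketch that is not part of the original answer, assuming Spark 3.1+): turn the map into an array of entries with map_entries, put the count first in a struct, and let sort_array order by it.

# convert the map to (count, element) structs and sort descending by count
df = df.withColumn(
    'elem_count_sorted',
    F.sort_array(
        F.transform(
            F.map_entries('elem_count'),
            lambda e: F.struct(e['value'].alias('count'), e['key'].alias('element'))
        ),
        asc=False
    )
)
df.select('id', 'elem_count_sorted').show(truncate=False)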
