如何在 pyspark 中对包含数据帧的数组的行进行相交

发布于 2025-01-15 00:01:05 字数 3830 浏览 4 评论 0原文

我有一个数据框,

   df = spark.createDataFrame(
    [(2022, 1, 3, '01', ['apple', 'banana', 'orange'],
      [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
       ['source', 'Vitamin C', 'fruit']], [['fruit', 2], ['Vitamin', 2]]),
     (2022, 1, 3, '02', ['apple', 'banana', 'avocado'],
     [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
      ['medium', 'dark', 'green', 'fruit']], [['fruit', 3], ['green', 2]]),
     (2022, 2, 4, '03', ['pomelo', 'fig'],
     [['citrus', 'fruit', 'sweet'], ['soft', 'sweet']], [['sweet', 2]]), ],
    ['year', 'month', 'day', 'id', "list_of_fruits",
        'collected_tokens', 'most_common_word']
)

+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------+
|year|month|day|id |list_of_fruits          |collected_tokens                                                                                                        |most_common_word          |
+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------+
|2022|1    |3  |01 |[apple, banana, orange] |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit], [source, Vitamin C, fruit]]  |[[fruit, 2], [Vitamin, 2]]|
|2022|1    |3  |02 |[apple, banana, avocado]|[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit], [medium, dark, green, fruit]]|[[fruit, 3], [green, 2]]  |
|2022|2    |4  |03 |[pomelo, fig]           |[[citrus, fruit, sweet], [soft, sweet]]                                                                                 |[[sweet, 2]]              |
+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------

我想按年、日和月进行分组,并与包含列表、列表列表和带有键和最小值的列表(分别是最后三列)的行相交。最后,我想要这个结果

+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|year|month|day|id |intersection_list_of_fruits|intersection_collected_tokens                                                             |intersection_most_common_word|
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|2022|1    |3  |01 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|1    |3  |02 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|2    |4  |03 |[pomelo, fig]              |[[citrus, fruit, sweet], [soft, sweet]]                                                   |[[sweet, 2]]                 |
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+

所以在intersection_list_of_fruits列中缺少[orange],[avocado],在intersection_collected_tokens列中缺少<代码>[来源、维生素 C、水果]、[中等、深色、绿色、水果] 且在 intersection_most_common_word 列中缺失[维生素,2],[绿色,2]

我了解 array_intersect,但我需要查看按行交集,并且还需要使用聚合函数,因为 groupby - 将具有相同日期和时间的 id 分组使它们相交。 (我认为这可以使用spark的applyInPandas函数来完成)

I have a dataframe

   df = spark.createDataFrame(
    [(2022, 1, 3, '01', ['apple', 'banana', 'orange'],
      [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
       ['source', 'Vitamin C', 'fruit']], [['fruit', 2], ['Vitamin', 2]]),
     (2022, 1, 3, '02', ['apple', 'banana', 'avocado'],
     [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
      ['medium', 'dark', 'green', 'fruit']], [['fruit', 3], ['green', 2]]),
     (2022, 2, 4, '03', ['pomelo', 'fig'],
     [['citrus', 'fruit', 'sweet'], ['soft', 'sweet']], [['sweet', 2]]), ],
    ['year', 'month', 'day', 'id', "list_of_fruits",
        'collected_tokens', 'most_common_word']
)

+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------+
|year|month|day|id |list_of_fruits          |collected_tokens                                                                                                        |most_common_word          |
+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------+
|2022|1    |3  |01 |[apple, banana, orange] |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit], [source, Vitamin C, fruit]]  |[[fruit, 2], [Vitamin, 2]]|
|2022|1    |3  |02 |[apple, banana, avocado]|[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit], [medium, dark, green, fruit]]|[[fruit, 3], [green, 2]]  |
|2022|2    |4  |03 |[pomelo, fig]           |[[citrus, fruit, sweet], [soft, sweet]]                                                                                 |[[sweet, 2]]              |
+----+-----+---+---+------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------

I want to groupby by year, day, and month and intersect rows containing a list, a list of lists and a list with a key and min value (the last three columns respectively). In the end, I would like this result

+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|year|month|day|id |intersection_list_of_fruits|intersection_collected_tokens                                                             |intersection_most_common_word|
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|2022|1    |3  |01 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|1    |3  |02 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|2    |4  |03 |[pomelo, fig]              |[[citrus, fruit, sweet], [soft, sweet]]                                                   |[[sweet, 2]]                 |
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+

So in the column intersection_list_of_fruits missing [orange],[avocado], in the column intersection_collected_tokens missing [source, Vitamin C, fruit], [medium, dark, green, fruit] and in the column intersection_most_common_word missing [Vitamin, 2], [green, 2].

I know about array_intersect, but I need to look at the intersection by row, and also need to use an aggregation function due to groupby - to group ids with the same date and intersect them. (I think this can be done using spark's applyInPandas function)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

蝶…霜飞 2025-01-22 00:01:06

您可以使用aggregatearray_intersect以及collect_set来计算list_of_fruitscollected_tokens<的交集/code> 获取 intersection_list_of_fruitsintersection_collected_tokens

但是,由于 intersection_most_common_word 需要考虑单词的数量。为此,

  1. 查找不包括计数的单词的交集
  2. 迭代交集单词和 most_common_word 中的收集数组并找到最小计数
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import Column

df = spark.createDataFrame(
    [(2022, 1, 3, '01', ['apple', 'banana', 'orange'],
      [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
       ['source', 'Vitamin C', 'fruit']], [['fruit', 2], ['Vitamin', 2]]),
     (2022, 1, 3, '02', ['apple', 'banana', 'avocado'],
     [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
      ['medium', 'dark', 'green', 'fruit']], [['fruit', 3], ['green', 2]]),
     (2022, 2, 4, '03', ['pomelo', 'fig'],
     [['citrus', 'fruit', 'sweet'], ['soft', 'sweet']], [['sweet', 2]]), ],
    ['year', 'month', 'day', 'id', "list_of_fruits",
        'collected_tokens', 'most_common_word']
)

def intersection_expr(col_name: str, window_spec: W) -> Column:
    lists = F.collect_set(col_name).over(window_spec)
    return F.aggregate(lists, lists[0], lambda acc,x: F.array_intersect(acc, x))



def intersect_min(col_name: str, window_spec: W) -> Column:
    # Convert array into map of word and count and collect into set
    k = F.transform(F.col(col_name), lambda x: x[0])
    v = F.transform(F.col(col_name), lambda x: x[1])
    map_count = F.map_from_arrays(k, v)
    map_counts = F.collect_list(map_count).over(window_spec)
    
    # Find keys present in all list
    keys = F.transform(map_counts, lambda x: F.map_keys(x))
    intersected = F.aggregate(keys, keys[0], lambda acc,x: F.array_intersect(acc, x))
    
    # For intersection find the minimum value
    res = F.transform(intersected, lambda key: F.array(key, F.array_min(F.transform(map_counts, lambda m: m.getField(key)))))
    
    return res

window_spec = W.partitionBy("year", "month", "day").orderBy("id").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

(df.select("year", "month", "day", "id",
        intersection_expr("list_of_fruits", window_spec).alias("intersection_list_of_fruits"), 
        intersection_expr("collected_tokens", window_spec).alias("intersection_collected_tokens"),
        intersect_min("most_common_word", window_spec).alias("intersection_most_common_word"))
    .show(truncate=False))


"""
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|year|month|day|id |intersection_list_of_fruits|intersection_collected_tokens                                                             |intersection_most_common_word|
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|2022|1    |3  |01 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|1    |3  |02 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|2    |4  |03 |[pomelo, fig]              |[[citrus, fruit, sweet], [soft, sweet]]                                                   |[[sweet, 2]]                 |
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
"""

You can use aggregate and array_intersect, along with collect_set to compute the intersection on list_of_fruits and collected_tokens to obtain intersection_list_of_fruits and intersection_collected_tokens.

However, since intersection_most_common_word needs to account for the count of the words. To do this,

  1. Find the intersections of words excluding counts
  2. Iterate over the intersection words and the collect arrays in most_common_word and find the minimum count
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import Column

df = spark.createDataFrame(
    [(2022, 1, 3, '01', ['apple', 'banana', 'orange'],
      [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
       ['source', 'Vitamin C', 'fruit']], [['fruit', 2], ['Vitamin', 2]]),
     (2022, 1, 3, '02', ['apple', 'banana', 'avocado'],
     [['apple', 'edible', 'fruit', 'green'], ['largest', 'herbaceous', 'flowering', 'plant', 'Vitamin B', 'fruit'],
      ['medium', 'dark', 'green', 'fruit']], [['fruit', 3], ['green', 2]]),
     (2022, 2, 4, '03', ['pomelo', 'fig'],
     [['citrus', 'fruit', 'sweet'], ['soft', 'sweet']], [['sweet', 2]]), ],
    ['year', 'month', 'day', 'id', "list_of_fruits",
        'collected_tokens', 'most_common_word']
)

def intersection_expr(col_name: str, window_spec: W) -> Column:
    lists = F.collect_set(col_name).over(window_spec)
    return F.aggregate(lists, lists[0], lambda acc,x: F.array_intersect(acc, x))



def intersect_min(col_name: str, window_spec: W) -> Column:
    # Convert array into map of word and count and collect into set
    k = F.transform(F.col(col_name), lambda x: x[0])
    v = F.transform(F.col(col_name), lambda x: x[1])
    map_count = F.map_from_arrays(k, v)
    map_counts = F.collect_list(map_count).over(window_spec)
    
    # Find keys present in all list
    keys = F.transform(map_counts, lambda x: F.map_keys(x))
    intersected = F.aggregate(keys, keys[0], lambda acc,x: F.array_intersect(acc, x))
    
    # For intersection find the minimum value
    res = F.transform(intersected, lambda key: F.array(key, F.array_min(F.transform(map_counts, lambda m: m.getField(key)))))
    
    return res

window_spec = W.partitionBy("year", "month", "day").orderBy("id").rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

(df.select("year", "month", "day", "id",
        intersection_expr("list_of_fruits", window_spec).alias("intersection_list_of_fruits"), 
        intersection_expr("collected_tokens", window_spec).alias("intersection_collected_tokens"),
        intersect_min("most_common_word", window_spec).alias("intersection_most_common_word"))
    .show(truncate=False))


"""
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|year|month|day|id |intersection_list_of_fruits|intersection_collected_tokens                                                             |intersection_most_common_word|
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
|2022|1    |3  |01 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|1    |3  |02 |[apple, banana]            |[[apple, edible, fruit, green], [largest, herbaceous, flowering, plant, Vitamin B, fruit]]|[[fruit, 2]]                 |
|2022|2    |4  |03 |[pomelo, fig]              |[[citrus, fruit, sweet], [soft, sweet]]                                                   |[[sweet, 2]]                 |
+----+-----+---+---+---------------------------+------------------------------------------------------------------------------------------+-----------------------------+
"""
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文