如何使用 for 循环和 case-when 条件修改 Pyspark 数据框?

发布于 2025-01-19 08:42:57 字数 1979 浏览 0 评论 0原文

我正在尝试修改火花数据框架,以便根据一个过滤值,我们将以几个条件为子集,并根据这些条件进行标记/修改变量。

理想情况下,我希望最终的数据框架与原始mod相同。

下面的示例:

data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]

df = pd.DataFrame(
    data, columns=["ID", "Beverage", "Weight", "Sample", "Volume", "Amount"]
)

drink_dictionary = {'soda': {"LB/L": 100, "G/M2": 200,},
"juice": {"KG/GA": 500, "LB/L": 90,},
                 "water": {'LB/L': 1,}}

sdf = spark.createDataFrame(df)

for drink in ["soda", "juice", "water"]:
    for mass_unit in drink_dictionary[drink].keys():
        weight_unit = mass_unit.split("/")[0]
        volume_unit = mass_unit.split("/")[1]
        value = drink_dictionary[drink][weight_unit + "/" + volume_unit]
        
        # create condition, i.e specific weight and volumes,
        # IF this condition is met, modify a different variable.
        # Loop to next condition under same Beverage type.
        # Modify accordingly.

        condition = (spark_fns.col("Weight") == weight_unit) & (
            spark_fns.col("Volume") == volume_unit
        )

        new_sdf = (
            sdf.filter(spark_fns.col("Beverage") == drink)
            .withColumn("Flag", spark_fns.when((condition), True).otherwise(False))
            .withColumn(
                "corrected_amount",
                spark_fns.when(
                    (condition),
                    spark_fns.expr(f"Amount / Sample * {value}"),
                ).otherwise(sdf["Amount"]),
            )
        )

其输出是不是正确的。理想情况下,我希望输出的外观在下面(这涉及在所有饮料中循环):

ID   Beverage  Weight  Sample  Volume  Amount   Corrected_Amount
1     soda       LB       1       L      20            200
2     juice      KG       1       GA     12            6000
3     water      LB       1       L      35            35
4     soda       G        1       M2     11            2200 

I am trying to modify a Spark dataframe such that depending on one filtered value, we subset to several conditions and flag/modify the variable based on those conditions.

Ideally, I would like the finalized dataframe to be the same size as the original, just with the necessary mods.

Example below:

data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]

df = pd.DataFrame(
    data, columns=["ID", "Beverage", "Weight", "Sample", "Volume", "Amount"]
)

drink_dictionary = {'soda': {"LB/L": 100, "G/M2": 200,},
"juice": {"KG/GA": 500, "LB/L": 90,},
                 "water": {'LB/L': 1,}}

sdf = spark.createDataFrame(df)

for drink in ["soda", "juice", "water"]:
    for mass_unit in drink_dictionary[drink].keys():
        weight_unit = mass_unit.split("/")[0]
        volume_unit = mass_unit.split("/")[1]
        value = drink_dictionary[drink][weight_unit + "/" + volume_unit]
        
        # create condition, i.e specific weight and volumes,
        # IF this condition is met, modify a different variable.
        # Loop to next condition under same Beverage type.
        # Modify accordingly.

        condition = (spark_fns.col("Weight") == weight_unit) & (
            spark_fns.col("Volume") == volume_unit
        )

        new_sdf = (
            sdf.filter(spark_fns.col("Beverage") == drink)
            .withColumn("Flag", spark_fns.when((condition), True).otherwise(False))
            .withColumn(
                "corrected_amount",
                spark_fns.when(
                    (condition),
                    spark_fns.expr(f"Amount / Sample * {value}"),
                ).otherwise(sdf["Amount"]),
            )
        )

The output of this is not correct. What I would ideally like the output to look like is below (this would involve looping through all beverages):

ID   Beverage  Weight  Sample  Volume  Amount   Corrected_Amount
1     soda       LB       1       L      20            200
2     juice      KG       1       GA     12            6000
3     water      LB       1       L      35            35
4     soda       G        1       M2     11            2200 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

不离久伴 2025-01-26 08:42:57

这是解决方案的一种方法。我将Drink_dictionary更改为DataFrame,以便它可以与DataFrame中的其他信息一起使用。

from pyspark.sql import functions as F
df_drink_unit = (spark.createDataFrame([drink_dictionary], 
                                      schema=MapType(StringType(), MapType(StringType(), StringType())))
                 .select(F.explode('value').alias('Beverage', 'unit_list')))

这将创建数据框

+--------+--------------------+
|Beverage|           unit_list|
+--------+--------------------+
|   juice|[LB/L -> 90, KG/G...|
|    soda|[LB/L -> 100, G/M...|
|   water|         [LB/L -> 1]|
+--------+--------------------+

,然后将其加入主要数据框架,使用getItemunit_list 中找到相应的值,然后计算corrected_amount

sdf = (sdf.join(df_drink_unit, on='Beverage', how='left')
      .withColumn('unit', F.concat(F.col('Weight'), F.lit('/'), F.col('Volume')))
      .withColumn('unit_value', F.col('unit_list').getItem(F.col('unit')))
      .withColumn('Corrected_Amount', F.col('Amount') / F.col('Sample') * F.col('unit_value')))
sdf.select('Beverage', 'ID', 'Corrected_Amount').show()

+--------+---+----------------+
|Beverage| ID|Corrected_Amount|
+--------+---+----------------+
|   juice|  2|          6000.0|
|   water|  3|            35.0|
|    soda|  1|          2000.0|
|    soda|  4|          2200.0|
+--------+---+----------------+

This is one way to solve. I changed the drink_dictionary to dataframe so that it can work with the other information in dataframe.

from pyspark.sql import functions as F
df_drink_unit = (spark.createDataFrame([drink_dictionary], 
                                      schema=MapType(StringType(), MapType(StringType(), StringType())))
                 .select(F.explode('value').alias('Beverage', 'unit_list')))

This will create dataframe as

+--------+--------------------+
|Beverage|           unit_list|
+--------+--------------------+
|   juice|[LB/L -> 90, KG/G...|
|    soda|[LB/L -> 100, G/M...|
|   water|         [LB/L -> 1]|
+--------+--------------------+

Then, join this to the main dataframe, use getItem to find the corresponding value from unit_list and calculate the Corrected_Amount.

sdf = (sdf.join(df_drink_unit, on='Beverage', how='left')
      .withColumn('unit', F.concat(F.col('Weight'), F.lit('/'), F.col('Volume')))
      .withColumn('unit_value', F.col('unit_list').getItem(F.col('unit')))
      .withColumn('Corrected_Amount', F.col('Amount') / F.col('Sample') * F.col('unit_value')))
sdf.select('Beverage', 'ID', 'Corrected_Amount').show()

+--------+---+----------------+
|Beverage| ID|Corrected_Amount|
+--------+---+----------------+
|   juice|  2|          6000.0|
|   water|  3|            35.0|
|    soda|  1|          2000.0|
|    soda|  4|          2200.0|
+--------+---+----------------+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文