How to modify a PySpark dataframe with a for loop and case-when conditions?
I am trying to modify a Spark dataframe such that, depending on one filtered value, we subset to several conditions and flag/modify a variable based on those conditions.
Ideally, I would like the finalized dataframe to be the same size as the original, just with the necessary mods.
Example below:
import pandas as pd
import pyspark.sql.functions as spark_fns
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    [1, "soda", "LB", 1, "L", 20],
    [2, "juice", "KG", 1, "GA", 12],
    [3, "water", "LB", 1, "L", 35],
    [4, "soda", "G", 1, "M2", 11],
]
df = pd.DataFrame(
    data, columns=["ID", "Beverage", "Weight", "Sample", "Volume", "Amount"]
)
drink_dictionary = {
    "soda": {"LB/L": 100, "G/M2": 200},
    "juice": {"KG/GA": 500, "LB/L": 90},
    "water": {"LB/L": 1},
}
sdf = spark.createDataFrame(df)

for drink in ["soda", "juice", "water"]:
    for mass_unit in drink_dictionary[drink].keys():
        weight_unit = mass_unit.split("/")[0]
        volume_unit = mass_unit.split("/")[1]
        value = drink_dictionary[drink][weight_unit + "/" + volume_unit]
        # Create a condition, i.e. a specific weight and volume.
        # IF this condition is met, modify a different variable.
        # Loop to the next condition under the same Beverage type
        # and modify accordingly.
        condition = (spark_fns.col("Weight") == weight_unit) & (
            spark_fns.col("Volume") == volume_unit
        )
        new_sdf = (
            sdf.filter(spark_fns.col("Beverage") == drink)
            .withColumn("Flag", spark_fns.when(condition, True).otherwise(False))
            .withColumn(
                "corrected_amount",
                spark_fns.when(
                    condition,
                    spark_fns.expr(f"Amount / Sample * {value}"),
                ).otherwise(sdf["Amount"]),
            )
        )
The output of this is not correct. What I would ideally like the output to look like is below (this would involve looping through all beverages):
ID  Beverage  Weight  Sample  Volume  Amount  Corrected_Amount
1   soda      LB      1       L       20      2000
2   juice     KG      1       GA      12      6000
3   water     LB      1       L       35      35
4   soda      G       1       M2      11      2200
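(Each Corrected_Amount is Amount / Sample times the dictionary value for that beverage's Weight/Volume pair, e.g. for juice: 12 / 1 * 500 = 6000.)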
1 Answer
This is one way to solve it. I changed the drink_dictionary to a DataFrame so that it can work with the other information in the DataFrame; this creates a small lookup DataFrame. Then, join this to the main DataFrame, use getItem to find the corresponding value from unit_list, and calculate the Corrected_Amount.
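The answer's code blocks did not survive in this copy, so here is a minimal sketch of the approach it describes, reusing sdf and drink_dictionary from the question. The unit_list column name comes from the answer; the "Weight/Volume" key format and the helper names lookup_sdf, key, and factor are assumptions for illustration:

import pyspark.sql.functions as spark_fns

# Build a lookup DataFrame: one row per beverage, with a map column
# ("unit_list") from "Weight/Volume" keys to correction factors.
lookup_sdf = spark.createDataFrame(
    [(drink, units) for drink, units in drink_dictionary.items()],
    schema="Beverage string, unit_list map<string,int>",
)

# Join the lookup onto the main DataFrame by Beverage, build each row's
# "Weight/Volume" key, pull the factor out of the map with getItem,
# and compute Corrected_Amount; rows with no matching key keep Amount.
result = (
    sdf.join(lookup_sdf, on="Beverage", how="left")
    .withColumn(
        "key",
        spark_fns.concat_ws("/", spark_fns.col("Weight"), spark_fns.col("Volume")),
    )
    .withColumn("factor", spark_fns.col("unit_list").getItem(spark_fns.col("key")))
    .withColumn(
        "Corrected_Amount",
        spark_fns.when(
            spark_fns.col("factor").isNotNull(),
            spark_fns.col("Amount") / spark_fns.col("Sample") * spark_fns.col("factor"),
        ).otherwise(spark_fns.col("Amount")),
    )
    .drop("unit_list", "key", "factor")
)
result.show()

On the sample data this produces 2000, 6000, 35, and 2200 for the four rows, matching Amount / Sample * factor. A single join replaces the per-beverage loop, so the result keeps every row of the original DataFrame.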