Summing values based on conditions coming from a list and a dict

Published on 2025-02-13 00:10:00

I have at least 12 periods in a list; this is not fixed and there may be more depending on the selected product.
I also have a dict with period as the key and a list of products as the value.

{
    "20191": ["prod1","prod2","prod3"],
    "20192": ["prod2","prod3"],
    "20193": ["prod2"]
}

I need to select the data based on period and compute the sum of the amount for each period.

Sample data:

period  product  amount
20191   prod1    30
20192   prod1    30
20191   prod2    20
20191   prod3    60
20193   prod1    30
20193   prod2    30

Output:

period  amount
20191   110
20192   0
20193   30

Basically, for each period, select only the products listed for it in the dict, and sum up their amounts.
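
To make the expected result concrete, here is a minimal plain-Python sketch of the same computation over the sample rows (the names period_products and rows are mine, for illustration):

period_products = {
    "20191": ["prod1", "prod2", "prod3"],
    "20192": ["prod2", "prod3"],
    "20193": ["prod2"],
}
rows = [
    ("20191", "prod1", 30), ("20192", "prod1", 30),
    ("20191", "prod2", 20), ("20191", "prod3", 60),
    ("20193", "prod1", 30), ("20193", "prod2", 30),
]

# For each period, sum amounts only over the products allowed by the dict.
totals = {
    period: sum(amount for p, product, amount in rows
                if p == period and product in products)
    for period, products in period_products.items()
}
print(totals)  # {'20191': 110, '20192': 0, '20193': 30}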

My code, which is taking a lot of time:

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

list_series = []
df = spark.read.csv(path, header=True)
periods = [row["period"] for row in df.select("period").distinct().collect()]
for period in periods:
    df1 = (df.filter(F.col("period") == period)
             .filter(F.col("product").isin(period_products[period]))
             .groupBy("period", "product")
             .agg(F.sum("amount").alias("amount")))
    list_series.append(df1)
dataframe = reduce(DataFrame.unionAll, list_series)

Is there any way I can modify this to improve the performance?

2 Answers

情独悲 2025-02-20 00:10:00


Solution

Flatten the input dictionary into a list of tuples, then create a new Spark DataFrame called filters. Join it with the original DataFrame on the period and product columns, then group by period and aggregate amount using sum.

# Flatten {period: [products]} into (product, period) tuples.
d = [(i, k) for k, v in dct.items() for i in v]
filters = spark.createDataFrame(d, schema=['product', 'period'])

(
    df
    # The right join keeps every (period, product) pair from filters,
    # even when df has no matching row.
    .join(filters, on=['period', 'product'], how='right')
    .groupby('period')
    .agg(F.sum('amount').alias('amount'))
    # Periods with no matching rows aggregate to null; replace with 0.
    .fillna(0)
)
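
To see what the flattening step produces, here is the dict from the question run through that comprehension (dct is the name used in the answer; note how each tuple comes out in (product, period) order, matching the schema):

dct = {
    "20191": ["prod1", "prod2", "prod3"],
    "20192": ["prod2", "prod3"],
    "20193": ["prod2"],
}
d = [(i, k) for k, v in dct.items() for i in v]
# d == [('prod1', '20191'), ('prod2', '20191'), ('prod3', '20191'),
#       ('prod2', '20192'), ('prod3', '20192'), ('prod2', '20193')]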

Result

+------+------+
|period|amount|
+------+------+
| 20191|   110|
| 20192|     0|
| 20193|    30|
+------+------+
如此安好 2025-02-20 00:10:00


With the following input:

df = spark.createDataFrame(
    [('20191', 'prod1', 30),
     ('20192', 'prod1', 30),
     ('20191', 'prod2', 20),
     ('20191', 'prod3', 60),
     ('20193', 'prod1', 30),
     ('20193', 'prod2', 30)],
    ['period', 'product', 'amount'])

periods = ["20191", "20192", "20193"]
period_products = {
    "20191": ["prod1","prod2","prod3"],
    "20192": ["prod2","prod3"],
    "20193": ["prod2"]
}

To make your script more performant, you need to remove the steps that split one DataFrame into several and then union them all back together. Do it in one DataFrame without splitting.

You can create the filter condition in Python (filtering before the join should give a performance boost), supply it to the filter function, and then aggregate.

conds = [f"((period = '{p}') and (product ='{prod}'))" for p in periods for prod in period_products[p]]
cond = ' or '.join(conds)

df_periods = spark.createDataFrame(
    [(p, i) for p in periods for i in period_products[p]],
    ['period', 'product']
)

df = (df_periods
    .join(df.filter(cond), ['period', 'product'], 'left')
    .groupBy('period', 'product')
    .agg(F.sum('amount').alias('amount'))
)

df.show()
# +------+-------+------+
# |period|product|amount|
# +------+-------+------+
# | 20191|  prod2|    20|
# | 20191|  prod1|    30|
# | 20191|  prod3|    60|
# | 20193|  prod2|    30|
# | 20192|  prod2|  null|
# | 20192|  prod3|  null|
# +------+-------+------+
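
Note that unmatched (period, product) pairs come out as null rather than the 0 shown in the question's expected output; a fillna on the aggregated column converts them (my addition, not part of the original answer):

df = df.fillna(0, subset=['amount'])
# The 20192 rows now show amount = 0 instead of null.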