I have a sample spark df as below:
df = spark.createDataFrame(
    [[1, 'a', 'b', 'c'],
     [1, 'b', 'c', 'b'],
     [1, 'b', 'a', 'b'],
     [2, 'c', 'a', 'a'],
     [3, 'b', 'b', 'a']],
    ['id', 'field1', 'field2', 'field3'])
What I need next is to produce multiple aggregations showing a summary of the a, b, c values for each field. I have a working but tedious process, shown below:
from pyspark.sql.functions import col, when, sum

agg_table = (
    df
    .groupBy('id')
    .agg(
        # field1
        sum(when(col('field1') == 'a', 1).otherwise(0)).alias('field1_a_count')
        ,sum(when(col('field1') == 'b', 1).otherwise(0)).alias('field1_b_count')
        ,sum(when(col('field1') == 'c', 1).otherwise(0)).alias('field1_c_count')
        # field2
        ,sum(when(col('field2') == 'a', 1).otherwise(0)).alias('field2_a_count')
        ,sum(when(col('field2') == 'b', 1).otherwise(0)).alias('field2_b_count')
        ,sum(when(col('field2') == 'c', 1).otherwise(0)).alias('field2_c_count')
        # field3
        ,sum(when(col('field3') == 'a', 1).otherwise(0)).alias('field3_a_count')
        ,sum(when(col('field3') == 'b', 1).otherwise(0)).alias('field3_b_count')
        ,sum(when(col('field3') == 'c', 1).otherwise(0)).alias('field3_c_count')
    ))
What I am expecting to get is this:
agg_table = (['id':'1','2','3'],
['field1_a_count':1,0,0],
['field1_b_count':2,0,1],
['field1_c_count':0, 1, 0],
['field2_a_count':1,1,0],
['field2_b_count':1,0,1],
['field2_c_count':1,0,0],
['field3_a_count':0,1,1],
['field3_b_count':2,0,0],
['field3_c_count':1,0,0])
This would be just fine if I really only had 3 fields, but I have 30 fields with varying/custom names. Maybe somebody can help me with the repetitive task of coding the aggregated sum per field. I tried playing around with a suggestion from:
https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/
I can make it work if I only pull one column and one value, but I get varying errors, one of them being:
AnalysisException: cannot resolve '`value`' given input columns: ['field1','field2','field3']
One last line I tried is using:
validated_cols = ['field1','field2','field3']
df.select(validated_cols).groupBy('id').agg(collect_list($'field1_a_count',$'field1_b_count',$'field1_c_count', ...
$'field30_c_count')).show()
Output: SyntaxError: invalid syntax
I also tried pivot, but from my searches so far it seems it is only good for one column. I tried it with multiple columns:
df.withColumn("p", concat(quot;p1", quot;p2"))
.groupBy("a", "b")
.pivot("p")
.agg(...)
I still get a syntax error.
Another link I tried: https://danvatterott.com/blog/2019/02/05/complex-aggregations-in-pyspark/
I also tried the exprs approach: exprs1 = {x: "sum" for x in df.columns if x != 'id'}
Any suggestions will be appreciated. Thanks
Let me answer your question in two steps. First, you are wondering whether it is possible to avoid hard-coding all of the aggregations you need to compute. It is. I would do it like this:
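The answer's code block did not survive in this copy; below is a minimal sketch of the idea as I read it, generating the same sum(when(...)) expressions in a list comprehension. The fields and values lists are assumed from the sample data, and pyspark.sql.functions is assumed to be available.

from pyspark.sql import functions as F

fields = [c for c in df.columns if c != 'id']   # works for 3 or 30 columns
values = ['a', 'b', 'c']

# Generate the same sum(when(...)) expressions as the hand-written version,
# instead of typing them out one by one.
aggregations = [
    F.sum(F.when(F.col(c) == v, 1).otherwise(0)).alias(f'{c}_{v}_count')
    for c in fields
    for v in values
]

# One wide row per id, with one count column per (field, value) pair.
df.groupBy('id').agg(*aggregations).show()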
But that is not what you expect, right? You are trying to compute some kind of pivot on the id column. To do this, I would not use the previous result, but just work the data differently. I would start by replacing all the columns of the dataframe but id (which is renamed to x) with an array of values of the form {column_name}_{value}_count, and I would explode that array. From there, we just need to compute a simple pivot on the former id column, renamed x, grouped by the values contained in the exploded array, which yields the expected counts per id.
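The code and output for this second step are also missing from this copy; the following is a hedged sketch of the explode-then-pivot approach described above, again assuming pyspark.sql.functions. The alias key for the exploded column is my own naming.

from pyspark.sql import functions as F

fields = [c for c in df.columns if c != 'id']

result = (
    df
    # rename id to x, and build one "{column_name}_{value}_count" string per field
    .select(
        F.col('id').alias('x'),
        F.explode(
            F.array(*[
                F.concat_ws('_', F.lit(c), F.col(c), F.lit('count'))
                for c in fields
            ])
        ).alias('key')
    )
    # pivot on the former id column, grouped by the exploded values
    .groupBy('key')
    .pivot('x')
    .count()
    .na.fill(0)
)
result.show()

# For the sample data this gives (row order may vary):
# key             | 1 | 2 | 3
# field1_a_count  | 1 | 0 | 0
# field1_b_count  | 2 | 0 | 1
# field1_c_count  | 0 | 1 | 0
# field2_a_count  | 1 | 1 | 0
# field2_b_count  | 1 | 0 | 1
# field2_c_count  | 1 | 0 | 0
# field3_a_count  | 0 | 1 | 1
# field3_b_count  | 2 | 0 | 0
# field3_c_count  | 1 | 0 | 0

which matches the expected output in the question.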
Update
Based on the discussion in the comments, I think this question is a case of an X-Y problem. The task at hand is something that is seen very frequently in the world of Data Engineering and ETL development: how to partition and then quantify good and bad records.
In the case where the data is being prepared to load to a data warehouse / hadoop ecosystem, the usual pattern is to take the raw input and load it to a dataframe, then apply transformations & validations that partition the data into "The Good, The Bad, and The Ugly":
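The illustration that presumably followed this sentence is missing from this copy. As a hedged sketch of the pattern, where the allowed_codes rule and the reject_reason column name are invented for illustration (it also previews the "which column caused the failure" field discussed below):

from pyspark.sql import functions as F

allowed_codes = ['a', 'b', 'c']                     # hypothetical business rule
fields = [c for c in df.columns if c != 'id']

# Record the first column that fails validation, or null if the row is clean.
reject_reason = F.coalesce(
    *[F.when(~F.col(c).isin(allowed_codes), F.lit(c)) for c in fields]
)

checked = df.withColumn('reject_reason', reject_reason)

good = checked.filter(F.col('reject_reason').isNull()).drop('reject_reason')
bad = checked.filter(F.col('reject_reason').isNotNull())

# `good` continues into the warehouse load; `bad` goes to a separate output
# stream (e.g. a quarantine table) and is reviewed offline.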
The goal should not be to generate counts for each of these 3 classifications across every column and for every row. Trying to do that is counterproductive. Why? Because when a transformation step or QA check fails for a given record, that entire record should be rejected immediately and sent to a separate output stream to be analyzed later. Each row in the data set should be treated as just that: a single record. It isn't possible for a single field to fail and still have the complete record pass, which makes metrics at this granularity unnecessary. What action will you take knowing that 100 rows passed on the "address" field? For valid records, all that matters is the total number that passed for every column. Otherwise, it wouldn't be a valid record.
With that said, remember that the goal is to build a usable and cleansed data set; analyzing the rejected records is a secondary task and can be done offline.
It is common practice to add a field to the rejected data to indicate which column caused the failure. That makes it easy to troubleshoot any malformed data, so there is really no need to generate counts across all columns, even for bad records. Instead, just review the rejected data after the main job finishes, and address the problems. Continue doing that iteratively until the number of rejected records is below whatever threshold you think is reasonable, and then continue to monitor it going forward.
Old answer
This is a sign of a design flaw in the data. Whatever the "field1", "field2", etc. columns actually represent, it appears they are all related, in the sense that the values quantify some attribute (maybe each one is a count for a specific merchandise ID, or the number of people with a certain property...). The problem is that these fields are being added as individual columns on a fact table [1], which then needs to be aggregated, resulting in the situation that you're facing.
A better design would be to collapse those "field1", "field2", etc. columns into a single code field that can be used as the GROUP BY field when doing the aggregation. You might want to consider creating a separate table to do this if the existing one has many other columns and making this change would alter the grain in a way that might cause other problems.

[1]: It's usually a big red flag to have a table with a bunch of enumerated columns with the same name and purpose. I've even seen cases where someone has created tables with "spare" columns for when they want to add more attributes later. Not good.
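No code accompanies the single-code-field suggestion above; as a rough sketch of what that layout could look like for the sample data (the names field_code, value, and long_df are mine):

from pyspark.sql import functions as F

fields = [c for c in df.columns if c != 'id']

# Reshape the wide fact table into (id, field_code, value) rows, so that the
# field name becomes ordinary data instead of thirty separate columns.
long_df = df.select(
    'id',
    F.explode(
        F.array(*[
            F.struct(F.lit(c).alias('field_code'), F.col(c).alias('value'))
            for c in fields
        ])
    ).alias('fv')
).select('id', 'fv.field_code', 'fv.value')

# The aggregation is now a plain GROUP BY instead of 30 x 3 hand-written sums.
long_df.groupBy('id', 'field_code', 'value').count().show()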