Performing a group aggregation to populate field values in an RDD

Posted 2025-01-12 14:23:20

I have to write code that, as a first step, loads a text file into an RDD and then populates the null cells based on the VIN.

The CSV file is:

 '1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors',
 '2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors',
 '3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision',
 '4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission',
 '5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax',
 '6,R,VOME254OOXW344325,,,,2015-02-06,Wheel alignment service',
 '7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light',
 '8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars',
 '9,A,VOME254OOXW344325,,,,2015-10-01,Side collision',
 '10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires',
 '11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine',
 '12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover',
 '13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door',
 '14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax',
 '15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax',
 '16,A,INU45KIOOPA343980,,,,2020-05-01,Side collision'

I have written:

def extract_vin_key_value(line):
    # Key on the VIN (column 2); keep (make, year) as the value.
    sr = line.split(',')
    return (sr[2], (sr[3], sr[5]))

raw_rdd = sc.textFile('/FileStore/tables/data.csv')
vin_kv = raw_rdd.map(lambda x: extract_vin_key_value(x))
vin_kv.collect()

The result of the first part is:

[('VXIO456XLBB630221', ('Nissan', '2003')),
 ('INU45KIOOPA343980', ('Mercedes', '2015')),
 ('VXIO456XLBB630221', ('', '')),
 ('VXIO456XLBB630221', ('', '')),
 ('VOME254OOXW344325', ('Mercedes', '2015')),
 ('VOME254OOXW344325', ('', '')),
 ('VXIO456XLBB630221', ('', '')),
 ('EXOA00341AB123456', ('Mercedes', '2016')),
 ('VOME254OOXW344325', ('', '')),
 ('VOME254OOXW344325', ('', '')),
 ('EXOA00341AB123456', ('', '')),
 ('EXOA00341AB123456', ('', '')),
 ('VOME254OOXW344325', ('', '')),
 ('UXIA769ABCC447906', ('Toyota', '2017')),
 ('UXIA769ABCC447906', ('', '')),
 ('INU45KIOOPA343980', ('', ''))]

Now, I have to populate the model and make based on the VIN by developing a method such as:

def populate_make(line):
    vin=line[0]
    ...
    ...

enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(kv[1]))
enhance_make.collect()

But I received the following error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed
1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 20) (ip-10-172-237-160.us-west-
2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 'TypeError:
'ResultIterable' object is not subscriptable

How can I split the data in the second function? I tried using the split method, but I received:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed
1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 25) (ip-10-172-237-160.us-west-
2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 
'AttributeError: 'ResultIterable' object has no attribute 'split'
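
For reference, both failure modes can be reproduced without Spark using any iterable-only object. A plain generator stands in for `ResultIterable` below (an illustrative stand-in, not Spark's actual class):

```python
# groupByKey() hands each value group to the function as a ResultIterable:
# it can be iterated over, but it is not a list or a string, so neither
# indexing nor split() works on it. A generator fails the same two ways:
group = (v for v in [('Nissan', '2003'), ('', '')])

try:
    group[0]                 # like line[0] inside populate_make
except TypeError as e:
    print(e)                 # 'generator' object is not subscriptable

try:
    group.split(',')         # like calling split on the grouped values
except AttributeError as e:
    print(e)                 # 'generator' object has no attribute 'split'

# Converting to a list restores indexing:
print(list(group)[0])        # ('Nissan', '2003')
```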



扛起拖把扫天下 2025-01-19 14:23:20

groupByKey returns pairs of type (Any, ResultIterable). The docstring briefly explains the reason:

A special result iterable. This is used because the standard iterator can not be pickled

I have not fully understood what that means yet, but I will look into it. In any case, the fix is to convert the ResultIterable to a list:

enhance_make = vin_kv.groupByKey().flatMap(lambda kv: populate_make(list(kv[1])))
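
To illustrate, here is a minimal pure-Python sketch of what populate_make might look like once it receives a plain list, assuming the intent is to copy the make/year from the initial-sale record onto every row of the same VIN (the function body is a guess at the intended logic, not the asker's actual code):

```python
def populate_make(records):
    # records: list of (make, year) tuples for one VIN, i.e. list(kv[1])
    # after groupByKey(). Find the non-empty pair from the sale record.
    make, year = next(((m, y) for m, y in records if m), ('', ''))
    # Emit the filled-in (make, year) once per original record.
    return [(make, year) for _ in records]

# Simulating the group for VIN 'VXIO456XLBB630221' from the data above:
group = [('Nissan', '2003'), ('', ''), ('', ''), ('', '')]
print(populate_make(group))   # four copies of ('Nissan', '2003')

# In Spark this plugs into:
#   vin_kv.groupByKey().flatMap(lambda kv: populate_make(list(kv[1])))
```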
