Introduction to Spark concepts with a data manipulation example

Adapted from the Scala version in Chapter 2, "Introduction to Data Analysis with Scala and Spark", of Advanced Analytics with Spark (O'Reilly, 2015)

import os

# download the UCI record-linkage data set if it is not already present
if not os.path.exists('documentation'):
    ! curl -o documentation https://archive.ics.uci.edu/ml/machine-learning-databases/00210/documentation
if not os.path.exists('donation.zip'):
    ! curl -o donation.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip
# unpack the outer archive and the per-block archives, then collect the CSVs under linkage/
! unzip -n -q donation.zip
! unzip -n -q 'block_*.zip'
if not os.path.exists('linkage'):
    ! mkdir linkage
! mv block_*.csv linkage
! rm block_*.zip
10 archives were successfully processed.

Info about the data set

Please see the documentation file.

If we are running Spark on Hadoop, we need to transfer files to HDFS

! hadoop fs -mkdir linkage
! hadoop fs -put block_*.csv linkage
# textFile returns an RDD with one element per line of the input files
rdd = sc.textFile('linkage')

Actions trigger execution and return a non-RDD result

rdd.first()
u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'
rdd.take(10)
[u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE']
def is_header(line):
    return "id_1" in line

Transformations return an RDD and are lazy

vals = rdd.filter(lambda x: not is_header(x))
vals
PythonRDD[4] at RDD at PythonRDD.scala:42
vals.count()
5749132

Now it is evaluated

vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']

Each time we run an action on vals, it is recomputed from the original source data

Spark maintains a DAG (the lineage) describing how each RDD was constructed, so a data set can always be rebuilt if it is lost; hence the name resilient distributed datasets. Recomputing from the source on every access is inefficient, however.
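
As a side note (not in the original notebook), Spark can print the lineage it tracks for an RDD; the exact output format varies by Spark version

# print the chain of parent RDDs (the lineage) Spark would use to rebuild vals
print vals.toDebugString()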

# vals is reconstructed again
vals.first()
u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'

Spark allows us to persist RDDs that we will be re-using

vals.cache()
PythonRDD[4] at RDD at PythonRDD.scala:42
# cache() is lazy: the next action computes vals and stores it in memory; later actions read the cached copy
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
# the second call reads the already-cached partitions from memory
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
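
cache() is shorthand for persist() with the default MEMORY_ONLY storage level. If the cached data will not fit in memory, a different storage level can be chosen instead; a minimal sketch, not run here

from pyspark import StorageLevel

# same idea as cache(), but partitions that do not fit in memory are
# spilled to local disk instead of being recomputed on access
vals.unpersist()
vals.persist(StorageLevel.MEMORY_AND_DISK)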

Parse lines and work on them

import numpy as np

def parse(line):
    pieces = line.strip().split(',')
    id1, id2 = map(int, pieces[:2])
    # missing comparison scores are encoded as '?'; represent them as NaN
    scores = [np.nan if p == '?' else float(p) for p in pieces[2:11]]
    matched = pieces[11] == 'TRUE'
    return [id1, id2, scores, matched]
mds = vals.map(lambda x: parse(x))
mds.cache()
PythonRDD[10] at RDD at PythonRDD.scala:42
match_counts = mds.map(lambda x: x[-1]).countByValue()
for cls in match_counts:
    print cls, match_counts[cls]
False 5728201
True 20931
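
As a quick sanity check (a small addition, not in the original), matches make up only a small fraction of the candidate pairs

total = sum(match_counts.values())
# roughly 0.36% of all candidate pairs are labelled as matches
print match_counts[True] / float(total)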

Summary statistics

mds.map(lambda x: x[2][0]).stats()
(count: 5749132, mean: nan, stdev: nan, max: nan, min: nan)
The NaN values used for missing scores propagate through the summary statistics, so we filter them out first

mds.filter(lambda x: np.isfinite(x[2][0])).map(lambda x: x[2][0]).stats()
(count: 5748125, mean: 0.712902470443, stdev: 0.3887583258, max: 1.0, min: 0.0)

This takes too long on a laptop, so we skip it here

# one filter/map/stats job per score column (only the first three columns here)
stats = [mds.filter(lambda x: np.isfinite(x[2][i])).map(lambda x: x[2][i]).stats() for i in range(3)]

for stat in stats:
    print stat
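
A more economical alternative, sketched here and not part of the original notebook, is to aggregate one StatCounter per score column in a single pass over the data instead of launching one job per column; seq_op and comb_op are our own helper names

from pyspark.statcounter import StatCounter

def seq_op(counters, record):
    # fold one record's scores into the per-column counters, skipping missing values
    for counter, score in zip(counters, record[2]):
        if np.isfinite(score):
            counter.merge(score)
    return counters

def comb_op(a, b):
    # merge the per-partition counters column by column
    return [x.mergeStats(y) for x, y in zip(a, b)]

all_stats = mds.aggregate([StatCounter() for _ in range(9)], seq_op, comb_op)
for i, stat in enumerate(all_stats):
    print i, stat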
