- Introduction to Python
- Getting started with Python and the IPython notebook
- Functions are first class objects
- Data science is OSEMN
- Working with text
- Preprocessing text data
- Working with structured data
- Using SQLite3
- Using HDF5
- Using numpy
- Using Pandas
- Computational problems in statistics
- Computer numbers and mathematics
- Algorithmic complexity
- Linear Algebra and Linear Systems
- Linear Algebra and Matrix Decompositions
- Change of Basis
- Optimization and Non-linear Methods
- Practical Optimization Routines
- Finding roots
- Optimization Primer
- Using scipy.optimize
- Gradient descent
- Newton’s method and variants
- Constrained optimization
- Curve fitting
- Finding parameters for ODE models
- Optimization of graph node placement
- Optimization of standard statistical models
- Fitting ODEs with the Levenberg–Marquardt algorithm
- 1D example
- 2D example
- Algorithms for Optimization and Root Finding for Multivariate Problems
- Expectation Maximization (EM) Algorithm
- Monte Carlo Methods
- Resampling methods
- Resampling
- Simulations
- Setting the random seed
- Sampling with and without replacement
- Calculation of Cook’s distance
- Permutation resampling
- Design of simulation experiments
- Example: Simulations to estimate power
- Check with R
- Estimating the CDF
- Estimating the PDF
- Kernel density estimation
- Multivariate kernel density estimation
- Markov Chain Monte Carlo (MCMC)
- Using PyMC2
- Using PyMC3
- Using PyStan
- C Crash Course
- Code Optimization
- Using C code in Python
- Using functions from various compiled languages in Python
- Julia and Python
- Converting Python Code to C for speed
- Optimization bake-off
- Writing Parallel Code
- Massively parallel programming with GPUs
- Writing CUDA in C
- Distributed computing for Big Data
- Hadoop MapReduce on AWS EMR with mrjob
- Spark on a local machine using 4 nodes
- Modules and Packaging
- Tour of the Jupyter (IPython3) notebook
- Polyglot programming
- What you should know and learn more about
- Wrapping R libraries with Rpy
Introduction to Spark concepts with a data manipulation example
Adapted from the Scala version in Chapter 2 (Introduction to Data Analysis with Scala and Spark) of Advanced Analytics with Spark (O'Reilly, 2015)
import os

if not os.path.exists('documentation'):
    ! curl -o documentation https://archive.ics.uci.edu/ml/machine-learning-databases/00210/documentation

if not os.path.exists('donation.zip'):
    ! curl -o donation.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip

! unzip -n -q donation.zip
! unzip -n -q 'block_*.zip'

if not os.path.exists('linkage'):
    ! mkdir linkage
    ! mv block_*.csv linkage
    ! rm block_*.zip
10 archives were successfully processed.
Info about the data set
Please see the documentation file.
If we are running Spark on Hadoop, we need to transfer files to HDFS
! hadoop fs -mkdir linkage
! hadoop fs -put block_*.csv linkage
rdd = sc.textFile('linkage')
Actions trigger execution and return a non-RDD result
rdd.first()
u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'
rdd.take(10)
[u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"', u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE', u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE', u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE', u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE', u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE', u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE', u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE', u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE', u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE']
def is_header(line):
    return "id_1" in line
Transformations return an RDD and are evaluated lazily
vals = rdd.filter(lambda x: not is_header(x))
vals
PythonRDD[4] at RDD at PythonRDD.scala:42
vals.count()
5749132
Now it is evaluated
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE', u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE', u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE', u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE', u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE', u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE', u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE', u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE', u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE', u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
Each time we access vals, it is reconstructed from the original sources
Spark maintains a DAG recording how each RDD was constructed, so that a data set can always be recomputed from its sources - hence resilient distributed datasets. However, recomputing vals on every access is inefficient.
# vals is reconstructed again
vals.first()
u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'
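We can inspect the lineage that Spark has recorded for an RDD with toDebugString; a minimal sketch, assuming the vals RDD defined above:

# Show the DAG of transformations that produced vals
print vals.toDebugString()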
Spark allows us to persist RDDs that we will be re-using
vals.cache()
PythonRDD[4] at RDD at PythonRDD.scala:42
# now vals is no longer reconstructed but retrieved from memory
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE', u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE', u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE', u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE', u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE', u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE', u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE', u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE', u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE', u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE', u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE', u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE', u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE', u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE', u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE', u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE', u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE', u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE', u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
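cache() is shorthand for persisting with the default memory-only storage level. When the data may not fit in memory, persist() accepts an explicit storage level; a sketch using the same vals RDD (not run here):

from pyspark import StorageLevel

# An RDD's storage level cannot be changed once set, so release the cached copy first
vals.unpersist()
# Spill partitions that do not fit in memory to local disk instead of recomputing them
vals.persist(StorageLevel.MEMORY_AND_DISK)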
Parse lines and work on them
import numpy as np

def parse(line):
    pieces = line.strip().split(',')
    id1, id2 = map(int, pieces[:2])
    scores = [np.nan if p == '?' else float(p) for p in pieces[2:11]]
    matched = True if pieces[11] == 'TRUE' else False
    return [id1, id2, scores, matched]
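As a quick sanity check, we can apply parse to the first non-header line (a sketch using the vals RDD above):

# Expect [id1, id2, list of 9 scores (np.nan for '?'), boolean match flag]
parse(vals.first())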
mds = vals.map(lambda x: parse(x))
mds.cache()
PythonRDD[10] at RDD at PythonRDD.scala:42
match_counts = mds.map(lambda x: x[-1]).countByValue()
for cls in match_counts:
    print cls, match_counts[cls]
False 5728201
True 20931
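Only a small fraction of the candidate record pairs are true matches; a quick check of the class balance, using the match_counts dictionary above:

# Fraction of record pairs labelled as matches
match_counts[True] / float(sum(match_counts.values()))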
Summary statistics
mds.map(lambda x: x[2][0]).stats()
(count: 5749132, mean: nan, stdev: nan, max: nan, min: nan)
mds.filter(lambda x: np.isfinite(x[2][0])).map(lambda x: x[2][0]).stats()
(count: 5748125, mean: 0.712902470443, stdev: 0.3887583258, max: 1.0, min: 0.0)
This takes too long on a laptop, so we skip it here
stats = [mds.filter(lambda x: np.isfinite(x[2][i])).map(lambda x: x[2][i]).stats() for i in range(3)]
for stat in stats:
    print stat
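The comprehension above makes a separate pass over the data for each column, which is part of why it is slow. A single-pass alternative using aggregate together with PySpark's StatCounter is sketched below; it covers all nine score columns produced by parse and skips missing values, and the helper names are illustrative (a sketch, not run here):

import numpy as np
from pyspark.statcounter import StatCounter

def merge_value(counters, scores):
    # Fold one row's scores into the per-column counters, skipping missing values
    for counter, value in zip(counters, scores):
        if np.isfinite(value):
            counter.merge(value)
    return counters

def merge_counters(c1, c2):
    # Combine per-partition counters column by column
    return [a.mergeStats(b) for a, b in zip(c1, c2)]

score_stats = mds.map(lambda x: x[2]).aggregate(
    [StatCounter() for _ in range(9)], merge_value, merge_counters)

for stat in score_stats:
    print stat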