Using Hadoop MapReduce
We want to count the number of times each word occurs in a set of books. We will do this in Python. First, start the HDFS and YARN daemons:
start-dfs.sh
start-yarn.sh
This will generate a lot of chatter
15/04/06 12:42:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-namenode-lister.dulci.duhs.duke.edu.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-datanode-lister.dulci.duhs.duke.edu.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/hadoop-cliburn-secondarynamenode-lister.dulci.duhs.duke.edu.out
15/04/06 12:42:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting yarn daemons
starting resourcemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-cliburn-resourcemanager-lister.dulci.duhs.duke.edu.out
localhost: starting nodemanager, logging to /usr/local/Cellar/hadoop/2.6.0/libexec/logs/yarn-cliburn-nodemanager-lister.dulci.duhs.duke.edu.out
hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/cliburn
hdfs dfs -copyFromLocal books /user/cliburn/books
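%%file mapper.py
#!/usr/bin/env python
import sys

def read_input(file):
    # Split each input line into whitespace-separated words
    for line in file:
        yield line.split()

def main(sep='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            # Emit "word<TAB>1"; streaming treats the text before the first tab as the key
            print('%s%s%d' % (word, sep, 1))

if __name__ == '__main__':
    main()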
Writing mapper.py
%%file reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, sep):
    # Each line is "word<sep>count"; split only on the first separator
    for line in file:
        yield line.rstrip().split(sep, 1)

def main(sep='\t'):
    data = read_mapper_output(sys.stdin, sep=sep)
    # groupby only merges adjacent keys, so the input must arrive sorted by key
    for word, group in groupby(data, itemgetter(0)):
        total_count = sum(int(count) for _, count in group)
        print('%s%s%d' % (word, sep, total_count))

if __name__ == '__main__':
    main()
Overwriting reducer.py
! chmod +x mapper.py
! chmod +x reducer.py
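Before submitting the job to the cluster, it can help to check the mapper and reducer logic locally. The following is a minimal sketch, not part of the original notebook, that mimics the data flow of Hadoop streaming in a single process: run the map phase, sort the intermediate records by key, then run the reduce phase. The function names (`map_phase`, `reduce_phase`, `run_local`) and the sample lines are made up for illustration. The sort step matters because `groupby` only groups adjacent records with equal keys.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(lines, sep='\t'):
    """Emit one 'word<sep>1' record per whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield '%s%s%d' % (word, sep, 1)


def reduce_phase(sorted_records, sep='\t'):
    """Sum the counts of consecutive records that share a key."""
    pairs = (rec.rstrip().split(sep, 1) for rec in sorted_records)
    for word, group in groupby(pairs, itemgetter(0)):
        yield word, sum(int(count) for _, count in group)


def run_local(lines):
    """Map, sort (standing in for the shuffle), then reduce, in one process."""
    mapped = sorted(map_phase(lines))
    return dict(reduce_phase(mapped))


# Hypothetical input standing in for lines read from the books
sample = ['the cat sat', 'the dog sat', 'the end']
counts = run_local(sample)
print(counts)
```

If the `sorted` call is removed, words that appear in non-adjacent records are reported more than once, which is the same failure the reducer would show if its input were not sorted by key.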
The native language for Hadoop is Java, but Hadoop streaming allows the mapper, combiner and reducer functions to be written as custom programs in other languages. For the full set of options, see http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html
hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-*streaming*.jar \
    -file ./mapper.py -mapper ./mapper.py \
    -file ./reducer.py -reducer ./reducer.py \
    -input /user/cliburn/books/* -output /user/cliburn/books-output
hdfs dfs -ls /user/cliburn/books-output
hdfs dfs -cat /user/cliburn/books-output/part-00000
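Each line of the output file has the form `word<TAB>count`, as printed by the reducer. As a rough sketch of how such output could be post-processed, the hypothetical helper `parse_counts` below loads lines in that format into a `Counter` and lists the most frequent words. The sample lines are made up and are not real job output.

```python
from collections import Counter


def parse_counts(lines, sep='\t'):
    """Turn 'word<sep>count' lines into a Counter; blank lines are skipped."""
    counts = Counter()
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue
        word, count = line.split(sep, 1)
        counts[word] += int(count)
    return counts


# Made-up lines in the reducer's output format (not real job output)
sample_output = ['and\t12\n', 'the\t30\n', 'whale\t7\n']
counts = parse_counts(sample_output)
print(counts.most_common(2))
```

The same function would accept an open file handle in place of the list, since it only iterates over lines.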
./sbin/stop-yarn.sh
./sbin/stop-dfs.sh
Using MrJob
The Python module mrjob removes a lot of the boilerplate and can also send jobs to Amazon’s implementation of Hadoop, known as Elastic MapReduce (EMR).
! pip install mrjob
Requirement already satisfied (use --upgrade to upgrade): mrjob in /Users/cliburn/anaconda/lib/python2.7/site-packages
Requirement already satisfied (use --upgrade to upgrade): boto>=2.2.0 in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
Requirement already satisfied (use --upgrade to upgrade): PyYAML in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
Requirement already satisfied (use --upgrade to upgrade): simplejson>=2.0.9 in /Users/cliburn/anaconda/lib/python2.7/site-packages (from mrjob)
%%file word_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
Writing word_count.py
Running the job
As a single Python process for debugging
python word_count.py books/*
To run on a Hadoop cluster
python word_count.py -r hadoop books/*
To run on Amazon EMR using files on S3
python word_count.py -r emr s3://<path_to_books>
Java version
For comparison, here is the first Java version from the official tutorial:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
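Note that the Java driver registers `IntSumReducer` as the combiner as well as the reducer. This works because summing counts gives the same result whether it is done in one pass or in several partial passes. The following Python sketch, which is only an in-memory illustration and not Hadoop code, shows the effect: pre-aggregating each mapper's output reduces the number of records sent to the reducer without changing the final counts. The function names and the two input splits are hypothetical.

```python
from collections import Counter


def mapper(line):
    """Emit (word, 1) for each token, as TokenizerMapper does."""
    return [(word, 1) for word in line.split()]


def combine(pairs):
    """Sum counts per word within one mapper's output (local pre-aggregation)."""
    partial = Counter()
    for word, count in pairs:
        partial[word] += count
    return sorted(partial.items())


def reduce_all(pairs):
    """Sum counts per word across the output of all mappers."""
    total = Counter()
    for word, count in pairs:
        total[word] += count
    return dict(total)


# Two hypothetical input splits, each handled by its own mapper
splits = ['to be or not to be', 'to see or not to see']

raw = [mapper(s) for s in splits]
combined = [combine(p) for p in raw]

n_raw = sum(len(p) for p in raw)            # records shuffled without a combiner
n_combined = sum(len(p) for p in combined)  # records shuffled with a combiner

result_raw = reduce_all(pair for p in raw for pair in p)
result_combined = reduce_all(pair for p in combined for pair in p)
print(n_raw, n_combined, result_raw == result_combined)
```

A reducer can only double as a combiner when its operation can be applied to partial results in this way; a reducer that computes a mean directly from raw values, for instance, could not be reused unchanged.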
Hadoop MapReduce Patterns
Most Hadoop workflows are organized as several rounds of map/reduce; this is known as job chaining. Because I/O is so expensive, two optimizations are common: chain folding, where jobs are rearranged to minimize inputs and outputs, and job merging, where unrelated jobs using the same dataset are run together. In mrjob, job chaining is performed via the steps abstraction.
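To make the idea of job chaining concrete, here is a small in-memory sketch in plain Python. It does not use mrjob or Hadoop; the helper `run_round` and all mapper and reducer names are made up for illustration. The first round counts words, and its output becomes the input of a second round that picks the most frequent word.

```python
from collections import defaultdict


def run_round(records, mapper, reducer):
    """Run one map/reduce round in memory: map, group by key, reduce."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    output = []
    for key in sorted(groups):
        output.extend(reducer(key, groups[key]))
    return output


# Round 1: count each word
def count_mapper(line):
    for word in line.split():
        yield word, 1


def count_reducer(word, ones):
    yield word, sum(ones)


# Round 2: send every (word, count) pair to a single key and keep the largest
def max_mapper(pair):
    word, count = pair
    yield 'top', (count, word)


def max_reducer(_, count_word_pairs):
    yield max(count_word_pairs)


lines = ['the cat and the hat', 'the end']  # made-up input
word_counts = run_round(lines, count_mapper, count_reducer)
most_common = run_round(word_counts, max_mapper, max_reducer)
print(word_counts)
print(most_common)
```

On a real cluster the output of the first round would be written to disk and read back by the second, which is why reducing the number of rounds can pay off.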
There are several common patterns that are repeatedly used in Hadoop MapReduce programs:
- Summarization (e.g. sum, mean, counting)
- Filtering (e.g. subsampling, removing poor quality items, top 10 lists)
- Data organization (e.g. converting to hierarchical format, binning)
- Joins
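As a rough illustration of three of these patterns, the sketch below expresses a summarization (mean), a filter, and a join in the map/group/reduce style using plain Python. It is an in-memory toy under stated assumptions, not Hadoop code: the helper `group_by_key` stands in for the shuffle step, and the records are invented.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect values per key, standing in for Hadoop's shuffle step."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


# Made-up records: (book, word_length) pairs and (book, author) pairs
lengths = [('emma', 4), ('emma', 6), ('moby', 5), ('moby', 3), ('moby', 10)]
authors = [('emma', 'Austen'), ('moby', 'Melville')]


# Summarization: emit (sum, count) so partial results can be merged, then divide
def mean_reducer(values):
    total = sum(v for v, _ in values)
    n = sum(c for _, c in values)
    return total / float(n)


mean_pairs = [(book, (length, 1)) for book, length in lengths]
means = {k: mean_reducer(v) for k, v in group_by_key(mean_pairs).items()}

# Filtering: a map-only job that keeps records passing a predicate
long_words = [(book, length) for book, length in lengths if length >= 5]

# Join: tag each record with its source, group by key, pair up in the reducer
tagged = [(b, ('L', n)) for b, n in lengths] + [(b, ('A', a)) for b, a in authors]
joined = []
for book, values in sorted(group_by_key(tagged).items()):
    names = [v for tag, v in values if tag == 'A']
    nums = [v for tag, v in values if tag == 'L']
    joined.extend((book, a, n) for a in names for n in nums)

print(means)
print(long_words)
print(joined)
```

Emitting `(sum, count)` pairs rather than raw values is a design choice that keeps the mean computation compatible with partial aggregation.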
While it is certainly possible, it takes a lot of work to code, debug and optimize any non-trivial program, for example regularized logistic regression on a large data set, using just MapReduce constructs. Hence, we will switch our focus to more modern tools such as Spark and Impala that provide higher-level abstractions and often greater efficiency.