3.4 Spark Development
3.4.1 Developing Spark in different languages
3.4.1.1 Interactive shell examples
Spark programs can be written in three languages: Scala (with SBT), Java (with Maven), and Python.
Table 21 Python/Scala/Java Spark development examples
Example | Python | Scala | Java |
---|---|---|---|
Variable declaration | xx = | val xx = | [TYPE] xx = |
Initialize SparkContext | from pyspark import SparkConf, SparkContext; conf = SparkConf().setMaster('local').setAppName('my app'); sc = SparkContext(conf=conf) | import org.apache.spark.{SparkConf, SparkContext}; import org.apache.spark.SparkContext._; val conf = new SparkConf().setMaster("local").setAppName("my app"); val sc = new SparkContext(conf) | import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; SparkConf conf = new SparkConf().setMaster("local").setAppName("my app"); JavaSparkContext sc = new JavaSparkContext(conf); |
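As a quick sanity check in the pyspark interactive shell, where `sc` is already created at startup, a minimal filtering example (the input path is hypothetical):

```python
# Run inside the pyspark shell; `sc` already exists there.
lines = sc.textFile("README.md")                      # hypothetical input file
print(lines.count())                                  # total number of lines
print(lines.filter(lambda l: "Spark" in l).count())   # lines that mention "Spark"
```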
3.4.1.2 PySpark development
Configuring the development environment
A brief note about Scala
Step 1: Installing Eclipse
Step 2: Installing Spark
Step 3: Installing PyDev
Step 4: Configuring PyDev with a Python interpreter
Step 5: Configuring PyDev with Py4J
Step 6: Configuring PyDev with Spark’s variables
Step 7: Creating your Python-Spark project “CountWords”
Step 8: Executing your Python-Spark application with Eclipse
Step 9: Reading a CSV file directly as a Spark DataFrame for processing SQL
Step 10: Executing your Python-Spark application on a cluster with Hadoop YARN
Step 11: Deploying your Python-Spark application in a Production environment
When configuring a connection to a remote Spark cluster from Windows, the Step 6 environment variables are not required, but winutils must be added (a sketch follows).
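A minimal sketch of the Windows-side setup, assuming winutils.exe has been placed under C:\hadoop\bin and that a standalone master is reachable at spark://remote-host:7077 (the paths and master URL are hypothetical):

```python
import os

# Point Hadoop at the directory containing bin\winutils.exe before the JVM starts.
os.environ["HADOOP_HOME"] = r"C:\hadoop"                 # assumed install location
os.environ["PATH"] += os.pathsep + r"C:\hadoop\bin"

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://remote-host:7077")           # hypothetical remote master
        .setAppName("windows-remote-dev"))
sc = SparkContext(conf=conf)
print(sc.parallelize(range(10)).sum())                   # 45, if the connection works
sc.stop()
```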
Table 22 PySpark core classes
Class | Description |
---|---|
pyspark.SparkContext | Main entry point for Spark functionality. |
pyspark.RDD | A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. |
pyspark.streaming.StreamingContext | Main entry point for Spark Streaming functionality. |
pyspark.streaming.DStream | A Discretized Stream (DStream), the basic abstraction in Spark Streaming. |
pyspark.sql.SQLContext | Main entry point for DataFrame and SQL functionality. |
pyspark.sql.DataFrame | A distributed collection of data grouped into named columns. |
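A small sketch touching several of the classes above; SQLContext is used here because the table lists it, although newer code would typically go through SparkSession:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("core-classes"))
sqlContext = SQLContext(sc)

people = sc.parallelize([Row(name="alice", age=30), Row(name="bob", age=25)])  # pyspark.RDD
df = sqlContext.createDataFrame(people)                                        # pyspark.sql.DataFrame
df.filter(df.age > 26).show()
sc.stop()
```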
3.4.2 RDD
3.4.2.1 Overview
RDD (Resilient Distributed Dataset) is an abstraction of distributed memory. RDDs provide a highly restricted shared-memory model: an RDD is a read-only, partitioned collection of records that can only be created through deterministic transformations (such as map, join and groupBy) on other RDDs. These restrictions, however, make fault tolerance cheap to implement.
As a data structure, an RDD is essentially a read-only, partitioned collection of records.
There are currently two types of RDD, as follows:
Table 23 RDD types
Type | Description | Example |
---|---|---|
Parallelized collections | Take an existing collection in the driver program and distribute it for parallel computation. | data = [1, 2, 3, 4, 5]; distData = sc.parallelize(data) |
Hadoop datasets | Run a function over every record of a file. The file can live on HDFS or on any storage system supported by Hadoop (local files, Amazon S3, Hypertable, HBase, and so on). | distFile = sc.textFile("data.txt") |
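Both creation paths from the table, in runnable form (a sketch; "data.txt" is a hypothetical input file):

```python
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("rdd-create"))

dist_data = sc.parallelize([1, 2, 3, 4, 5])     # parallelized collection
print(dist_data.reduce(lambda a, b: a + b))     # 15

dist_file = sc.textFile("data.txt")             # Hadoop dataset (local file, HDFS, S3, ...)
print(dist_file.count())                        # number of lines in the file
sc.stop()
```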
Definition: spark/core/src/main/scala/org/apache/spark/rdd/RDD.scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  // excerpt: the fields and methods behind the five core properties
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null
  @transient val partitioner: Option[Partitioner] = None
  def compute(split: Partition, context: TaskContext): Iterator[T]
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}
Notes:
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
The five core properties are therefore (illustrated in the sketch below):
- three attributes: the partition list (partitions), the dependency list (dependencies) and the partitioner (partitioner);
- two functions: the compute function (compute) and the preferred-location function (getPreferredLocations).
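Some of these properties can be inspected directly from pyspark; a small sketch (the lineage printed by toDebugString() shows the dependencies on parent RDDs):

```python
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[4]").setAppName("rdd-props"))

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 4).reduceByKey(lambda a, b: a + b)

print(pairs.getNumPartitions())        # partition list: 4 partitions
print(pairs.partitioner)               # partitioner installed by reduceByKey
print(pairs.toDebugString().decode())  # lineage, i.e. dependencies on parent RDDs
sc.stop()
```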
Examples of concrete RDD subclasses include MapPartitionsRDD, CoalescedRDD and ShuffledRDD (HashPartitioner, by contrast, is an implementation of Partitioner rather than an RDD).
3.4.2.2 Two types of operations
RDDs support two types of operations:
- Transformations: create a new dataset from an existing one, e.g. map, filter.
- Actions: return a value to the driver program after running a computation on the dataset, e.g. reduce, collect, count.
Note: transformations are lazy; nothing is computed until an action is invoked, as the example below demonstrates.
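A minimal sketch of this laziness in pyspark: the two transformations only build up a lineage, and the actions at the end trigger the actual computation.

```python
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

nums = sc.parallelize(range(10))                # no computation yet
squares = nums.map(lambda x: x * x)             # transformation: still lazy
evens = squares.filter(lambda x: x % 2 == 0)    # transformation: still lazy

print(evens.count())     # action: runs the whole pipeline, prints 5
print(evens.collect())   # action: [0, 4, 16, 36, 64]
sc.stop()
```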
3.4.2.2.1 Transformations
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
distinct([numTasks])) | Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
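A short sketch exercising a few of these transformations (flatMap, map, reduceByKey, filter) on an in-memory dataset:

```python
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("transform-demo"))

lines = sc.parallelize(["to be or not to be", "that is the question"])
words = lines.flatMap(lambda line: line.split())        # one line -> many words
pairs = words.map(lambda w: (w, 1))                     # word -> (word, 1)
counts = pairs.reduceByKey(lambda a, b: a + b)          # sum the counts per word
long_words = counts.filter(lambda kv: len(kv[0]) > 2)   # keep words longer than 2 characters

print(sorted(long_words.collect()))
# [('not', 1), ('question', 1), ('that', 1), ('the', 1)]
sc.stop()
```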
3.4.2.2.2 Actions
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
count() | Return the number of elements in the dataset. |
first() | Return the first element of the dataset (similar to take(1)). |
take(n) | Return an array with the first n elements of the dataset. |
takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
saveAsSequenceFile(path) (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
saveAsObjectFile(path) (Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile(). |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. |
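A short sketch of several of these actions on small in-memory RDDs:

```python
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("action-demo"))

rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
print(rdd.count())                      # 8
print(rdd.first())                      # 3
print(rdd.take(3))                      # [3, 1, 4]
print(rdd.takeOrdered(3))               # [1, 1, 2]
print(rdd.reduce(lambda a, b: a + b))   # 31

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(dict(pairs.countByKey()))         # {'a': 2, 'b': 1}
sc.stop()
```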
3.4.2.3 RDD persistence
When you persist an RDD, each node stores the partitions it computes in memory and reuses them in other actions on that dataset (or on datasets derived from it). This makes subsequent actions much faster (often more than 10x). Caching is a key tool for building iterative algorithms with Spark. An RDD can be marked for caching with either of the following two methods:
lineLengths.persist()
lineLengths.cache()
To remove it from the cache, use:
lineLengths.unpersist()
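A minimal persistence sketch in pyspark; StorageLevel selects where cached partitions live, while cache() simply uses the default memory-only level ("data.txt" is a hypothetical input file):

```python
from pyspark import SparkConf, SparkContext, StorageLevel

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("persist-demo"))

lines = sc.textFile("data.txt")                       # hypothetical input file
line_lengths = lines.map(lambda line: len(line))

line_lengths.persist(StorageLevel.MEMORY_AND_DISK)    # spill to disk if memory is tight
total = line_lengths.reduce(lambda a, b: a + b)       # first action: computes and caches
longest = line_lengths.max()                          # reuses the cached partitions

line_lengths.unpersist()                              # drop it from the cache when done
sc.stop()
```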
3.4.3 Spark SQL
3.4.4 Spark Streaming
Spark Streaming is an extension of the core Spark API. Unlike Storm, which processes a stream one record at a time, Spark Streaming slices the incoming stream into small batch jobs at a fixed time interval before processing them. Spark's abstraction for a continuous data stream is the DStream (Discretized Stream): a DStream is a sequence of micro-batch RDDs, and an RDD is a distributed dataset that can be processed in parallel, either with arbitrary functions or with transformations over a sliding window of data.
Figure 16 Spark Streaming architecture
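A minimal DStream word-count sketch with pyspark.streaming, assuming text lines arrive on a local TCP socket at port 9999 (for example fed by `nc -lk 9999`); the host, port and batch interval are illustrative choices:

```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("stream-demo"))
ssc = StreamingContext(sc, batchDuration=5)       # slice the stream into 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)   # DStream of text lines
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                   # print a few counts for each batch

ssc.start()
ssc.awaitTermination()
```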
3.4.5 Spark MLlib
MLlib : built-in machine learning library
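A tiny illustration using the DataFrame-based pyspark.ml API (assuming Spark 2.x); the toy training data is made up for the example:

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("mllib-demo").getOrCreate()

# Toy labelled data: (label, feature vector)
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5])),
], ["label", "features"])

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training)
print(model.coefficients)
spark.stop()
```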
3.4.6 Spark GraphX