Guangjing Wang Ph.D. Candidate

Notes for Spark (1)

2017-07-10

Spark is a unified analytics engine for large-scale data processing. It achieves high performance for both batch and streaming data, using a state-of-the-art DAG(Directed Acyclic Graph) scheduler, a query optimizer, and a physical execution engine.

Basics

  1. RDD(Resilient Distributed Dataset),弹性分布式数据集,是分布式内存的一个抽象概念,提供一个高度受限的共享内存模型;

  2. Executor是运行在工作节点上的一个进程,负责运行任务,并为应用程序储存数据;

  3. Cluster Manager是集群资源管理器,可以是spark自带的,也可以是YARN或Mesos等资源管理框架;

  4. 与Map-Reduce相比,Spark采用的Executor有两个优点,一是利用多线程来执行具体的任务,减少任务启动开销,而Map-Reduce采用的进程模型;第二,Executor中有一个是BlockManager存储模块,会将内存和磁盘作为共同的存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个中间模块,而不需要读写到HDFS等文件系统里, 有效减少IO开销;

  5. 一个应用由一个任务控制节点和若干个作业构成,一个作业由多个阶段构成,一个阶段由多个任务组成.当执行一个应用时,任务控制节点会向集群管理器申请资源,启动Executor,向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中.

Workflow

  1. 当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请,任务的分配和监控.SparkContext会向资源管理器注册并运行Executor资源;

  2. 资源管理器会为Executor分配资源,并启动Executor进行,Executor运行情况会随着”心跳”发送到资源管理器;

  3. SparkContext会根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器进行解析,将DAG图分解成多个阶段,并且计算出每个阶段之间的依赖关系,然后把一个个”任务集”提交给底层的任务调度器进行处理,Executor向SparkContext申请任务,任务调度器将任务发送给Executor运行,同时,SparkContext将应用程序代码发放给Executor;

  4. 任务放在Executor运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源.

RDD

RDD(Resilient Distributed Datasets)提供了一种通用的抽象数据架构,用户不必考虑底层数据的分布式特性,只需要将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转化操作形成依赖关系,可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制,磁盘IO和序列化开销;

RDD是个分布式对象集合,一个RDD可以分成多个分区,每个分区是一个数据集片段,一个RDD的不同分区可以被保存到集群中的不同节点上.RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据来创建RDD,或者通过在其他RDD上执行确定的转换操作(map,join, groupBy等)而创建新的RDD.RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。因此,RDD比较适合对于数据集中元素执行相同操作的批处理式应用,而不适合用于需要异步、细粒度状态的应用,比如Web应用系统、增量式的网页爬虫等。

Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算。把一个DAG图划分成多个“阶段”以后,每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行.

创建RDD的两种方式:

  1. 从文件系统中加载数据创建RDD;

  2. 通过并行集合(数组)创建RDD; RDD被创建好后, 后续使用过程一般发生两种操作: transformation, 基于现有的数据集创建一个新的数据集,action,在现有数据集上进行运算,返回计算值.

Programming

from pyspark import SparkContext
sc = SparkContext( 'local', 'test')
logFile = "file:///usr/local/spark/README.md"
logData = sc.textFile(logFile, 2).cache() 

sc.textFile(文件路径+文件名) 是一个transform
sc.saveAsTextFile(要保存文件的路径) 是一个action

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String]
# read a text file from HDFS, a local file system or any hadoop-supported filie system URI, and return it as an RDD of Strings.

本地文件系统,常见文件格式(文本文件,JSON, SequenceFile);分布式文件系统HDFS, RDD, Amazon S3的地址;数据库(如MySQL, Hbase, Hive等); 在使用Spark读取文件时, 如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。

textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
# flatMap()操作相当于把多个集合拍扁得到一个大的集合;map遍历集合中的每个单词,执行lambda表达式,对每个单词构建形成一个tuple.

Function

常见的转换操作:
filter(func):筛选出满足函数func的元素,并返回一个新的数据集;
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集;
flatMap(func):与map()相似,但每个元素都可以映射到0或多个输出结果;
groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
reduceByKey(func):应用于(K,V)键值对数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合.

例如: 键值对RDD: 从文件中加载,lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1))

常用的键值对转换操作:
reduceByKey(), 使用func函数合并具有相同键的值
groupByKey(), 对具有相同键的值进行分组.
keys()把键值对RDD中的key返回形成一个新的RDD, 不去重
values()把键值对RDD中的value返回形成一个新的RDD,不去重
sortByKey(),排序
mapValues(func)对键值对RDD中的每个value都应用一个函数,但是key不会发生变化; 
join() 表示内连接,对于给定的两个输入数据集(K,V1)和(K,V2)只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集;

例如: pairRDD1是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。

行动操作:
count()返回数据集中的元素个数
collect()以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数.
first()返回数据集中的第一个元素
take(n)以数组的形式返回数据集中的前n个元素
reduce(func)通过函数func(输入两个参数返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行

例如: 打印元素,rdd.foreach(print)或者rdd.map(print), 对于用集群模式执行时,rdd.collect().foreach(print), rdd.take(100).foreach(print)

Persistence

每次调用行动操作,都会触发一次从头开始的计算,对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据;可以通过持久化(缓存)避免这种重复计算的开销.persist()方法对一个RDD进行标记持久化,出现persist()语句的地方,并不会马上生成RDD并把它持久化,而是等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化.持久化的RDD将会保存在计算节点的内存中被后面的行动操作重复使用.一般使用cache()方法时,会调用persist(MEMORY_ONLY) [表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容]缓存RDD.LRU全称是Least Recently Used,即最近最久未使用.

共享变量: 广播变量把变量在所有节点的内存之间进行共享,累加器支持在所有不同节点之间进行累加计算(比如计数或者求和).显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。可以通过调用SparkContext.broadcast(v)从一个普通变量v创建一个广播变量,这个广播变量就是对普通变量v的一个包装器.通过调用value方法可以获得这个广播变量的值.

累加器: 一个数值型的累加器可以调用SparkContext.accumulator().运行在集群中的任务,可以使用add方法来把数值累加到累加器上,但是这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点可以使用value方法读取累加器的值.


Content