简介
Spark是一种基于内存的快、通用、可扩展的大数据分析引擎
特点
- 快
Spark与Map Reduce相比,基于内存的运行要快100倍,基于硬盘的运算要快10倍以上。其中间结果可以缓存在内存中,达到复用的目的。 - 易用
Spark支持Java、Python、Scala的API,还支持超过80种高级算法,使用户可以快速的构建不同的应用。而且Spark支持交互式的Python和Scalal的Shell。 - 通用
Spark提供了统一的解决方案,且这些方案可以应用在同一个应用中,如批处理、交互式查询、实时流处理、机器学习和图计算。减少了开发和维护的成本和部署平台的物力成本。 - 兼容性
Spark可以非常方便地和其它的开源产品进行融合。
Spark快于Map Reduce的原因?
网上大部分说是因为Spark是基于内存的,而MapReduce是基于磁盘的。
还有说Spark中具有DAG有向无环图,DAG有向无环图在此过程中减少了shuffle以及落地磁盘的次数。
上述描述确实都对,但是我更相信下面的说法。
以下内容来自
作者:连城
链接:https://www.zhihu.com/question/23079001/answer/23569986
来源:知乎
在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,一个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。
简单起见,暂不考虑带shuffle的多stage情况和流水线优化。这里拿最经典的log处理的例子来具体说明一下(取出所有以ERROR开头的日志行,按空格分隔并取第2列):
val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")
按传统单机immutable FP的观点来看,上述代码运行起来好像是:
- 把HDFS上的日志文件全部拉入内存形成一个巨大的字符串数组,
- Filter一遍再生成一个略小的新的字符串数组,
- 再map一遍又生成另一个字符串数组。
真这么玩儿的话Spark早就不用混了……
如前所述,Spark在运行时动态构造了一个复合Iterator。就上述示例来说,构造出来的Iterator的逻辑概念上大致长这样:
new Iterator[String] {
private var head: String = _
private var headDefined: Boolean = false
def hasNext: Boolean = headDefined || {
do {
try head = readOneLineFromHDFS(...) // (1) read from HDFS
catch {
case _: EOFException => return false
}
} while (!head.startsWith("ERROR")) // (2) filter closure
true
}
def next: String = if (hasNext) {
headDefined = false
head.split(" ")(1) // (3) map closure
} else {
throw new NoSuchElementException("...")
}
}
模块
重要角色
Driver
- 把用户程序转为作业(JOB)
- 跟踪Executor的运行状况
- 为执行器节点调度任务
- UI展示应用运行状况
Executor
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
- 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
运行逻辑
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2pnUlnyD-1573975929640)(assets/1573956622450.png)]
Spark Core
实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。
Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
RDD
RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象。
代表一个不可变的、可分区、里面的元素可并行计算的集合。
官方这样定义RDD
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
在内部,每个RDD是由以下5个主要特征组成的:
1. 每个RDD都有一组分区
2. 每个分区上都是一种计算逻辑
3. 各个RDD之间都有依赖关系
4. 可选,对于K-V类型的数据有一个分区器。(就是说,如果是K-V类型数据,就要告诉它怎么去分区,默认是根据Keyhash)
5. 可选,一个存储每个分区优先计算位置的列表(因为计算是由Driver发给Executor的,
在HDFS上并不是每个节点都有所有的数据,而且节点之间的距离也影响着IO传输,所以需要知道哪个节点运行效率最高)
知识点
- RDD的算子分为两种,一种叫转换算子,一种叫行动算子。只有当行动算子触发时,转换算子才会依次执行。
- RDD里面并没有数据、RDD可以理解为对于真实数据的计算描述
RDD的创建
object RDDLearning {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDLearning").setMaster("local[*]")
val sc = new SparkContext(conf)
// 1. 由集合创建
//有两种方式 parallelize 和 makeRDD
//其中makeRDD底层调用的就是parallelize 为了好记,我常用makeRDD
//def makeRDD[T: ClassTag](
// seq: Seq[T],
// numSlices: Int = defaultParallelism): RDD[T] = withScope {
// parallelize(seq, numSlices)
//}
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7))
val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))
// 2. 由外部文件创建
val fileRdd: RDD[String] = sc.textFile("dir/in/data.txt")
// 3. 由其它RDD创建 其实就是转换后的RDD
val rdd3: RDD[Array[String]] = fileRdd.map(_.split(" "))
sc.stop
}
}
RDD的分区
object RDDLearning {
def main(args: Array[String]): Unit = {
//local[3] 代表三个分区
val conf = new SparkConf().setAppName("RDDLearning").setMaster("local[3]")
val sc = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
// val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),1) 也可以这样指定分区
println("分区数:"+rdd1.partitions.length)//3
rdd1.mapPartitionsWithIndex {
case (num, datas) => {
datas.map((_, "分区号:" + num))
}
}.collect.foreach(println)
//(1,分区号:0)
//(2,分区号:0)
//(3,分区号:1)
//(4,分区号:1)
//(5,分区号:2)
//(6,分区号:2)
//(7,分区号:2)
sc.stop
}
}
分区逻辑
- 由集合生成的RDD分区逻辑
// 1. 我们可以跟踪源码
def parallelize[T: ClassTag](
seq: Seq[T],
// 2. 如果没有指定分区数,就会使用默认的并行度
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
// 3. defaultParallelism 默认并行的的逻辑
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism //这个
//org.apache.spark.scheduler.TaskSchedulerImpl#defaultParallelism
//TaskSchedulerImpl是个特性 trait 所以找它的实现类
}
// 4. 这里随便找了一个LocalSchedulerBackend
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
//结论: 如果设置中设置了spark.default.parallelism 就选用它的值
//没有的话就算totalCores 总共的内核数 这也就是为什么Local[3] 就三个分区的原因
- 由外部文件生成的RDD
//1. 依旧看源码
def textFile(
path: String,
//2. 这里可以指定最小的分区数(之所以最小,是因为最后还是根据文件大小分区)
//如果没指定
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions)
.map(pair => pair._2.toString).setName(path)
}
//3. 没指定就和普通集合生成的并行度与2取最小值
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
//4. 再来看具体的分区运输规则
hadoopFile(path,...,minPartitions) //分区数给了hadoopFile
//5. hadoopFile里面
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
//6. hadoopRDD里面有这个方法
override def getPartitions: Array[Partition] = {
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
}
//7. 这里看一下org.apache.hadoop.mapred.FileInputFormat#getSplits 别的逻辑都差不多
//因为篇幅,我只取了分区逻辑部分,不是所有代码
public InputSplit[] getSplits(JobConf job, int numSplits(这是minPartitions)){
long totalSize = 0;
for (FileStatus file: files) {
totalSize += file.getLen(); //获得所有文件的大小,因为路径可以是个目录
}
//常理走下来 goalSize=numSplits 这里假设我们设置是local[*]
//goalSize=所有文件的大小/2
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//minSize=1
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
for (FileStatus file: files) {
long length = file.getLen();
if (isSplitable(fs, path)) {
//本地跑的所以是32M
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize=2, minSize=1, blockSize=32M);
//逻辑Math.max(minSize, Math.min(goalSize, blockSize));
//splitSize=超过块大小的文件就是块大小,没超过的就是文件总的大小
//未切分文件大小
long bytesRemaining = length;
//以下的切片逻辑差不多就是如果小文件就一片 因为bytesRemaining=splitSize
//如果是大文件的话 就是以一个块大小为一片,如果剩余文件大小/块大小<=1.1 就直接一片
while (((double) bytesRemaining)/splitSize > 1.1) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
}
重分区
object RDDLearning {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDLearning").setMaster("local[3]")
val sc = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
//重分区也有两个repartition 和coalesce
//def repartition(numPartitions: Int) RDD[T] = {
// coalesce(numPartitions, shuffle = true)
//}
rdd1.repartition(4).mapPartitionsWithIndex {
case (num, datas) => {
datas.map((_, "分区号:" + num))
}
}.collect.foreach(println)
//很奇怪,但是确实是hash分区的
//(2,分区号:0)
//(4,分区号:0)
//(6,分区号:0)
//(7,分区号:1)
//(1,分区号:3)
//(3,分区号:3)
//(5,分区号:3)
sc.stop
}
}
序列化
由于Spark的分布式的分析引擎,数据的初始化在Driver端,但是实际运行程序是在Executor端,这就涉及到了跨进程通信,是需要序列化的。
class SearchFunctions(val query: String) extends Serializable {
//第一个方法是判断输入的字符串是否存在query 存在返回true,不存在返回false
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 需要序列化:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
rdd.filter(isMatch)
}
def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 需要序列化:"query"表示"this.query",因此我们要传递整个"this"
rdd.filter(x=>x.contains(query))
}
def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
// 不需要序列化:只把我们需要的字段拿出来放入局部变量中
val query_ = this.query
rdd.filter(x => x.contains(query_))
}
}
object SearchFunctions {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SearchFunctions")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
val sf = new SearchFunctions("hello")
val unit: RDD[String] = sf.getMatchesNoReference(rdd)
unit.foreach(println)
}
}
缓存
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist() 会把数据以序列化的形式缓存在JVM 的堆空间中。
存储级别
NONE 不缓存
DISK_ONLY 只存磁盘
DISK_ONLY_2 只存磁盘,存两份
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER 只存内存中并且序列化存储
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP 存堆外内存中
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
object RDD_Cache {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("RDD_Cache").setMaster("local[2]")
val sc = new SparkContext(conf)
val timeRdd = sc.makeRDD(List("zzy"))
val mapRDD1 = timeRdd.map(x => (x, System.currentTimeMillis()))
println(mapRDD1.collect.toBuffer)
println(mapRDD1.collect.toBuffer)
println(mapRDD1.collect.toBuffer) //上面三个时间都不同
mapRDD1.cache() //mapRDD1.persist()
println(mapRDD1.collect.toBuffer)
println(mapRDD1.collect.toBuffer)
println(mapRDD1.collect.toBuffer) //上面三个时间都相同 缓存了
}
}
分区器
- 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
- 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
object SubjectDemo3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SubjectDemo").setMaster("local")
val sc = new SparkContext(conf)
// 1.对数据进行切分
val tuples: RDD[(String, Int)] =
sc.textFile("C:\Users\Administrator\Desktop\subjectaccess\access.txt").map(line => {
val fields: Array[String] = line.split("\t")
//取出url
val url = fields(1)
(url, 1)
})
//将相同url进行聚合,得到了各个学科的访问量
val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _).cache()
//从url中获取学科的字段 数据组成式 学科, url 统计数量
val subjectAndUC = sumed.map(tup => {
val url = tup._1 //用户url
val count = tup._2 // 统计的访问数量
val subject = new URL(url).getHost //学科
(subject, (url, count))
})
//将所有学科取出来
val subjects: Array[String] = subjectAndUC.keys.distinct.collect
//创建自定义分区器对象
val partitioner: SubjectPartitioner = new SubjectPartitioner(subjects)
//分区
val partitioned: RDD[(String, (String, Int))] = subjectAndUC.partitionBy(partitioner)
//取top3
val rs = partitioned.mapPartitions(it => {
val list = it.toList
val sorted = list.sortBy(_._2._2).reverse
val top3: List[(String, (String, Int))] = sorted.take(3)
//因为方法的返回值需要一个iterator
top3.iterator
})
//存储数据
rs.saveAsTextFile("out2")
sc.stop()
}
}
/**
* 自定义分区器需要继承Partitioner并实现对应方法
*/
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
//创建一个map集合用来存到分区号和学科
val subject = new mutable.HashMap[String, Int]()
//定义一个计数器,用来生成分区好
var i = 0
for (s <- subjects) {
//存学科和分区
subject += (s -> i)
i += 1 //分区自增
} // 获取分区数
override def numPartitions: Int = subjects.size
//获取分区号(如果传入的key不存在,默认将数据存储到0分区)
override def getPartition(key: Any): Int = subject.getOrElse(key.toString, 0)
}
累加器
解决的问题:分布式只写共享变量
object AccumulatorDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("AccumulatorDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
val dataRDD= sc.makeRDD(1 to 10)
var sum = 0 //由于sum只存在在driver端,而foreach(内得内容要发送到Executor上执行,并没有sum,所以累计失败)
dataRDD.foreach(x=>sum=sum+x)
println(sum) // 0
//1. 创建一个累加器
val acc=new LongAccumulator
//2. 注册
sc.register(acc)
//3. 使用
dataRDD.foreach(x=>acc.add(x))
println(acc.value) //55
sc.stop()
}
}
自定义累加器
class MyAccumulator extends AccumulatorV2[Int,Int]{
//创建一个输出值的变量
private var sum:Int = _
//必须重写如下方法:
//检测方法是否为空
override def isZero: Boolean = sum == 0
//拷贝一个新的累加器
override def copy(): AccumulatorV2[Int, Int] = {
//需要创建当前自定累加器对象
val myaccumulator = new MyAccumulator()
//需要将当前数据拷贝到新的累加器数据里面
//也就是说将原有累加器中的数据拷贝到新的累加器数据中
//ps:个人理解应该是为了数据的更新迭代
myaccumulator.sum = this.sum
myaccumulator
}
//重置一个累加器 将累加器中的数据清零
override def reset(): Unit = sum = 0
//每一个分区中用于添加数据的方法(分区中的数据计算)
override def add(v: Int): Unit = {
//v 即 分区中的数据
//当累加器中有数据的时候需要计算累加器中的数据
sum += v
}
//合并每一个分区的输出(将分区中的数进行汇总)
override def merge(other: AccumulatorV2[Int, Int]): Unit = {
//将每个分区中的数据进行汇总
sum += other.value
}
//输出值(最终累加的值)
override def value: Int = sum
}
object MyAccumulator{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyAccumulator").setMaster("local[*]")
//2.创建SparkContext 提交SparkApp的入口
val sc = new SparkContext(conf)
val numbers = sc .parallelize(List(1,2,3,4,5,6),2)
val accumulator = new MyAccumulator()
//需要注册
sc.register(accumulator,"acc")
//切记不要使用Transformation算子 会出现无法更新数据的情况
//应该使用Action算子
//若使用了Map会得不到结果
numbers.foreach(x => accumulator.add(x))
println(accumulator.value)
}
}
广播变量
解决的问题:分布式只读共享变量
object BroadcastDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("BroadcastDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
//list是在driver端创建也相当于是本地变量
val list = List("hello java")
val lines = sc.textFile("dir/file")
//算子部分是在Excecutor端执行
val filterStr = lines.filter(list.contains(_))
filterStr.foreach(println)
}
}
上面的代码有个问题,就是Driver会把list以task的方式发送到executor上执行,可以粗略的认为一个分区就算一个task,那么list就可能会在一个executor上重复多份。如果list稍微大点可能就会造成内存溢出。
object BroadcastDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("BroadcastDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
//list是在driver端创建也相当于是本地变量
val list = List("hello java")
//封装广播变量
val broadcast = sc.broadcast(list)
//算子部分是在Excecutor端执行
val lines = sc.textFile("dir/file")
//使用广播变量进行数据处理 value可以获取广播变量的值
val filterStr = lines.filter(broadcast.value.contains(_))
filterStr.foreach(println)
}
}