当前位置: 首页>后端>正文

大数据学习之(十五)Spark-RDD,DataFrame,DataSet

[TOC]
本文带你快速入门、搭建spark,了解 RDD,DataFrame,DataSet

spark简介

spark位于大数据生态中的位置

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第1张
spark生态架构_1.png

一、结合hadoop对比理解spark

回顾hadoop 下知识:

Hadoop Mapreduce程序,一个Mapreduce程序就是一个Job,而一个Job里面可以有一个或多个Task,Task又可以区分为Map Task和Reduce Task.

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第2张
hadoop执行过程图示_1.png
  1. 应用 application: client jvm
  2. 步骤 : map-stage,reduce-stage
    每个stage里会有一些并行的任务mapTask,reduceTask

比例: 1个app:1个job

                    1个job: 1-2 stage

                            1个stage:   N个task (map、reduce )

       **注意:  多个job 可以组成作业链(JobControl)**https://blog.csdn.net/Phalaris_arundinacea/article/details/115052372。

MapTask 执行步骤:

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第3张
mapTask3步骤.png

缺点:

  1. 冷启动:复杂业务需要多次动态启动/关闭MR的jvm 。
  2. maptask 输出需要写磁盘,而且不能多被复用。业务复杂可能多次执行。
  3. 需要自己实现 那些迭代逻辑,而spark rdd自带了很多函数如 flatMap等对比如下:
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第4张
word_count_scala.png
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第5张
word_count_hadoop.png

期望:

  1. 1个 app -》 N 个 Job ,复用资源
  2. 1个job -》N个stage : 避免多次启动jvm
  3. 1个stage -》N个Task (与hadoop一样)

一个stage的1个Task:可以在一台机器完成的计算。

reduceByKey 需要从多个机器拿取数据聚合计算,所以会新开一个stage。

所以stage的边界 是 shuffle,spark-webUI图展示如下:

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第6张
spark_stage_1.png

spark性能优化-资源复用

两个job在一个应用执行时执行复用资源范例如下代码执行两个RDD 的job0,job1 :

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第7张
spark_jobs_2.png
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第8张
spark_jobs_1.png

job0执行过后,job1紧接着复用了job0的中间资源,skip 了stage2,如下2个图:

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第9张
spark_jobs_2.1.png
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第10张
spark_jobs_2.2.png

二、spark集群总结

spark 可以依托于hadoop生态如yarn,hdfs,也可以独立

spark 有自己的资源层(master/worker),也可以有自己的临时的dfs。

spark具备启动计算程序后,为自己维护一个临时的分布式存储系统的能力,程序执行完后,临时分布式存储系统就销毁了。

spark onYarn/standalone 集群中部署模式分两种:

根据Driver(SparkContext)运行在client里还是集群里来区分

  1. 客户端模式(clientJvm)
  2. 集群模式(跟随ApplicationMaster进程一起)

ps: 客户端模式优点:分布式计算 结果汇总到driver里后,客户端模式可以直观的看到结果,如果是集群模式则看不到。

资源申请:

hadoop:

  1. 每到一步时再,动态申请资源。
  2. 任务是jvm级别

因为hadoop计算逻辑是已知的。

而spark 程序的计算逻辑是未知的,job,stage数量不一,必须运行时确定(DAG:任务切割,每一个job都会进行DAG任务切割)。

所以为了避免运行时资源申请失败,spark 采用预先抢占资源.

spark集群一启动应用就抢占每个节点的cpu、内存(Executor),此时rdd那些都还没创建/执行,未来任务是以线程级别在每个节点执行。

spark driver 和 excutor最好再一个局域网,提交通讯效率。

三、单机搭建

(一)本地开发环境搭建

win+ idea 需要按如下步骤部署,否则idea 运行Hadoop,spark会报错如下:

spark Failed to locate the winutils binary in the hadoop binary path

  1. 在win的系统中部署我们的hadoop:

    D:\codes\hadoop-2.6.5\

  2. hadoop-install\soft\bin 文件覆盖到D:\codes\hadoop-2.6.5\bin目录下
    还要将hadoop.dll 复制到 c:\windwos\system32\

  3. 设置环境变量:HADOOP_HOME C:\usr\hadoop-2.6.5\hadoop-2.6.5

(二)单机搭建

此时 spark使用自己的资源层master/worker,不需要其他的资源层如yarn。

#上传spark 到node1解压
tar xf spark-2.3.4-bin-hadoop2.6.tgz

执行cli进行spark交互式编程学习

ps:

  1. 可以用tab补全
  2. Spark context available as 'sc' (master = local[*], //单进程多线程运行
cd spark-2.3.4-bin-hadoop2.6/bin/
./spark-shell 

尝试用spark分析本机文件 /tmp/data.txt:

[root@node1 tmp]# cat data.txt 
hello hadoop
hello spark
hello bigdata

scala> sc.textFile("/tmp/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey((_+_)).foreach(println)

输出:

(hello,3)
(bigdata,1)
(spark,1)
(hadoop,1)

ps: 在spark-shell终端进行scala编程,也会先编译字节码然后执行。

三、standalone集群搭建

https://spark.apache.org/docs/latest/spark-standalone.html#installing-spark-standalone-to-a-cluster

基于已有的hadoop hdfs环境搭建 spark-standalone 集群

前置条件

#1.hadoop基础设施:
  jdk:1.8.xxx
  节点的配置
  部署hadoop:hdfs  zookeeper 
#2.免密
  node1 ssh 免密到node1,2,3,4
  

环境变量

#node1,2,3,4
vi /etc/profile
export SPARK_HOME=/opt/bigdata/spark-2.3.4-bin-hadoop2.6
source /etc/profile

修改配置

#1. node1上操作,添加从节点信息(node1主,其他从)
cp slaves.template slaves
vi slaves
node2
node3
node4

#2. node1 添加hadoop dfs有关的配置信息
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
export HADOOP_CONF_DIR=/opt/bigdata/hadoop-2.6.5/etc/hadoop
export SPARK_MASTER_HOST=node1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=2g


#3. 分发到其他node2,3,4
scp -r  spark-2.3.4-bin-hadoop2.6/ node2:`pwd`
scp -r  spark-2.3.4-bin-hadoop2.6/ node3:`pwd`
scp -r  spark-2.3.4-bin-hadoop2.6/ node4:`pwd`

启动-Hadoop集群

#node2,3,4
zkServer.sh start
#检查zk集群状态
zkServer.sh status

#node1
start-dfs.sh

验证:访问Hdfs web UI控制台,主/备 node1/node2:

#node1
http://192.168.56.103:50070/dfshealth.html#tab-overview
#node2
http://192.168.56.104:50070/dfshealth.html#tab-overview

启动-spark集群

#node1执行 start-all.sh ,启动资源管理层master、workder
[root@node1 bigdata]# cd spark-2.3.4-bin-hadoop2.6/sbin/
[root@node1 sbin]# ./start-all.sh
#此时在node1有master进程,node2,3,4各有一个worker进程

访问node1的8080webUI验证:http://192.168.56.103:8080/

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第11张
spark_webUI_suc.png

hdfs准备

[root@node1 sbin]# cd /tmp/
[root@node1 tmp]# hdfs dfs -mkdir /sparktest
[root@node1 tmp]# hdfs dfs -put data.txt  /sparktest

cli spark-shell连接集群

#这个master信息在 sparkWebUI 上有。
#这样在cli里就不是本地单机执行了(local)
./spark-shell --master spark://node1:7077

此时webUI能看到cli这个应用如下:

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第12张
spark_webUI_cli_1png.png

点进去能看到每个worker默认都尽量榨干cpu,内存。

(可选)Master高可用

#node1 填入zk信息,复制到node2,3,4
vi spark-defaults.conf

#修改master信息,node1填node1,node2填node2
#跟之前hadoop HA 填clusterId不同
vi spark-env.sh
export SPARK_MASTER_HOST=node1

(可选)历史记录

开启计算层端写日志的配置

#node1
vi spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/spark_log
spark.history.fs.logDirectory  hdfs://mycluster/spark_log
#从node1分发配置到node2,3,4
scp spark-defaults.conf node2:`pwd`
scp spark-defaults.conf node3:`pwd`
scp spark-defaults.conf node4:`pwd`

重启spark

[root@node1 sbin]# ./stop-all.sh
[root@node1 sbin]# ./start-all.sh

启动-日志服务WEBUI

#node4,或者其他
./start-history-server.sh

访问URL 验证: http://node4:18080/

(可选)spark on yarn

跟 hadoop资源框架 yarn集成,统一调度。

注意

最好把spark 框架相关jar包上传导hdfs并配好路径,否则每次执行都会上传一次框架jar包,

消耗时间、磁盘空间。

提交yarn集群

直接拷贝 yarn-site.xml 文件到 $SPARK_HOME
上述任务在启动的时候,有可能会出现异常, 修改hadoop集群的yarn-site.xml文件, 增加如下配置

<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
</property>
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true,实际开发中设置成 true,学习阶段设置成 false -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true,实际开发中设置成 true,学习阶段设置成 false -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
</property>
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 512M \
--total-executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.12-3.1.2.jar \
10

四、RDD

RDD概念

rdd已经过时,一般用dataset,sparkSQL

用java的思想来理解 就是分布式的List<Object> ,可以用stream api一直点下去,如下demo:

demo-wordCount

大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第13张
spark_rdd_code.png

提交方式: spark-shell

用原始的RDD api分析 hdfs的文件,基于 spark-shell 命令行窗口来提交,可以交互式编程,一句一句输入,下面的demo每个.doSomeThing()输入完都可以 回车提交。

#hdfs搭建了HA,所以要写集群id:mycluster,不能写具体的node1/node2
scala> sc.textFile("hdfs://mycluster/sparktest/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey((_+_)).foreach(println)
#发现执行后没结果输出,因为结果输出在集群里,没在cli。
#加一个collect()算子,把结果回收到cli进程,即可显示结果
#而且速度很快,因为刚才的计算结果缓存过了                       
scala> sc.textFile("hdfs://mycluster/sparktest/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey((_+_)).collect.foreach(println)
#结果就出来了
(hello,3)                                                                
(bigdata,1)
(spark,1)
(hadoop,1)

demo-sparkPI

提交方式:spark-submit

尝试用spark-submit提交spark安装包自带的的jar包,位于example里

可以编写比较通用的提交脚本

代码见https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala

#node1
cd /opt/bigdata/spark-2.3.4-bin-hadoop2.6/examples/jars
#指定master,class ,jar包,还有给应用传入的参数,即可
#注意集群cpu,内存是否有空闲,可能需要先停掉其他的cli
../../bin/spark-submit  \
--master spark://node1:7077   \
--class org.apache.spark.examples.SparkPi \
./spark-examples_2.11-2.3.4.jar    10;

编写通用脚本

   #node1 
   #本脚本提交的任务,申请6核心,每个executor 1核,所以一共6个executor,且每个内存512m
   vi submit.sh
   
    class=
    jar=
    $SPARK_HOME/bin/spark-submit   \
    --master spark://node1:7077 \
    --total-executor-cores 6 \
    --executor-cores 1 \
    --executor-memory 512m \
    --class $class  \
    $jar \
    1000

#提交
. submit.sh   'org.apache.spark.examples.SparkPi'  "$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.4.jar"

调度参数

--deploy-mode cluster 
1. driver不会再 cli进程执行,会在集群里执行,此时 sparkWebUI会出现Running Drivers,Completed Drivers。
2. print信息也不会打印在 spark-shell,当然生产一般也不用print。
3. 此时 spark shell退出,不会影响driver,它会继续运行。

--driver-memory 默认 1024m
注意这个指标的配置:
1. driver 有能力拉取计算结果回自己的jvm。
2. executor jvm中会存储中间计算过程的数据的元数据。
例如:小文件很多,分析加载时,元素据太多导致driver jvm溢出。

--executor-memory 默认 1024m

--executor-cores NUM 
1.yarn: 默认1核心。
2.standalone: 所在节点所有核心都分配给该executor。

--num-executors NUM


#其他参数如
只有cluster deploy mode可用的参数:
只有 spark standalone or mesos with cluster deploy mode only:
Spark standalone and mesos only:
--total-executor-cores 
#此时每个executor平均申请核心数目

Spark standalone and yarn only:

Yarn only:



Expamples

./bin/run-example SparkPi

DAG有向无环图

https://data-flair.training/blogs/dag-in-apache-spark/

spark DAG 和hive DAG区别

五、DataFrame 、Dataset、SparkSQL

DataFrame

可以理解为一个表,便于执行简单的sql 统计,即dataFrame = rdd + struct ;

如下示例读取json文件时,

  • 读取json文件(红线1,红线2)自带结构信息(类似sql DDL create table),于是返回了一个dataFrame。
    如果不是json文件,如csv,要自己编码解析,定义结构。
  • df往session里注册成了一个表 ,名为 ooxx,并执行 sql(红线3)查询name属性。
  • 返回表格数据
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第14张
spark_df_json.png

Dataset

  • 具有类型的RDD,如下图为携带tuple类型的rdd(红线4),类比 java List<BeanClass>。
  • 具有类似sql功能的api操作如:join。
  • 查询优化器
  • 作为spark sql的的底层支撑。
  • 编码器encoder-让bean在内存里不是对象而是字节数组,堆内外优化等(红线3)。
  • dataFrame 其实是 DataSet<Row> ,即dataset的子集。
大数据学习之(十五)Spark-RDD,DataFrame,DataSet,第15张
spark_data_set_1.png

Dataset转成dataframe执行sql

val ds01: Dataset[String] = session.read.textFile("data/person.txt")
#dataset 
val person: Dataset[(String, Int)] = ds01.map(
line => {
val strs: Array[String] = line.split(" ")
(strs(0), strs(1).toInt)
}
)(Encoders.tuple(Encoders.STRING, Encoders.scalaInt))
#转成dataframe ,定义列,注册表
val cperson: DataFrame = person.toDF("name","age")
cperson.createTempView("persion")
#执行sql
session.sql("select name from persion").show()
#结果如下
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
+--------+

SparkSQL

静态类型与运行时类型安全

从SQL的最小约束到Dataset的最严格约束,把静态类型和运行时安全想像成一个图谱。比如,如果你用的是Spark SQL的查询语句,要直到运行时你才会发现有语法错误(这样做代价很大),而如果你用的是DataFrame和Dataset,你在编译时就可以捕获错误(这样就节省了开发者的时间和整体代价)。也就是说,当你在DataFrame中调用了API之外的函数时,编译器就可以发现这个错。不过,如果你使用了一个不存在的字段名字,那就要到运行时才能发现错误了。

图谱的另一端是最严格的Dataset。因为Dataset API都是用lambda函数和JVM类型对象表示的,所有不匹配的类型参数都可以在编译时发现。而且在使用Dataset时,你的分析错误也会在编译时被发现,这样就节省了开发者的时间和代价

六集群容量评估

源数据大小,考虑副本占用,压缩占用,shuffle占用,结果数据占用,还要流出至少1/5空余。


https://www.xamrdz.com/backend/3j41931819.html

相关文章: