当前位置: 首页>编程语言>正文

spark中怎么把Array中的一部分取出来 spark string

Scala版本WordCount

  1. 项目目录结构如下:



spark中怎么把Array中的一部分取出来 spark string,spark中怎么把Array中的一部分取出来 spark string_spark将rdd转为string,第1张


  1. 在项目目录data下创建要统计词频的文件words.txt


spark中怎么把Array中的一部分取出来 spark string,spark中怎么把Array中的一部分取出来 spark string_数据_02,第2张


  1. 新建Scala版的WordCount程序
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object WordCount {  def main(args: Array[String]): Unit = {    /**     * SparkConf创建Spark应用程序的配置     *     * setAppName:用来设置应用程序的名称,如果些程序运行在Spark的集群上,可以在资源管理器的UI上看到,如standalone,yarn,mesos     *     * setMaster:用来设置应用程序的本地运行模式     *  1) local: Spark应用程序本地运行模式,如果local后不不指定参数,就默认使用一个core来运行Spark应用程序     *     local[n]: 指定使用n个cores来运行Spark应用程序     *     local[*]: 使用本地机器所有的cores来运行Spark应用程序     *     这里的core是对应计算机线程,假如你的计算机是4核8线程,那么你的计算机总共可以提供8个cores来运行Spark应用程序     * 如果不设置setMaster, 将来程序要打包发布到集群上了运行,集群主要有standalone, yarn, mesos     *  2) standalone: Spark自带的资源管理器,主节点叫Master, 从节点叫Worker     *  3) yarn: 基于Hadoop yarn资源管理器运行Spark程序,主节点叫ResourceManager, 从节点叫NodeManager     *  4) mesos: 一般国外用的比较多,这里就不详细讲解     *     */    val conf = new SparkConf().setMaster("local").setAppName("WordCount")    /**     * SparkContext: 它是Spark应用程序通往集群的唯一通道, 创建了SparkContext     * SparkContext底层默认会创建两个对象DAGScheduler和TaskScheduler     * DAGScheduler: 依赖Spark应用程序中的RDD的宽窄依赖切割Job, 划分Stage, 每个Stage又会封装成TaskSet,提交给TaskScheduler     *     * TaskScheduler: 负责从TaskSet中遍历一个个的并行的Task, 发送给工作节点中的Executor中的ThreadPool执行,监控Task执行,回收结果。     */    val sc = new SparkContext(conf)    /**     * 设置Spark应用程序的运行日志级别     */    sc.setLogLevel("WARN")    /**     * 通过调用Hadoop底层的方法来切分文件,创建RDD,     */    val lines: RDD[String] = sc.textFile("./data/words.txt")    /**     * flatMap: 功能是将RDD中的partition中的一行行的数据依照空格切分压平     * 它是一个lazy的Transformation算子,懒执行,需要由action算子触发执行     * flatMap算子是1对多,进去一行行的文本,出来一个个的单词     * flatMap内部的逻辑是在Executor中执行     */    val words: RDD[String] = lines.flatMap(line => line.split(" "))    /**     * map: 功能是将上步生成的RDD的每个元素创建成K,V格式的RDD     * map也是懒执行算子,需要有action类算子触发执行     * map的操作是1对1,对于每一个元素,调用一次方法     * 对每一个单词计数1,形成单词与1的键值对,如(金庸,1),(金庸,1),(天龙八步,1)...     */    val pairs: RDD[(String, Int)] = words.map(word => (word, 1))    /**     * reduceByKey: 对上步RDD的partition中的K,V格式的RDD先分组再聚合, 是懒执行算子     *     * reduceByKey是一个shuffle类的算子,如果数据分布在多台节点上,会进行局部分组聚合,再全局聚合, 对于跨节点的数据会落地磁盘小文件     * 基处理也要经过mapper和reducer两个阶段     *     * 假如数据有两个分区,分别在node1节点和node2节点上,那么:     *     * ===================>mapper=========================>shuffle=================>reducer=========================     *   node1节点:     *      (金庸,1)     *      (金庸,1)     *      (金庸,1)     *      (金庸,1)        局部分组聚合=> (金庸,4)          => 写入磁盘落地                     (金庸,6)     *      (天龙八部,1)                  (天龙八部,4)     *      (天龙八部,1)     *      (天龙八部,1)     *      (天龙八部,1)                                                         => 全局聚合     *  node2节点:                                                                           (天龙八部,6)     *      (金庸,1)                                                                         (段誉,2)     *      (金庸,1)     *      (天龙八部,1)    局部分组聚合=> (金庸,2)          => 写入磁盘落地     *      (天龙八部,1)                 (天龙八部,2)     *      (段誉,1)                     (段誉,2)     *      (段誉,1)     *     */    val reduced: RDD[(String, Int)] = pairs.reduceByKey((v1, v2) => v1 + v2)    /**     * sortBy: 用来对RDD中的数据进行排序,是懒执行算子,它需要两个参数     *  1)参数1是一个f, 传递排序的规则     *  2) 参数2是排序的方式,默认是true, 表示升序, 如果需要降序,需要设置成false     */    val sorted: RDD[(String, Int)] = reduced.sortBy(tup => tup._2, ascending = false)    /**     * foreach: 是一个action算子,触发执行     *     * Spark应用程序中有几个action算子就有几个job     */    sorted.foreach(tup=>println(tup))    //退出,释放资源    sc.stop()  }}

输出结果如下:

(金庸,25)(天龙八部,13)(段誉,10)(神雕侠女,6)(射雕英雄传,6)(郭靖,5)(黄蓉,5)(杨过,4)(乔峰,4)(小龙女,2)(段正淳,2)(阿朱,2)(阿紫,2)(木婉清,1)(王语嫣,1)(黄药师,1)(晓蕾,1)(虚竹,1)(段王爷,1)(马夫人,1)(钟灵,1)(欧阳锋,1)(老顽童,1)(洪七公,1)(梅超风,1)(尹士平,1)(段延庆,1)

https://www.xamrdz.com/lan/58t1963788.html

相关文章: