Scala版本WordCount
- 项目目录结构如下:
- 在项目目录data下创建要统计词频的文件words.txt
- 新建Scala版的WordCount程序
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object WordCount { def main(args: Array[String]): Unit = { /** * SparkConf创建Spark应用程序的配置 * * setAppName:用来设置应用程序的名称,如果些程序运行在Spark的集群上,可以在资源管理器的UI上看到,如standalone,yarn,mesos * * setMaster:用来设置应用程序的本地运行模式 * 1) local: Spark应用程序本地运行模式,如果local后不不指定参数,就默认使用一个core来运行Spark应用程序 * local[n]: 指定使用n个cores来运行Spark应用程序 * local[*]: 使用本地机器所有的cores来运行Spark应用程序 * 这里的core是对应计算机线程,假如你的计算机是4核8线程,那么你的计算机总共可以提供8个cores来运行Spark应用程序 * 如果不设置setMaster, 将来程序要打包发布到集群上了运行,集群主要有standalone, yarn, mesos * 2) standalone: Spark自带的资源管理器,主节点叫Master, 从节点叫Worker * 3) yarn: 基于Hadoop yarn资源管理器运行Spark程序,主节点叫ResourceManager, 从节点叫NodeManager * 4) mesos: 一般国外用的比较多,这里就不详细讲解 * */ val conf = new SparkConf().setMaster("local").setAppName("WordCount") /** * SparkContext: 它是Spark应用程序通往集群的唯一通道, 创建了SparkContext * SparkContext底层默认会创建两个对象DAGScheduler和TaskScheduler * DAGScheduler: 依赖Spark应用程序中的RDD的宽窄依赖切割Job, 划分Stage, 每个Stage又会封装成TaskSet,提交给TaskScheduler * * TaskScheduler: 负责从TaskSet中遍历一个个的并行的Task, 发送给工作节点中的Executor中的ThreadPool执行,监控Task执行,回收结果。 */ val sc = new SparkContext(conf) /** * 设置Spark应用程序的运行日志级别 */ sc.setLogLevel("WARN") /** * 通过调用Hadoop底层的方法来切分文件,创建RDD, */ val lines: RDD[String] = sc.textFile("./data/words.txt") /** * flatMap: 功能是将RDD中的partition中的一行行的数据依照空格切分压平 * 它是一个lazy的Transformation算子,懒执行,需要由action算子触发执行 * flatMap算子是1对多,进去一行行的文本,出来一个个的单词 * flatMap内部的逻辑是在Executor中执行 */ val words: RDD[String] = lines.flatMap(line => line.split(" ")) /** * map: 功能是将上步生成的RDD的每个元素创建成K,V格式的RDD * map也是懒执行算子,需要有action类算子触发执行 * map的操作是1对1,对于每一个元素,调用一次方法 * 对每一个单词计数1,形成单词与1的键值对,如(金庸,1),(金庸,1),(天龙八步,1)... */ val pairs: RDD[(String, Int)] = words.map(word => (word, 1)) /** * reduceByKey: 对上步RDD的partition中的K,V格式的RDD先分组再聚合, 是懒执行算子 * * reduceByKey是一个shuffle类的算子,如果数据分布在多台节点上,会进行局部分组聚合,再全局聚合, 对于跨节点的数据会落地磁盘小文件 * 基处理也要经过mapper和reducer两个阶段 * * 假如数据有两个分区,分别在node1节点和node2节点上,那么: * * ===================>mapper=========================>shuffle=================>reducer========================= * node1节点: * (金庸,1) * (金庸,1) * (金庸,1) * (金庸,1) 局部分组聚合=> (金庸,4) => 写入磁盘落地 (金庸,6) * (天龙八部,1) (天龙八部,4) * (天龙八部,1) * (天龙八部,1) * (天龙八部,1) => 全局聚合 * node2节点: (天龙八部,6) * (金庸,1) (段誉,2) * (金庸,1) * (天龙八部,1) 局部分组聚合=> (金庸,2) => 写入磁盘落地 * (天龙八部,1) (天龙八部,2) * (段誉,1) (段誉,2) * (段誉,1) * */ val reduced: RDD[(String, Int)] = pairs.reduceByKey((v1, v2) => v1 + v2) /** * sortBy: 用来对RDD中的数据进行排序,是懒执行算子,它需要两个参数 * 1)参数1是一个f, 传递排序的规则 * 2) 参数2是排序的方式,默认是true, 表示升序, 如果需要降序,需要设置成false */ val sorted: RDD[(String, Int)] = reduced.sortBy(tup => tup._2, ascending = false) /** * foreach: 是一个action算子,触发执行 * * Spark应用程序中有几个action算子就有几个job */ sorted.foreach(tup=>println(tup)) //退出,释放资源 sc.stop() }}
输出结果如下:
(金庸,25)(天龙八部,13)(段誉,10)(神雕侠女,6)(射雕英雄传,6)(郭靖,5)(黄蓉,5)(杨过,4)(乔峰,4)(小龙女,2)(段正淳,2)(阿朱,2)(阿紫,2)(木婉清,1)(王语嫣,1)(黄药师,1)(晓蕾,1)(虚竹,1)(段王爷,1)(马夫人,1)(钟灵,1)(欧阳锋,1)(老顽童,1)(洪七公,1)(梅超风,1)(尹士平,1)(段延庆,1)