Contents
- 1. Creating a Dataset
- 1.1 Option 1: from a sequence
- 1.2 Option 2: from JSON data, converting a DataFrame to a Dataset
- 1.3 Option 3: from other data sources
- 2. Operating on a Dataset
- 2.1 Basic operations
- 2.2 Multi-table joins
Dataset is a typed abstraction that Spark 1.6 introduced on top of DataFrame to address DataFrame's lack of compile-time type safety. You can think of a Dataset as a strongly typed DataFrame.
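To see what compile-time type safety buys you, here is a minimal sketch (the class and field names are just examples): a column typo on a DataFrame only fails at runtime, while the same typo on a Dataset is rejected by the compiler.
case class User(id: Int, name: String)
val ds = Seq(User(1, "Tom")).toDS   // Dataset[User], strongly typed
val df = ds.toDF()                  // DataFrame, i.e. an untyped Dataset[Row]
// df.select("nme")   // typo compiles and only fails at runtime with an AnalysisException
// ds.map(_.nme)      // typo does not compile: value nme is not a member of User
ds.map(_.name)        // field access is checked by the compiler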
1. Creating a Dataset
1.1 Option 1: from a sequence
scala> case class MyData(a:Int,b:String)
defined class MyData
scala> val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
ds: org.apache.spark.sql.Dataset[MyData] = [a: int, b: string]
scala> ds.show
+---+----+
| a| b|
+---+----+
| 1| Tom|
| 2|Mary|
+---+----+
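Note: toDS relies on the implicit encoders from spark.implicits._. The spark-shell imports these automatically; in a standalone application you have to create the SparkSession and add the import yourself (the appName below is just an example):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dataset-demo").getOrCreate()
import spark.implicits._   // brings in the encoders behind .toDS, .toDF and the $"col" syntax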
1.2 Option 2: from JSON data, converting a DataFrame to a Dataset
scala> case class Person(name:String,age:BigInt)
defined class Person
scala> val df = spark.read.format("json").load("/opt/top_resources/test/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
// Convert the DataFrame to a Dataset
scala> df.as[Person]
res12: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> df.as[Person].show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
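For reference, a people.json matching the output above looks like this (one JSON object per line; Michael's record has no age field, which is why it shows as null). The JSON reader infers whole numbers as bigint, which is why the Person case class declares age as BigInt.
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}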
1.3 Option 3: from other data sources
A Dataset can be thought of as combining RDD-style functional operations with DataFrame operations.
// Local file
scala> val linesDS=spark.read.text("/opt/top_resources/test/test_WordCount.txt").as[String]
linesDS: org.apache.spark.sql.Dataset[String] = [value: string]
// Data on HDFS
scala> val linesDS = spark.read.text("hdfs://hadoop111:9000/test/test_WordCount.txt").as[String]
scala> val words = linesDS.flatMap(_.split(" ")).filter(_.length>3)
words: org.apache.spark.sql.Dataset[String] = [value: string]
scala> words.show
+----------+
| value|
+----------+
| love|
| Beijing|
| love|
| China|
| Beijing|
| capital|
| China|
|hehehehehe|
+----------+
scala> val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x=>x._1).count
result: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
Equivalent to: var a = linesDS.flatMap(_.split(" ")).groupBy("value").count.show
Equivalent to: var a = linesDS.flatMap(_.split(" ")).groupBy($"value").count.show
(the untyped groupBy versions name the result column count rather than count(1))
scala> result.show
+----------+--------+
| value|count(1)|
+----------+--------+
|hehehehehe| 1|
| Beijing| 2|
| love| 2|
| China| 2|
| is| 1|
| capital| 1|
| the| 1|
| of| 1|
| I| 2|
+----------+--------+
scala> result.orderBy($"value").show
Equivalent to: scala> var a = linesDS.flatMap(_.split(" ")).groupBy($"value").count.orderBy("value").show
+----------+--------+
| value|count(1)|
+----------+--------+
| Beijing| 2|
| China| 2|
| I| 2|
| capital| 1|
|hehehehehe| 1|
| is| 1|
| love| 2|
| of| 1|
| the| 1|
+----------+--------+
scala> result.orderBy($"count(1)").show
Equivalent to: scala> var a = linesDS.flatMap(_.split(" ")).groupBy($"value").count.orderBy("count").show
+----------+--------+
| value|count(1)|
+----------+--------+
| is| 1|
| capital| 1|
| of| 1|
| the| 1|
|hehehehehe| 1|
| Beijing| 2|
| China| 2|
| love| 2|
| I| 2|
+----------+--------+
2. Operating on a Dataset
2.1 Basic operations
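The transcript below assumes an Emp case class and a DataFrame df1 have already been created from an emp.csv file. A minimal sketch of what they might look like (the file path and column order are assumptions inferred from the output below):
case class Emp(empno: Int, ename: String, job: String, mgr: Int,
               hiredate: String, sal: Int, comm: Int, deptno: Int)

// Assumed: emp.csv holds the eight columns above, comma-separated, one employee per line
val df1 = sc.textFile("/opt/top_resources/test/emp.csv")
  .map(_.split(","))
  .map(x => Emp(x(0).toInt, x(1), x(2), x(3).toInt, x(4), x(5).toInt, x(6).toInt, x(7).toInt))
  .toDF()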
scala> val empDS = df1.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [empno: int, ename: string ... 6 more fields]
scala> empDS.filter(_.sal > 3000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename| job| mgr| hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
+-----+-----+---------+----+----------+----+----+------+
scala> empDS.filter(_.deptno==10).show
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10|
+-----+------+---------+----+----------+----+----+------+
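Because empDS is strongly typed, plain Scala code can be mixed freely with Dataset operations. For example (a sketch, not part of the original session), a typed map that computes a 10% raise:
// Keep only the name and the raised salary; the result is still a typed Dataset[(String, Double)]
val raised = empDS.map(e => (e.ename, e.sal * 1.1))
raised.show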
2.2 Multi-table joins
scala> case class Dept(deptno:Int,dname:String,loc:String)
scala> var lines = sc.textFile("/opt/top_resources/test/dept.csv").map(_.split(","))
scala> var allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2))) // this returns an RDD
scala> val deptDS = allDept.toDS // convert the RDD to a Dataset
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]
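An alternative to going through an RDD (a sketch, assuming the same headerless dept.csv) is to read the file with the DataFrame CSV reader and convert it with as[Dept]:
import org.apache.spark.sql.types._

val deptSchema = StructType(Seq(
  StructField("deptno", IntegerType),
  StructField("dname", StringType),
  StructField("loc", StringType)))

val deptDS2 = spark.read.schema(deptSchema)
  .csv("/opt/top_resources/test/dept.csv")
  .as[Dept]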
// Columns used to join deptDS and empDS (here both tables use the same column name, deptno)
// Emp table:
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0| 30|
| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0| 10|
| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0| 20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0| 10|
| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0| 30|
| 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0| 20|
| 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0| 30|
| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0| 20|
| 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0| 10|
+-----+------+---------+----+----------+----+----+------+
// Dept table:
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
scala> val result = deptDS.join(empDS,"deptno")
result: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 8 more fields]
scala> result.show
+------+----------+--------+-----+------+---------+----+----------+----+----+
|deptno| dname| loc|empno| ename| job| mgr| hiredate| sal|comm|
+------+----------+--------+-----+------+---------+----+----------+----+----+
| 20| RESEARCH| DALLAS| 7369| SMITH| CLERK|7902|1980/12/17| 800| 0|
| 20| RESEARCH| DALLAS| 7566| JONES| MANAGER|7839| 1981/4/2|2975| 0|
| 20| RESEARCH| DALLAS| 7788| SCOTT| ANALYST|7566| 1987/4/19|3000| 0|
| 20| RESEARCH| DALLAS| 7876| ADAMS| CLERK|7788| 1987/5/23|1100| 0|
| 20| RESEARCH| DALLAS| 7902| FORD| ANALYST|7566| 1981/12/3|3000| 0|
| 10|ACCOUNTING|NEW YORK| 7782| CLARK| MANAGER|7839| 1981/6/9|2450| 0|
| 10|ACCOUNTING|NEW YORK| 7839| KING|PRESIDENT|7839|1981/11/17|5000| 0|
| 10|ACCOUNTING|NEW YORK| 7934|MILLER| CLERK|7782| 1982/1/23|1300| 0|
| 30| SALES| CHICAGO| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|
| 30| SALES| CHICAGO| 7521| WARD| SALESMAN|7698| 1981/2/22|1250| 500|
| 30| SALES| CHICAGO| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|
| 30| SALES| CHICAGO| 7698| BLAKE| MANAGER|7839| 1981/5/1|2850| 0|
| 30| SALES| CHICAGO| 7844|TURNER| SALESMAN|7698| 1981/9/8|1500| 0|
| 30| SALES| CHICAGO| 7900| JAMES| CLERK|7698| 1981/12/3| 950| 0|
+------+----------+--------+-----+------+---------+----+----------+----+----+
// Joining deptDS and empDS with an explicit condition via joinWith (this form also works when the join columns have different names)
scala> val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))
result: org.apache.spark.sql.Dataset[(Dept, Emp)] = [_1: struct<deptno: int, dname: string ... 1 more field>, _2: struct<empno: int, ename: string ... 6 more fields>]
scala> result.show
+--------------------+--------------------+
| _1| _2|
+--------------------+--------------------+
|[20,RESEARCH,DALLAS]|[7369,SMITH,CLERK...|
|[20,RESEARCH,DALLAS]|[7566,JONES,MANAG...|
|[20,RESEARCH,DALLAS]|[7788,SCOTT,ANALY...|
|[20,RESEARCH,DALLAS]|[7876,ADAMS,CLERK...|
|[20,RESEARCH,DALLAS]|[7902,FORD,ANALYS...|
|[10,ACCOUNTING,NE...|[7782,CLARK,MANAG...|
|[10,ACCOUNTING,NE...|[7839,KING,PRESID...|
|[10,ACCOUNTING,NE...|[7934,MILLER,CLER...|
| [30,SALES,CHICAGO]|[7499,ALLEN,SALES...|
| [30,SALES,CHICAGO]|[7521,WARD,SALESM...|
| [30,SALES,CHICAGO]|[7654,MARTIN,SALE...|
| [30,SALES,CHICAGO]|[7698,BLAKE,MANAG...|
| [30,SALES,CHICAGO]|[7844,TURNER,SALE...|
| [30,SALES,CHICAGO]|[7900,JAMES,CLERK...|
+--------------------+--------------------+
scala> result.printSchema
root
|-- _1: struct (nullable = false)
| |-- deptno: integer (nullable = true)
| |-- dname: string (nullable = true)
| |-- loc: string (nullable = true)
|-- _2: struct (nullable = false)
| |-- empno: integer (nullable = true)
| |-- ename: string (nullable = true)
| |-- job: string (nullable = true)
| |-- mgr: integer (nullable = true)
| |-- hiredate: string (nullable = true)
| |-- sal: integer (nullable = true)
| |-- comm: integer (nullable = true)
| |-- deptno: integer (nullable = true)
scala> result.explain
joinWith differs from join in the schema of the resulting Dataset: join flattens both sides into a single row of columns and returns a DataFrame, whereas joinWith keeps each side as a complete typed object and returns a Dataset[(Dept, Emp)].
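Because joinWith keeps each side as a complete object, the result can be processed with typed field access, for example (a sketch, not part of the original session):
// Pull the department name and employee name out of the Dataset[(Dept, Emp)]
result.map { case (d, e) => (d.dname, e.ename) }.show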