
Spark SQL (3): Dataset


Contents

  • 1. Creating a Dataset
  • 1.1 Method 1: From a sequence
  • 1.2 Method 2: From JSON data (DataFrame to Dataset)
  • 1.3 Method 3: From other data sources
  • 2. Operating on a Dataset
  • 2.1 Basic operations
  • 2.2 Multi-table joins


Dataset, introduced in Spark 1.6, is a wrapper around DataFrame that addresses DataFrame's lack of compile-time type safety. You can think of a Dataset as a typed, higher-level DataFrame.
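To see the difference: a column-name typo in DataFrame code compiles fine and only fails at runtime, while the same mistake in Dataset code is rejected by the Scala compiler. A minimal sketch (assuming a spark-shell session; the User case class is purely for illustration):

case class User(id: Int, name: String)

val df = Seq((1, "Tom")).toDF("id", "name")
// df.select("nme").show   // typo compiles, but throws AnalysisException at runtime

val ds = Seq(User(1, "Tom")).toDS
// ds.map(_.nme)            // typo: does not compile ("value nme is not a member of User")
ds.map(_.name.toUpperCase)  // field access is checked at compile time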

1. Creating a Dataset

1.1 Method 1: From a sequence

scala> case class MyData(a:Int,b:String)
defined class MyData

scala> val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
ds: org.apache.spark.sql.Dataset[MyData] = [a: int, b: string]

scala> ds.show
+---+----+
|  a|   b|
+---+----+
|  1| Tom|
|  2|Mary|
+---+----+
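Note that toDS relies on Spark's implicit encoders, which spark-shell imports automatically. In a standalone application you must import them from your own SparkSession first; a minimal sketch (object and app name are placeholders):

import org.apache.spark.sql.SparkSession

// Case classes used with Datasets should be defined at top level,
// not inside a method, so Spark can derive their encoders.
case class MyData(a: Int, b: String)

object DatasetDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._   // enables .toDS/.toDF and the $"..." column syntax

    val ds = Seq(MyData(1, "Tom"), MyData(2, "Mary")).toDS
    ds.show()

    spark.stop()
  }
}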

1.2 Method 2: From JSON data (DataFrame to Dataset)

scala> case class Person(name:String,age:BigInt)
defined class Person

scala> val df = spark.read.format("json").load("/opt/top_resources/test/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

// Convert the DataFrame to a Dataset
scala> df.as[Person]
res12: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> df.as[Person].show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
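For reference, the people.json used here appears to be the sample file shipped with the Spark distribution (one JSON object per line), which matches the output above. Person.age is declared as BigInt because Spark parses JSON integers as bigint:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}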

1.3 Method 3: From other data sources

A Dataset can be thought of as combining RDD-style operations with DataFrame-style operations.

// Local file
scala> val linesDS=spark.read.text("/opt/top_resources/test/test_WordCount.txt").as[String]
linesDS: org.apache.spark.sql.Dataset[String] = [value: string]
// File on HDFS
scala> val linesDS = spark.read.text("hdfs://hadoop111:9000/test/test_WordCount.txt").as[String]

scala> val words = linesDS.flatMap(_.split(" ")).filter(_.length>3)
words: org.apache.spark.sql.Dataset[String] = [value: string]

scala> words.show
+----------+
|     value|
+----------+
|      love|
|   Beijing|
|      love|
|     China|
|   Beijing|
|   capital|
|     China|
|hehehehehe|
+----------+

scala> val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x=>x._1).count
result: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
// Equivalent via the untyped API (note: groupBy names the count column "count", not "count(1)"):
// linesDS.flatMap(_.split(" ")).groupBy("value").count.show
// linesDS.flatMap(_.split(" ")).groupBy($"value").count.show

scala> result.show
+----------+--------+                                                           
|     value|count(1)|
+----------+--------+
|hehehehehe|       1|
|   Beijing|       2|
|      love|       2|
|     China|       2|
|        is|       1|
|   capital|       1|
|       the|       1|
|        of|       1|
|         I|       2|
+----------+--------+

scala> result.orderBy($"value").show
// Equivalent: linesDS.flatMap(_.split(" ")).groupBy($"value").count.orderBy("value").show
+----------+--------+                                                           
|     value|count(1)|
+----------+--------+
|   Beijing|       2|
|     China|       2|
|         I|       2|
|   capital|       1|
|hehehehehe|       1|
|        is|       1|
|      love|       2|
|        of|       1|
|       the|       1|
+----------+--------+

scala> result.orderBy($"count(1)").show
// Equivalent: linesDS.flatMap(_.split(" ")).groupBy($"value").count.orderBy("count").show
+----------+--------+                                                           
|     value|count(1)|
+----------+--------+
|        is|       1|
|   capital|       1|
|        of|       1|
|       the|       1|
|hehehehehe|       1|
|   Beijing|       2|
|     China|       2|
|      love|       2|
|         I|       2|
+----------+--------+
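The whole word count from this section, condensed into one pipeline (a sketch, reusing the same spark-shell session and file path):

val wordCount = spark.read.text("/opt/top_resources/test/test_WordCount.txt").as[String]
  .flatMap(_.split(" "))
  .groupByKey(identity)        // typed grouping on the word itself
  .count()                     // Dataset[(String, Long)]; the count column is named "count(1)"
  .orderBy($"count(1)".desc)   // most frequent words first
wordCount.show()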

2. Operating on a Dataset

2.1 Basic operations
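This section assumes an Emp case class and a DataFrame df1 loaded earlier in this series. A minimal sketch that matches the schema shown below (the emp.csv path is a placeholder), built the same way as the Dept table in section 2.2:

case class Emp(empno: Int, ename: String, job: String, mgr: Int,
               hiredate: String, sal: Int, comm: Int, deptno: Int)

val lines = sc.textFile("/opt/top_resources/test/emp.csv").map(_.split(","))
val df1 = lines.map(x => Emp(x(0).toInt, x(1), x(2), x(3).toInt,
                             x(4), x(5).toInt, x(6).toInt, x(7).toInt)).toDF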

scala> val empDS = df1.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [empno: int, ename: string ... 6 more fields]

scala> empDS.filter(_.sal > 3000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7839| KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
+-----+-----+---------+----+----------+----+----+------+

scala> empDS.filter(_.deptno==10).show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+
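Because empDS is strongly typed, ordinary Scala logic mixes in freely; for example, total pay per employee (a sketch):

empDS.map(e => (e.ename, e.sal + e.comm))   // compile-time-checked field access
     .toDF("ename", "totalPay")             // name the tuple columns
     .orderBy($"totalPay".desc)
     .show()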

2.2 Multi-table joins

scala> case class Dept(deptno:Int,dname:String,loc:String)
scala> var lines = sc.textFile("/opt/top_resources/test/dept.csv").map(_.split(","))
scala> var allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))   // returns an RDD
scala> val deptDS = allDept.toDS    // convert the RDD to a Dataset
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]
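The detour through an RDD is not required; the CSV reader can build the same Dataset directly (a sketch, assuming dept.csv has no header row):

import org.apache.spark.sql.Encoders

val deptDS2 = spark.read
  .schema(Encoders.product[Dept].schema)   // derive column names/types from the case class
  .csv("/opt/top_resources/test/dept.csv")
  .as[Dept]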

// Join deptDS and empDS on a common column (here the column has the same name in both tables)
// Emp table:
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+ 

// Dept table:
+------+----------+--------+                                                    
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

scala> val result = deptDS.join(empDS,"deptno")
result: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 8 more fields]

scala> result.show
+------+----------+--------+-----+------+---------+----+----------+----+----+   
|deptno|     dname|     loc|empno| ename|      job| mgr|  hiredate| sal|comm|
+------+----------+--------+-----+------+---------+----+----------+----+----+
|    20|  RESEARCH|  DALLAS| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|
|    20|  RESEARCH|  DALLAS| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|
|    20|  RESEARCH|  DALLAS| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|
|    20|  RESEARCH|  DALLAS| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|
|    20|  RESEARCH|  DALLAS| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|
|    10|ACCOUNTING|NEW YORK| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|
|    10|ACCOUNTING|NEW YORK| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|
|    10|ACCOUNTING|NEW YORK| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|
|    30|     SALES| CHICAGO| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|
|    30|     SALES| CHICAGO| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|
|    30|     SALES| CHICAGO| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|
|    30|     SALES| CHICAGO| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|
|    30|     SALES| CHICAGO| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|
|    30|     SALES| CHICAGO| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|
+------+----------+--------+-----+------+---------+----+----------+----+----+
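join with a USING column performs an inner join by default, which is why department 40 (OPERATIONS, with no employees) is missing from the result above. Passing an explicit join type keeps it (a sketch):

// Left outer join: departments with no employees appear with null emp columns
val resultOuter = deptDS.join(empDS, Seq("deptno"), "left_outer")
resultOuter.filter($"empno".isNull).show()   // shows only department 40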

// joinWith takes an explicit join condition (this form also works when the join columns have different names in the two tables)
scala> val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))
result: org.apache.spark.sql.Dataset[(Dept, Emp)] = [_1: struct<deptno: int, dname: string ... 1 more field>, _2: struct<empno: int, ename: string ... 6 more fields>]

scala> result.show
+--------------------+--------------------+                                     
|                  _1|                  _2|
+--------------------+--------------------+
|[20,RESEARCH,DALLAS]|[7369,SMITH,CLERK...|
|[20,RESEARCH,DALLAS]|[7566,JONES,MANAG...|
|[20,RESEARCH,DALLAS]|[7788,SCOTT,ANALY...|
|[20,RESEARCH,DALLAS]|[7876,ADAMS,CLERK...|
|[20,RESEARCH,DALLAS]|[7902,FORD,ANALYS...|
|[10,ACCOUNTING,NE...|[7782,CLARK,MANAG...|
|[10,ACCOUNTING,NE...|[7839,KING,PRESID...|
|[10,ACCOUNTING,NE...|[7934,MILLER,CLER...|
|  [30,SALES,CHICAGO]|[7499,ALLEN,SALES...|
|  [30,SALES,CHICAGO]|[7521,WARD,SALESM...|
|  [30,SALES,CHICAGO]|[7654,MARTIN,SALE...|
|  [30,SALES,CHICAGO]|[7698,BLAKE,MANAG...|
|  [30,SALES,CHICAGO]|[7844,TURNER,SALE...|
|  [30,SALES,CHICAGO]|[7900,JAMES,CLERK...|
+--------------------+--------------------+

scala> result.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- deptno: integer (nullable = true)
 |    |-- dname: string (nullable = true)
 |    |-- loc: string (nullable = true)
 |-- _2: struct (nullable = false)
 |    |-- empno: integer (nullable = true)
 |    |-- ename: string (nullable = true)
 |    |-- job: string (nullable = true)
 |    |-- mgr: integer (nullable = true)
 |    |-- hiredate: string (nullable = true)
 |    |-- sal: integer (nullable = true)
 |    |-- comm: integer (nullable = true)
 |    |-- deptno: integer (nullable = true)
                       
scala> result.explain

The difference between joinWith and join is the schema of the result: join flattens both sides into one row and returns an untyped DataFrame, while joinWith keeps each side as a nested struct and returns a typed Dataset of pairs (here Dataset[(Dept, Emp)]).
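Since joinWith preserves the element types, each row is a real (Dept, Emp) pair that can be pattern-matched (a sketch):

result.map { case (dept, emp) => (dept.dname, emp.ename, emp.sal) }
      .toDF("dname", "ename", "sal")
      .show()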


