Spark 2.x Administration and Development - Spark SQL - [Spark SQL Examples] (1) Single-Table Queries
1. Reading and printing all data from the specified files
Scala code:
package sqlExamples
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Logger
import org.apache.log4j.Level
import java.text.SimpleDateFormat
//Note: when loading raw data into Spark or Hive, keep the columns as String where possible; this avoids many unnecessary problems
//If calculations are needed later, convert the types at that point
/**
 * sno: student number
 * sname: name
 * ssex: sex
 * sbirthday: date of birth
 * sclass: class
 */
case class Students(sno: String, sname: String, ssex: String, sbirthday: String, sclass: String)
/**
 * cno: course number
 * cname: course name
 * tno: teacher number
 */
case class Course(cno: String, cname: String, tno: String)
/**
 * sno: student number
 * cno: course number
 * degree: score
 */
case class Score(sno: String, cno: String, degree: String)
/**
 * tno: teacher number
 * tname: name
 * tsex: sex
 * tbirthday: date of birth
 * tprof: title
 * tdepart: department
 */
case class Teacher(tno: String, tname: String, tsex: String, tbirthday: String, tprof: String, tdepart: String)
object StudentsAndTeachers {
def main(args: Array[String]): Unit = {
//The next two lines set the log level to cut down the amount of log output
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//Spark environment
val spark = SparkSession.builder().master("local").appName("SparkSQLExample").getOrCreate()
import spark.implicits._ //brings in .toDF for RDDs of case classes
//Read the data
spark.sparkContext.textFile("D:\tmp_files\spark_sql_test_data\Student.csv")
.map(_.split(",")) //注意:csv格式的文件最好用逗号进行切割
.map(x => Students(x(0), x(1), x(2), x(3), x(4))) //采用case class 创建DataFrame 并关联表结构
.toDF //将RDD转换成DataFrame
.createOrReplaceTempView("student") //创建视图
spark.sparkContext.textFile("D:\tmp_files\spark_sql_test_data\Course.csv")
.map(_.split(","))
.map(x => Course(x(0), x(1), x(2)))
.toDF
.createOrReplaceTempView("course")
spark.sparkContext.textFile("D:\tmp_files\spark_sql_test_data\Score.csv")
.map(_.split(","))
.map(x => Score(x(0), x(1), x(2)))
.toDF
.createOrReplaceTempView("Score")
spark.sparkContext.textFile("D:\tmp_files\spark_sql_test_data\Teacher.csv")
.map(_.split(","))
.map(x => Teacher(x(0), x(1), x(2), x(3), x(4), x(5)))
.toDF
.createOrReplaceTempView("Teacher")
//***************** Single-table queries *********************
//I. Print the tables
spark.sql("select * from student").show()
spark.sql("select * from course").show()
spark.sql("select * from Score").show() //可以看出,Spark大小写不敏感
spark.sql("select * from teacher").show() //可以看出,Spark大小写不敏感//关闭Spark
spark.close()
}
}
Result:
(1) Student.csv
(2) Course.csv
(3) Score.csv
(4) Teacher.csv
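As an aside, the same tables could also be loaded with Spark's built-in CSV reader instead of textFile plus split. The following is only a minimal sketch for Student.csv, assuming the same file layout and no header row; the column names are assigned by hand here and, matching the note above, every column stays String:

val studentDF = spark.read
  .option("header", "false") //the sample files have no header row
  .csv("D:\\tmp_files\\spark_sql_test_data\\Student.csv") //without inferSchema, every column is read as String
  .toDF("sno", "sname", "ssex", "sbirthday", "sclass") //assign column names by hand
studentDF.createOrReplaceTempView("student")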
Question:
The runtime log output contains entries related to Hadoop:
2020-07-29 17:35:24,397 INFO [org.apache.hadoop.mapred.FileInputFormat] - Total input paths to process : 1
2020-07-29 17:35:24,637 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2020-07-29 17:35:24,637 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2020-07-29 17:35:24,637 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2020-07-29 17:35:24,637 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
2020-07-29 17:35:24,637 INFO [org.apache.hadoop.conf.Configuration.deprecation] - mapred.job.id is deprecated. Instead, use mapreduce.job.id
2020-08-01 15:59:52,695 INFO [org.apache.hadoop.mapred.FileInputFormat] - Total input paths to process : 1
2020-08-01 15:59:52,695 INFO [org.apache.hadoop.mapred.FileInputFormat] - Total input paths to process : 1
2020-08-01 15:59:52,695 INFO [org.apache.hadoop.mapred.FileInputFormat] - Total input paths to process : 1
Yet at this point the program is only using Spark to read files from a directory on the local Windows machine and print them;
nothing explicitly Hadoop-related is involved, so the Hadoop entries in the log are puzzling.
Explanation:
This is because my IDEA project contains Hadoop-related configuration files. In addition, SparkContext.textFile reads files through Hadoop's FileInputFormat even for local paths, which is why Hadoop classes show up in the log.
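If these INFO messages are distracting, they can be silenced the same way the Spark and Jetty loggers are silenced at the top of main(). A minimal sketch, reusing the log4j Logger and Level imports already present in the program (the logger names come from the output above):

Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR) //covers FileInputFormat and the deprecation messages
Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation").setLevel(Level.OFF) //the exact logger seen above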
2. Selecting specified columns and reading distinct values
Scala code:
//II. Select specified columns, read distinct values
//1. Select the sname, ssex and sclass columns of the Student table:
spark.sql("select sname,ssex,sclass from student").show()
//2. Distinct tdepart values from the Teacher table -- distinct (in practice, distinct does not always perform well):
spark.sql("select distinct tdepart from teacher").show() //long values are truncated and shown with "..."
spark.sql("select distinct tdepart from teacher").show(false) //false --> the results are displayed in full
//3. Distinct tdepart values from the Teacher table -- group by:
//Note: with GROUP BY, every column used in a non-aggregated expression of the select list must be included in the GROUP BY list,
//or the GROUP BY expressions must exactly match the select-list expressions.
spark.sql("select tdepart from teacher group by tdepart").show(false)
Result:
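As a side note, the same de-duplication can be written with the DataFrame API instead of SQL. A minimal sketch, assuming the "teacher" view registered earlier; distinct() and a bare groupBy are the two analogues of the queries above:

spark.table("teacher").select("tdepart").distinct().show(false) //analogue of select distinct
spark.table("teacher").groupBy("tdepart").count().select("tdepart").show(false) //analogue of the group by version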
3. Conditional queries
Scala code:
//III. Conditional queries
//1. All rows in Score with a degree between 60 and 80 -- two equivalent forms (little practical difference in production)
spark.sql("select * from score where degree >= 60 and degree <= 80").show()
spark.sql("select * from score where degree between 60 and 80").show()
//2. Rows in Score with a degree of 85, 86 or 88
spark.sql("select * from score where degree='85' or degree='86' OR degree='88'").show() //SQL keywords are case-insensitive
spark.sql("select * from score where degree=85 or degree=86 OR degree=88").show() //with or without the single quotes, the result is the same
Result:
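For comparison, the same filters can be expressed with the DataFrame API. A minimal sketch, assuming the "Score" view registered earlier:

import org.apache.spark.sql.functions.col
spark.table("Score").where(col("degree").between(60, 80)).show() //analogue of the between form
spark.table("Score").where(col("degree").isin("85", "86", "88")).show() //analogue of the or-style filter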
4. Sorted queries
Scala code:
//IV. Sorted queries
//1. Order by sclass descending -- order by ... desc
spark.sql("select * from student order by sclass desc").show()
//2. Order by sclass ascending -- order by (asc is the default)
spark.sql("select * from student order by sclass").show()
//3. Query Score ordered by sno ascending and, within the same sno, by degree descending
spark.sql("select * from score t order by t.sno asc,t.degree desc").show()
Result:
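The same orderings in DataFrame-API form, as a minimal sketch assuming the views registered earlier:

import org.apache.spark.sql.functions.col
spark.table("student").orderBy(col("sclass").desc).show() //sclass descending
spark.table("Score").orderBy(col("sno").asc, col("degree").desc).show() //sno ascending, degree descending within each sno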
5. Type conversion
Scala code:
//V. Type conversion
//Find the record of the student with the highest degree in Score -- limit 1
spark.sql("select * from score order by degree desc limit 1").show()
//Note: the scores cannot be sorted directly here, because String values are compared character by character (by their codes);
//for example, if the score 81 in the table were changed to 181, the result would be 181 < 86.
//The column has to be converted to Int first -- Int()
spark.sql("select * from score order by Int(degree) desc limit 1").show()
Result:
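The same cast-before-sort idea in DataFrame-API form, as a minimal sketch assuming the "Score" view above:

import org.apache.spark.sql.functions.col
spark.table("Score")
  .orderBy(col("degree").cast("int").desc) //cast the String degree to Int so the sort is numeric
  .limit(1)
  .show()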
6. Averages
Scala code:
//VI. Averages
//Average degree per course -- avg()
spark.sql("select cno,avg(degree) from score group by cno").show()
Result:
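The equivalent aggregation with the DataFrame API, as a minimal sketch assuming the "Score" view above (avg_degree is just a hypothetical alias for readability):

import org.apache.spark.sql.functions.avg
spark.table("Score")
  .groupBy("cno")
  .agg(avg("degree").alias("avg_degree")) //Spark casts the String degree to a numeric type for avg
  .show()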
7. Complex single-table queries
Scala code:
//***************** Complex single-table queries *********************
//VII. Complex single-table queries
//1. Average degree of courses in Score that have at least 5 students and whose course number starts with 3 -- like, group by, having, count()
spark.sql("select cno,avg(degree) from score where cno like '3%' group by cno having count(1) >=5").show()
//2. sno, sname and sbirthday of students born in the same year as student 108 -- substring()
spark.sql("select sno,sname,sbirthday from Student where substring(sbirthday,0,4)=(" +
"select substring(t.sbirthday,0,4) from student t where sno='108')").show()
//3. Score rows where the degree is below that course's average
spark.sql("select s.* from score s where s.degree < (select avg(degree) from score c where c.cno=s.cno)").show()
//4. Class numbers that have at least two male students -- group by + having
spark.sql("select sclass from Student where ssex='male' group by sclass having count(ssex)>=2").show()
//5. Rows in Student whose name does not start with Wang -- not like
spark.sql("select * from Student where sname not like('Wang%')").show()
//6. Name and age of every student in the Student table
//First define a helper function, getDate(), that returns the current date
//Note: during calculations Spark SQL automatically converts String data to Double;
//cast(... as int) can be used to convert other types to Int
spark.sql("select sname,(cast(" + getDate("yyyy") + " as int) -cast( substring(sbirthday,0,4) as int))as age from Student").show()
/**
 * Helper that formats the current time with the given pattern (e.g. "yyyy" returns the current year)
 */
def getDate(time: String) = {
val now: Long = System.currentTimeMillis()
val df: SimpleDateFormat = new SimpleDateFormat(time)
df.format(now)
}
Result:
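As an alternative to the custom getDate() helper, the current year can also be obtained from Spark SQL's built-in year() and current_date() functions. A minimal sketch of the same age query under that assumption:

spark.sql("select sname, (year(current_date()) - cast(substring(sbirthday, 0, 4) as int)) as age " +
  "from Student").show()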