Spark DataFrame Learning Notes
Table of Contents
- Spark DataFrame Learning Notes
- 1. Adding a Configuration File
- 1.1 Configuration file location
- 1.2 Code for loading the configuration
- 2. Initializing Spark
- 3. Reading Data
- 3.1 Loading from a local file
- 3.2 Reading from Hive
- 3.3 Reading from a relational database (e.g. MySQL)
- 4. Salting to Handle Data Skew
- 4.1 Adding a random suffix to spread data across nodes
- 5. Processing Data in Batches
1. Adding a Configuration File
1.1 Configuration file location
Place the file under the resources directory; these notes use scala.properties as the example.
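For reference, a hypothetical scala.properties could look like the sketch below. The key names are the ones read by the later code samples; every value (and the two table names) is a placeholder, not taken from the original notes.

# scala.properties -- placeholder values for illustration only
is_load_local_data=true
dpi.data.path=/path/to/dpi/data
tableName1=dpi_app_usage_placeholder
tableName2=user_profile_placeholder
beginDay=20190101
endDay=20190131
mysql.url=localhost:3306/testdb
mysql.driver=com.mysql.jdbc.Driver
mysql.user=root
mysql.password=secret
mysql.dbtable=users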
1.2 Code for loading the configuration
import java.util.Properties

object LoadProperties {
  def get_instance: Properties = {
    val properties = new Properties()
    val in = Thread.currentThread().getContextClassLoader
      .getResourceAsStream("scala.properties")
    properties.load(in)
    properties
  }
}
2. Initializing Spark
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Load the configuration file
val pros: Properties = LoadProperties.get_instance
// Decide whether this is a local run
val isLocal: Boolean = pros.get("is_load_local_data").toString == "true"
val conf: SparkConf =
  if (isLocal)
    // SparkConf (and modelPath) for local runs
    new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
  else
    // SparkConf (and modelPath) for cluster runs
    new SparkConf().setAppName(getClass.getSimpleName)
// Create the SparkContext
val sc = new SparkContext(conf)
// Create the HiveContext
val spark = new HiveContext(sc)
spark.setConf("spark.sql.shuffle.partitions", "3000")
import spark.implicits._
Note:
Pay particular attention here: the Spark contexts should be initialized inside the main method, not in the object's field (static) initialization area. Because Spark runs distributed, static variables (including the Spark contexts and any custom objects created there) are loaded on one node; when the code runs on other nodes, those static variables and static methods are not reloaded, which can cause initialization of the class to fail. A minimal sketch of the recommended layout follows the list below.
- SparkConf: Spark's configuration class. Configuration is stored as key-value pairs in a ConcurrentHashMap instance named settings; it manages all Spark configuration items.
- SparkContext: the main entry point of a Spark application, the heart of Spark.
- HiveContext: the engine Spark uses to query Hive tables (or local files).
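A minimal sketch of that layout, assuming the LoadProperties helper from section 1.2 (the object name MyJob is illustrative):

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object MyJob {  // hypothetical driver object
  def main(args: Array[String]): Unit = {
    // All Spark context creation happens inside main, not in object fields
    val pros: Properties = LoadProperties.get_instance
    val isLocal = pros.getProperty("is_load_local_data") == "true"
    val conf =
      if (isLocal) new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
      else new SparkConf().setAppName(getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val spark = new HiveContext(sc)
    // ... job logic goes here ...
    sc.stop()
  }
}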
3. Reading Data
3.1 Loading from a local file
case class Data(phone: String, app_code: String, count: Double, master_flag: String, gender: String)

val dpi_path: String = pros.getProperty("dpi.data.path") // the path can come from the configuration file
val data = sc.textFile(dpi_path)
  .map(_.split("\t"))
  .map(x => Data(x(0), x(1), x(2).toDouble, x(3), x(4)))
  .toDF()
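To sanity-check the load, the schema and a few rows can be inspected with standard DataFrame calls:

data.printSchema()   // phone, app_code, count, master_flag, gender
data.show(5)         // preview the first five rows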
3.2 Reading from Hive
val tableName1 = pros.get("tableName1").toString
val tableName2 = pros.get("tableName2").toString
val beginDay = pros.get("beginDay").toString
val endDay = pros.get("endDay").toString
val dpiSql: String =
  s" select a.phone phone, a.app_code app_code, a.count count," +
  s" b.master_flag master_flag " +
  s" from " +
  s" (select phone phone, appcode app_code, sum(usetimes) count" +
  s" from $tableName1 where day >= $beginDay and day <= $endDay" +
  s" group by phone, appcode) as a" +
  s" inner join " +
  s" (select phone, master_flag from $tableName2 where year > 2018)" +
  s" as b " +
  s" on a.phone=b.phone"
val dpiData = spark.sql(dpiSql)
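Concatenating many small interpolated pieces is easy to get wrong (a missing + silently drops part of the statement), so as an alternative the same query can be written as one multi-line string with stripMargin, a sketch of which is shown here:

val dpiSql2: String =
  s"""select a.phone phone, a.app_code app_code, a.count count, b.master_flag master_flag
     |from (select phone phone, appcode app_code, sum(usetimes) count
     |      from $tableName1 where day >= $beginDay and day <= $endDay
     |      group by phone, appcode) as a
     |inner join (select phone, master_flag from $tableName2 where year > 2018) as b
     |on a.phone = b.phone""".stripMargin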
3.3 Reading from a relational database (e.g. MySQL)
def main(args: Array[String]): Unit = {
  val pros = LoadProperties.get_instance
  val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
  // Create the SparkContext
  val sc = new SparkContext(conf)
  // Create the HiveContext
  val spark = new HiveContext(sc)
  val jdbcDF = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://" + pros.get("mysql.url").toString)
    .option("driver", pros.getProperty("mysql.driver"))
    .option("user", pros.getProperty("mysql.user"))
    .option("password", pros.getProperty("mysql.password"))
    .option("dbtable", pros.getProperty("mysql.dbtable"))
    .load()
  jdbcDF.show()
}
By default this reads the entire MySQL table. If filter conditions are needed, build an intermediate (derived) table with a SQL subquery and pass it as dbtable. Modify jdbcDF as follows:
val tableName = pros.getProperty("mysql.dbtable")
val table = s"(select id, username, email from $tableName where id < 3) as t1"
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://" + pros.get("mysql.url").toString)
  .option("driver", pros.getProperty("mysql.driver"))
  .option("user", pros.getProperty("mysql.user"))
  .option("password", pros.getProperty("mysql.password"))
  .option("dbtable", table)
  .load()
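For large tables, the JDBC source can also read in parallel using its standard partitioning options. A sketch is below; the partition column name "id" and the bounds are assumptions for illustration, and partitionColumn must be a numeric column.

val jdbcParallelDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://" + pros.get("mysql.url").toString)
  .option("driver", pros.getProperty("mysql.driver"))
  .option("user", pros.getProperty("mysql.user"))
  .option("password", pros.getProperty("mysql.password"))
  .option("dbtable", pros.getProperty("mysql.dbtable"))
  .option("partitionColumn", "id")   // assumed numeric key column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")
  .load()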
4. Salting to Handle Data Skew
4.1 Adding a random suffix to spread data across nodes
- Environment: 40 nodes
- Gender values: male, female
Problem: without salting, the data skews onto the two nodes handling those two keys, which can overload and crash them.
Solution: salt the key so the data is spread across different nodes.
import scala.util.Random
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rand, row_number, substring}

// Salt the key: append a random number so one gender value becomes many sparse keys
def keyWithRandom(string: String): String = {
  string + Random.nextInt(200)
}

val w = Window.partitionBy("gender").orderBy(rand())
// Register the UDF
val udfKeyAddRandomValue = spark.udf.register("keyWithRandom", keyWithRandom _)
var tmpData = featureData.withColumn("gender", udfKeyAddRandomValue($"gender"))
  .withColumn("rk", row_number().over(w))
  .filter($"rk" <= 5000)
  .withColumn("gender", substring($"gender", 0, 1))
  .cache()
What the code does:
- udfKeyAddRandomValue: appends a random number to the value of the gender column, splitting each original gender value into many salted keys.
- row_number().over(w): over accumulates within each (salted) gender partition in the window's order, and row_number assigns a new sequential number, so each partition gets its own 1, 2, 3, ... numbering.
- w: the window that partitions by the gender column and orders rows by rand() (the order differs on every run) [SQL].
- filter: keeps only rows whose rk (the row number within its gender partition) is at most 5000, preventing any single node from being overloaded.
[Figure: how row_number().over(w) numbers rows within each partition]
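In place of the diagram, here is a small sketch on toy data (illustrative only, using the Window/row_number imports from the snippet above) showing how row_number().over(w) numbers rows independently within each salted partition:

// Toy illustration: three distinct (salted) gender keys, each numbered from 1
val toy = sc.parallelize(Seq("male3", "male3", "female17", "female17", "female17", "male42"))
  .toDF("gender")
val numbered = toy.withColumn("rk", row_number().over(Window.partitionBy("gender").orderBy(rand())))
numbered.show()
// Each gender partition gets its own 1, 2, 3, ... sequence, e.g.:
//   male3    -> rk 1, 2
//   female17 -> rk 1, 2, 3
//   male42   -> rk 1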
5. Processing Data in Batches
// Register a UDF that buckets phone numbers into 20 hash buckets
// (note: hashCode can be negative, so the raw bucket values range from -19 to 19)
spark.udf.register("hash_phone", (phone: String) => phone.hashCode % 20)
// Process only the bucket selected for this run
val hash = 10
val tableName1 = pros.get("tableName1").toString
val tableName2 = pros.get("tableName2").toString
val beginDay = pros.get("beginDay").toString
val endDay = pros.get("endDay").toString
val dpiSql: String =
  s" select a.phone phone, a.app_code app_code, a.count count," +
  s" b.master_flag master_flag " +
  s" from " +
  s" (select phone phone, appcode app_code, sum(usetimes) count" +
  s" from $tableName1" +
  s" where hash_phone(phoneno) = $hash" +
  s" group by phone, appcode) as a" +
  s" inner join " +
  s" (select phone, master_flag from $tableName2 where year > 2018)" +
  s" as b " +
  s" on a.phone=b.phone"
val dpiData = spark.sql(dpiSql)
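To cover the whole table, the same query can be run once per bucket. A minimal sketch is below, relying on the spark and tableName1 definitions above; math.abs is one way to keep buckets in 0..19 despite negative hashCode values, and the println stands in for whatever per-batch processing the real job does.

// Bucket UDF that always yields a value in 0..19
spark.udf.register("hash_phone", (phone: String) => math.abs(phone.hashCode) % 20)

for (hash <- 0 until 20) {
  val batchSql =
    s"select phone, appcode, sum(usetimes) count from $tableName1" +
    s" where hash_phone(phoneno) = $hash group by phone, appcode"
  val batch = spark.sql(batchSql)
  println(s"bucket $hash: ${batch.count()} rows")  // replace with the real per-batch processing
}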