Contents
Test sources
JDBC source
Reading Kafka data
Common sources fall into roughly four categories:
1. Sources based on local collections
2. Sources based on files
3. Sources based on network sockets
4. Custom sources: common ones include Apache Kafka, Amazon Kinesis Streams, RabbitMQ, the Twitter Streaming API, Apache NiFi and so on, and you can also define your own source
Let's first look at the common test sources, i.e. the first three categories above.
Test sources
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Setting the parallelism to 1 makes the output easier to follow: you can clearly see
    // that every record is processed as soon as it arrives. Locally the default parallelism
    // equals the number of CPU cores.
    //env.setParallelism(1)

    /**
     * Bounded stream from a collection
     */
    val listDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))

    /**
     * Bounded stream from a file
     */
    val fileDS = env.readTextFile("data/students.txt")
    // Column 5 is the class name; count the number of students per class
    val clazzDS = fileDS.map(_.split(",")(4))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    /**
     * Unbounded stream from a socket
     */
    val lineDS: DataStream[String] = env.socketTextStream("doker", 8888)
    // Split each line into words and count the occurrences of each word
    val countDS = lineDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    countDS.print()
    clazzDS.print()
    listDS.print()

    env.execute()
  }
}
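To try the socket part, start a listener on the host first, for example nc -lk 8888 on doker, then type comma-separated words into it; each line is processed as soon as it is entered.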
JDBC source
Custom sources: the official examples do not cover MySQL, HBase or Redis,
so we can implement these connectors ourselves.
Start from env.addSource and step into it: it returns a DataStream, and the addSource method expects an object of the SourceFunction class. Step into SourceFunction as well:
it has two main methods, run, which produces the data, and cancel, which releases resources when the job is cancelled.
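Before the MySQL example, a minimal sketch of that contract may help (this CounterSource is illustrative and not part of the original post): the run loop keeps emitting until cancel flips a flag.

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical example: emits an increasing counter until the job is cancelled
class CounterSource extends SourceFunction[Long] {
  // volatile so the cancelling thread's write is visible to the run loop
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 0L
    while (running) {
      ctx.collect(i)     // emit one record downstream
      i += 1
      Thread.sleep(1000) // slow down so the output stays readable
    }
  }

  // called by Flink when the job is cancelled; stop the run loop and release resources here
  override def cancel(): Unit = {
    running = false
  }
}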
MySQL dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
So to write our own source we call addSource and pass in a SourceFunction,
overriding its run and cancel methods.
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object MySourceFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val clazzDS: DataStream[(String, Int)] = env.addSource(new MysqlSourceFunction)
    clazzDS.print()
    env.execute()
  }
}

class MysqlSourceFunction extends SourceFunction[(String, Int)] {
  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // 1. Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. Open the connection
    val con: Connection = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
    // 3. Prepare the query
    val stat: PreparedStatement = con.prepareStatement("select * from student")
    // 4. Execute the query
    val resultSet = stat.executeQuery()
    // 5. Parse the result set and emit one record per row
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")
      val age: Int = resultSet.getInt("age")
      ctx.collect((clazz, age))
    }
    // Close the connection
    con.close()
  }

  // Nothing to release here: the connection is already closed at the end of run
  override def cancel(): Unit = {
  }
}
Part of the output: each row of the student table is printed as a (clazz, age) tuple.
The example above uses SourceFunction; there is also a richer variant, RichSourceFunction.
It still overrides run and cancel,
but additionally provides open and close. Putting the connection setup and teardown there keeps resource management in one place and makes the structure cleaner.
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MyRichSourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val clazzDS = env.addSource(new MysqlRichSourceFunction)
    clazzDS.print()
    env.execute()
  }
}

class MysqlRichSourceFunction extends RichSourceFunction[(String, Int)] {
  var con: Connection = _

  /**
   * Executed before run: set up the connection here
   */
  override def open(parameters: Configuration): Unit = {
    // 1. Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. Open the connection
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
  }

  /**
   * Executed after run: release the connection here
   */
  override def close(): Unit = {
    con.close()
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // 3. Prepare the query
    val stat: PreparedStatement = con.prepareStatement("select * from student")
    // 4. Execute the query
    val resultSet = stat.executeQuery()
    // 5. Parse the result set and emit one record per row
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")
      val age: Int = resultSet.getInt("age")
      ctx.collect((clazz, age))
    }
    // The connection is closed in close(), not here
  }

  override def cancel(): Unit = {
  }
}
RichParallelSourceFunction and ParallelSourceFunction look much the same in code; both run in parallel, and in local mode the default parallelism equals the number of CPU cores.
You can also control the parallelism explicitly:
env.setParallelism(2)
When a parallel source reads data this way, the same rows can be read more than once.
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MyRichParallelSourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val clazzDS = env.addSource(new MysqlParallelRichSourceFunction)
    clazzDS.print()
    env.execute()
  }
}

class MysqlParallelRichSourceFunction extends RichParallelSourceFunction[(String, Int)] {
  var con: Connection = _

  /**
   * Executed before run: set up the connection (once per parallel instance)
   */
  override def open(parameters: Configuration): Unit = {
    // 1. Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. Open the connection
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
  }

  /**
   * Executed after run: release the connection
   */
  override def close(): Unit = {
    con.close()
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // 3. Prepare the query (every parallel instance runs the same full query)
    val stat: PreparedStatement = con.prepareStatement("select * from student")
    // 4. Execute the query
    val resultSet = stat.executeQuery()
    // 5. Parse the result set and emit one record per row
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")
      val age: Int = resultSet.getInt("age")
      ctx.collect((clazz, age))
    }
    // The connection is closed in close(), not here
  }

  override def cancel(): Unit = {
  }
}
Here duplicate records show up, because several parallel instances of the source each read the whole table (one way to avoid this is sketched below).
SourceFunction: single-parallelism source, run is executed only once
RichSourceFunction: the same, with open and close added
ParallelSourceFunction: parallel source, run is executed once in every parallel instance
RichParallelSourceFunction: the same, with open and close added
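One common way to avoid the duplicates is to let each parallel instance read only its own share of the table. A minimal sketch, assuming the student table has an integer id column; the column name and the modulo partitioning are illustrative, not part of the original example:

import java.sql.{Connection, DriverManager}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// Each parallel instance reads only the rows whose id falls into its own slice
class PartitionedMysqlSource extends RichParallelSourceFunction[(String, Int)] {
  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8", "root", "123456")
  }

  override def close(): Unit = con.close()

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    val index = getRuntimeContext.getIndexOfThisSubtask       // 0, 1, ..., parallelism - 1
    val total = getRuntimeContext.getNumberOfParallelSubtasks // current parallelism
    // Assumes an integer id column: each subtask keeps the rows where id % total == index
    val stat = con.prepareStatement("select clazz, age from student where id % ? = ?")
    stat.setInt(1, total)
    stat.setInt(2, index)
    val resultSet = stat.executeQuery()
    while (resultSet.next()) {
      ctx.collect((resultSet.getString("clazz"), resultSet.getInt("age")))
    }
  }

  override def cancel(): Unit = {}
}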
Reading Kafka data
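The FlinkKafkaConsumer used below comes from the Flink Kafka connector, which also has to be on the classpath. A hedged dependency sketch, assuming a Flink 1.x build for Scala 2.12 and a flink.version property defined in the pom (adjust both to your setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>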
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "doker:9092")
    properties.setProperty("group.id", "test1")

    val myConsumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)
    myConsumer.setStartFromEarliest()         // start from the earliest record possible
    // myConsumer.setStartFromLatest()        // start from the latest record
    // myConsumer.setStartFromTimestamp(...)  // start from the given timestamp (milliseconds)
    // myConsumer.setStartFromGroupOffsets()  // default: start from the offsets committed for the consumer group

    env.addSource(myConsumer)
      .print()

    env.execute()
  }
}
The data is read successfully.
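As with the socket source, the Kafka stream can feed the same kind of downstream logic. A minimal sketch, assuming each Kafka message is a comma-separated line of words (this word count is illustrative and not part of the original job):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "doker:9092")
    properties.setProperty("group.id", "test1")
    val consumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    // Same word-count pattern as the socket example, applied to the Kafka stream
    env.addSource(consumer)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))
      .print()

    env.execute()
  }
}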