
Flink: Consuming Data from Multiple Sources

Contents

Test sources

JDBC source

Reading Kafka data


Common sources fall into roughly four categories:

1. Sources based on local collections

2. Sources based on files

3. Sources based on network sockets

4. Custom sources. Common ones include Apache Kafka, Amazon Kinesis Streams, RabbitMQ, Twitter Streaming API, Apache NiFi, and so on; you can also define your own source.

Let's first look at the common sources used for testing, i.e., the first three categories above.

Test sources

import org.apache.flink.streaming.api.scala._
object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Setting the parallelism to 1 makes the output easier to follow: you can clearly see that each record is processed as it arrives. Locally the default parallelism equals the number of CPU cores
    //env.setParallelism(1)

    /**
     * Bounded stream from a collection
     */
    val listDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))

    /**
     * Bounded stream from a file
     */
    val fileDS = env.readTextFile("data/students.txt")

    val clazzDS = fileDS.map(_.split(",")(4))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    /**
     * Unbounded stream from a socket
     */
    val lineDS: DataStream[String] = env.socketTextStream("doker", 8888)

    val countDS = lineDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))
    
    countDS.print()
    
    clazzDS.print()

    listDS.print()

    env.execute()
  }
}


JDBC source

Custom sources: the official examples do not cover MySQL, HBase, or Redis, so we can implement these connectors ourselves.

Trace the built-in sources (readTextFile, socketTextStream, and so on) in StreamExecutionEnvironment and you will find that they all bottom out in a call to addSource, and the addSource method takes an instance of the SourceFunction class.

SourceFunction boils down to two methods: run, which produces the data, and cancel, which releases resources when the job is cancelled.
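
Before the MySQL example, here is a minimal skeleton (class and field names are hypothetical) showing the usual contract: run loops while a volatile flag is set, and cancel flips the flag so run can exit cleanly, since cancel is called from a different thread.

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Sketch of the SourceFunction contract: run() emits records through the
// context; cancel() signals run() to stop. @volatile is needed because
// cancel() runs on a different thread than run().
class CountSource extends SourceFunction[Long] {
  @volatile private var isRunning = true
  private var count = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count) // emit one record downstream
      count += 1
      Thread.sleep(1000) // throttle: roughly one record per second
    }
  }

  override def cancel(): Unit = {
    isRunning = false // run() observes this and leaves its loop
  }
}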


The MySQL JDBC driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

So to write our own source, we call addSource, pass in a SourceFunction, and override its run and cancel methods.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._


object MySourceFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val clazzDS: DataStream[(String, Int)] = env.addSource(new MysqlSourceFunction)

    clazzDS.print()

    env.execute()
  }
}
class MysqlSourceFunction extends SourceFunction[(String,Int)]{
  override def run(ctx: SourceFunction.SourceContext[(String,Int)]): Unit = {
    // 1. Load the driver
    Class.forName("com.mysql.jdbc.Driver")

    // 2. Create the connection
    val con: Connection = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")

    // 3. Prepare the query
    val stat: PreparedStatement = con.prepareStatement("select * from student")

    // 4. Execute the query
    val resultSet = stat.executeQuery()

    // 5. Parse the results
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")

      val age: Int = resultSet.getInt("age")

      ctx.collect((clazz, age))
    }

    // Close the connection
    con.close()

  }

  override def cancel(): Unit = {
    // Nothing to clean up: the read is bounded and one-shot
  }
}

Running the job prints a (clazz, age) tuple for each row read from the student table.


The example above uses SourceFunction; there is also a richer variant, RichSourceFunction.

You still override run and cancel,

and additionally get open and close methods: putting the connection setup and teardown there keeps the lifecycle in one place and the structure cleaner.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MyRichSourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val clazzDS= env.addSource(new MysqlRichSourceFunction)

    clazzDS.print()

    env.execute()
  }
}

class MysqlRichSourceFunction extends RichSourceFunction[(String,Int)]{

  var con:Connection=_

  /**
   * Executed once, before run
   */
  override def open(parameters: Configuration): Unit = {
    // 1. Load the driver
    Class.forName("com.mysql.jdbc.Driver")

    // 2. Create the connection
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
  }

  /**
   * Executed once, after run finishes
   */
  override def close(): Unit = {
    con.close()
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // 3. Prepare the query
    val stat: PreparedStatement = con.prepareStatement("select * from student")

    // 4. Execute the query
    val resultSet = stat.executeQuery()

    // 5. Parse the results
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")

      val age: Int = resultSet.getInt("age")

      ctx.collect((clazz, age))
    }

    // Note: no con.close() here; the connection is closed in close()
  }

  override def cancel(): Unit = {
    // Nothing to clean up: the read is bounded and one-shot
  }
}

ParallelSourceFunction and RichParallelSourceFunction look much the same in code as their non-parallel counterparts, but they run in parallel: each parallel subtask gets its own instance and executes run once. In local mode the default parallelism equals the number of CPU cores.

You can also set the parallelism explicitly to control how many subtasks run:


env.setParallelism(2)
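
Note that env.setParallelism(2) sets the default for the whole job. If you only want to cap the source itself, a sketch: set the parallelism on the source operator and leave the rest of the pipeline at the environment default.

// Parallelism applied to the source operator only; downstream operators
// keep the environment's default parallelism.
env.addSource(new MysqlParallelRichSourceFunction).setParallelism(2)
  .print()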


Reading the data this way, every subtask runs the same query, so rows are read repeatedly:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object MyRichParallelSourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val clazzDS = env.addSource(new MysqlParallelRichSourceFunction)

    clazzDS.print()

    env.execute()
  }
}

class MysqlParallelRichSourceFunction extends RichParallelSourceFunction[(String, Int)] {

  var con: Connection = _

  /**
   * Executed once per subtask, before run
   */
  override def open(parameters: Configuration): Unit = {
    // 1. Load the driver
    Class.forName("com.mysql.jdbc.Driver")

    // 2. Create the connection
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
  }

  /**
   * Executed once per subtask, after run finishes
   */
  override def close(): Unit = {
    con.close()
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // 3. Prepare the query
    val stat: PreparedStatement = con.prepareStatement("select * from student")

    // 4. Execute the query
    val resultSet = stat.executeQuery()

    // 5. Parse the results
    while (resultSet.next()) {
      val clazz: String = resultSet.getString("clazz")

      val age: Int = resultSet.getInt("age")

      ctx.collect((clazz, age))
    }

    // Note: no con.close() here; the connection is closed in close()
  }

  override def cancel(): Unit = {
    // Nothing to clean up: the read is bounded and one-shot
  }
}

And indeed the output contains duplicated records, because multiple parallel subtasks each read the full table.
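
One way to avoid the duplicates (a sketch; it assumes the student table has a numeric id column, which the original table may not have) is to shard the query by subtask index, so each parallel instance reads a disjoint slice:

import java.sql.{Connection, DriverManager}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class ShardedMysqlSourceFunction extends RichParallelSourceFunction[(String, Int)] {

  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    con = DriverManager.getConnection(
      "jdbc:mysql://doker:3306/stu?useUnicode=true&characterEncoding=utf-8",
      "root",
      "123456")
  }

  override def close(): Unit = con.close()

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    // Which slice of the table this subtask owns
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val total = getRuntimeContext.getNumberOfParallelSubtasks

    // Each subtask reads only rows where id % total == subtask (id is a
    // hypothetical numeric column), so every row is read exactly once
    val stat = con.prepareStatement("select clazz, age from student where id % ? = ?")
    stat.setInt(1, total)
    stat.setInt(2, subtask)

    val resultSet = stat.executeQuery()
    while (resultSet.next()) {
      ctx.collect((resultSet.getString("clazz"), resultSet.getInt("age")))
    }
  }

  override def cancel(): Unit = {}
}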


SourceFunction: a single-parallelism source; run executes exactly once.

RichSourceFunction: the same, plus open and close methods.

ParallelSourceFunction: a parallel source; run executes once in each parallel subtask.

RichParallelSourceFunction: the same, plus open and close methods.

Reading Kafka data
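
FlinkKafkaConsumer lives in the Kafka connector module, which needs its own dependency. A sketch, assuming Maven with Scala 2.12 and a flink.version property matching your Flink distribution:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>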

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object KafkaSourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "doker:9092")
    properties.setProperty("group.id", "test1")

    val myConsumer = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)
    myConsumer.setStartFromEarliest()      // start from the earliest record available
//    myConsumer.setStartFromLatest()        // start from the latest record
//    myConsumer.setStartFromTimestamp(...)  // start from the given timestamp (milliseconds)
//    myConsumer.setStartFromGroupOffsets()  // the default: resume from the group's committed offsets

    env.addSource(myConsumer)
      .print()
    
    env.execute()
  }
}
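
One note on offsets: when checkpointing is enabled, the consumer stores its offsets in Flink's checkpoints and commits them back to Kafka as checkpoints complete, rather than relying on the Kafka client's periodic auto-commit. A minimal sketch:

// Checkpoint every 5 seconds; the Kafka source's offsets become part of
// the checkpointed state, so a restart resumes from the last checkpoint.
env.enableCheckpointing(5000)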

Run the job and the Kafka records are read and printed successfully.

