Flink has two relational APIs for unified stream and batch processing: the Table API and SQL. The Table API is a language-integrated query API for Scala and Java that lets you compose queries from relational operators such as select, filter, and join in a very intuitive way. Queries specified in either API have the same semantics and produce the same results, regardless of whether the input is a batch or a streaming input.
Flink version: 1.8.0
Scala version: 2.11.8
1. Import the required dependencies with Maven
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <flink.version>1.8.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- dependencies required by the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Java -->
    <!--<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>-->
    <!-- Scala -->
    <!-- flink-streaming-scala_2.11 should be imported first -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Structure of Table API and SQL programs
All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of a Table API and SQL program.
// use ExecutionEnvironment instead for a batch program
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create a Table from a DataStream
val table = tableEnv.fromDataStream(env.readTextFile("/data.txt"))
// register a Table
tableEnv.registerTable("table1", ...)            // or
tableEnv.registerTableSource("table2", ...)      // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")
// execute
env.execute()
A concrete code example:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment, _}
import org.apache.flink.table.catalog.InMemoryExternalCatalog
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object TableRegist_1 {
  val csvPath = "E:/workspace/flink-practice/src/main/resources/"

  def main(args: Array[String]): Unit = {
    // streaming data
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stableEnv = StreamTableEnvironment.create(senv)
    val dataStream = senv.fromElements(
      ("张三", 20), ("李四", 22),
      ("王五", 24), ("赵六", 18)
    )

    // Ways to register a table:
    // 1. from a DataStream
    stableEnv.registerTable("s_people", stableEnv.fromDataStream(dataStream, 'name, 'age))
    // 2. from a query result
    val resTable = stableEnv.scan("s_people").select('name, 'age)
    stableEnv.registerTable("resTable", resTable)

    // register a TableSource
    // existing TableSource implementations include CsvTableSource and Kafka09TableSource
    stableEnv.registerTableSource("s_source",
      CsvTableSource.builder()
        .path(csvPath + "tablesource.csv")
        .field("name", Types.STRING)
        .field("age", Types.INT)
        .build())

    // register a TableSink
    // existing TableSink implementations include CsvTableSink and Kafka09TableSink
    stableEnv.registerTableSink("s_sink",
      new CsvTableSink(csvPath + "tablesink.csv", ",", 2, WriteMode.OVERWRITE)
        .configure(Array("name", "age"),
          Array(Types.STRING, Types.INT)))

    // register an external catalog
    // An external catalog provides information about external databases and tables, such as their
    // names, schemas, statistics, and how to access data stored in an external database, table, or file.
    // The existing implementation is InMemoryExternalCatalog.
    val catalogCatalog = new InMemoryExternalCatalog("catalog")
    stableEnv.registerExternalCatalog("s_catalog", catalogCatalog)

    /* ********************************************************************************* */

    // batch (DataSet) programs work analogously to streaming ones
    val env = ExecutionEnvironment.getExecutionEnvironment
    val btableEnv = BatchTableEnvironment.create(env)
    val dataSet = env.fromElements(
      ("张三", 20), ("李四", 22),
      ("王五", 24), ("赵六", 18)
    )
    btableEnv.registerTable("b_people", btableEnv.fromDataSet(dataSet))
  }
}
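To actually query the registered tables, the streaming part of the example can be continued roughly as follows. This is only a minimal sketch that reuses the names registered above (s_people, s_sink) and assumes the same imports; the filter condition and the job name are illustrative.
// query the registered table with the Table API ...
val apiResult = stableEnv.scan("s_people").filter('age > 20).select('name, 'age)
// ... or with SQL; both queries are equivalent
val sqlResult = stableEnv.sqlQuery("SELECT name, age FROM s_people WHERE age > 20")
// emit the result into the registered CsvTableSink and start the job
sqlResult.insertInto("s_sink")
senv.execute("table-register-demo")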
2.1 Registering and converting between a DataStream or DataSet and a Table
- Register a DataStream or DataSet as a Table
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
- Convert a DataStream or DataSet into a Table
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)
val stream: DataStream[(Long, String)] = env.fromCollection(Seq((1L, "hello")))
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
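To override the default positional names, field expressions can be passed to fromDataStream; a short sketch in which myLong and myString are arbitrary names:
// convert the DataStream into a Table with renamed fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)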
- Convert a Table into a DataStream or DataSet
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] =
  tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
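On the batch side, a Table created from a BatchTableEnvironment is converted back with toDataSet; a minimal sketch assuming a BatchTableEnvironment named bTableEnv, a table with the same two fields, and the DataSet and Row imports:
// convert the Table into a DataSet of Row
val dsRowSet: DataSet[Row] = bTableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTupleSet: DataSet[(String, Int)] = bTableEnv.toDataSet[(String, Int)](table)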
2.2 Create a TableEnvironment
The TableEnvironment is the central concept of the Table API and SQL integration. It is responsible for:
1. registering a Table in the internal catalog
2. registering an external catalog
3. executing SQL queries
4. registering user-defined (scalar, table, or aggregation) functions
5. converting a DataStream or DataSet into a Table
6. holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
For batch programs, create the TableEnvironment by calling BatchTableEnvironment.create(); for streaming programs, call StreamTableEnvironment.create().
// ***************
// streaming query
// ***************
import org.apache.flink.table.api.scala.StreamTableEnvironment
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
val sTableEnv = StreamTableEnvironment.create(sEnv)
// ***********
// batch query
// ***********
import org.apache.flink.table.api.scala.BatchTableEnvironment
val bEnv = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv = BatchTableEnvironment.create(bEnv)
2.3 Register tables in the catalog
A TableEnvironment maintains a catalog of tables that are registered by name. There are two types of tables: input tables and output tables. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
An input table can be registered from:
1. an existing Table object, usually the result of a Table API or SQL query
2. a TableSource, which accesses external data such as a file, a database, or a messaging system
3. a DataStream or DataSet from a DataStream or DataSet program
2.4 Register a Table
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
val tableEnv = StreamTableEnvironment.create(env)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
2.5 Register a TableSource
A TableSource provides access to external data that is stored in a storage system (for example, a file, a database, or a messaging system).
// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
2.6 Register a TableSink
A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, key-value store, message queue, or file system.
// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
2.7 Register an external catalog
An external catalog can provide information about external databases and tables, such as their names, schemas, and statistics, as well as information about how to access data stored in an external database, table, or file.
// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog("InMemCatalog")
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)