Table of Contents
Preface
1. Kafka data sources
1.1 json
1.1.1 Flink SQL DDL
1.1.2 Preparing the data
1.1.3 Starting sql-client
1.1.4 Code
1.1.5 Pitfalls
1.1.6 Configuration
1.2 debezium-json
1.2.1 Flink SQL
1.2.2 Preparing the data
1.2.3 sql-client
1.2.4 Code
1.2.5 Pitfalls
1.2.6 Key configuration for debezium-json
1.3 kafka-more-json (nested JSON)
1.3.1 Flink SQL
1.3.2 Preparing the data
1.3.3 sql-client
1.3.4 Code
1.3.5 Pitfalls
1.3.6 Configuration
Preface
Everything below comes from stepping through these details hands-on; I found that a lot of the wording and configuration on the official site does not match reality, so this write-up sticks to what I actually ran and verified. I hope it helps you.
Before starting, you need:
a Flink environment (not covered here)
a Java environment (not covered here)
sql-client enabled (not covered here)
a Docker environment, to spin up the various components quickly
1. Kafka data sources
Reading data from Kafka covers several formats: json, csv, debezium-json and canal-json.
1.1 json
1.1.1 Flink SQL DDL
CREATE TABLE user_test (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_test',
'properties.group.id' = 'local-sql-test',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9200',
'format' = 'json',
'json.timestamp-format.standard' = 'SQL',
'scan.topic-partition-discovery.interval' = '10000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
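Once the table is created, a quick sanity check from sql-client is just a query against it (rows will show up once the producer from 1.1.2 below is sending records):
-- assumes the user_test table defined above exists in the current session
SELECT user_id, behavior, ts FROM user_test;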
1.1.2 Preparing the data
(1) Create the topic
./kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic user_test
(2) Produce data to the topic
./kafka-console-producer.sh \
--broker-list localhost:9200 \
--topic user_test
(3) Data
(The JSON can vary: keys may be missing or use other types such as int; it depends on the data types you declare in the DDL.)
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00.123"}
1.1.3 Starting sql-client
./bin/sql-client.sh embedded
(1) Prepare the sql-client dependencies
mkdir -p sqlclient && cd sqlclient
flink-json-1.12.0.jar
flink-sql-connector-kafka_2.11-1.12.0.jar
For reference:
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.12.0/flink-json-1.12.0.jar
Environment settings you may need at startup:
Create a YAML file (sql-env.yaml):
configuration:
execution.target: yarn-session
bin/sql-client.sh embedded -e sql-env.yaml
Or start it another way:
bin/sql-client.sh embedded -s yarn-session
1.1.4 Code
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.12.1</version>
</dependency>
/**
* @Auther: laraj
* @Date: 2021/1/28
* @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
*/
public class FlinkKafkaSource implements Serializable {
/**
*
./kafka-console-producer.sh \
--broker-list localhost:9200 \
--topic user_test
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
*/
public static final String KAFKA_TABLE_SOURCE_DDL = "" +
"CREATE TABLE user_test (\n" +
" user_id BIGINT,\n" +
" item_id BIGINT,\n" +
" category_id BIGINT,\n" +
" behavior STRING,\n" +
" `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',\n" +
" ts TIMESTAMP(3) " +
" \n" +
") WITH (\n" +
" 'connector' = 'kafka', " +
" 'topic' = 'user_test', " +
" 'properties.group.id' = 'local-test', " +
" 'json.fail-on-missing-field' = 'false', " +
" 'json.ignore-parse-errors' = 'true', " +
" 'json.timestamp-format.standard'='SQL', " +
" 'scan.topic-partition-discovery.interval'='10000', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'properties.bootstrap.servers' = 'localhost:9200', " +
" 'format' = 'json' " +
") ";
public static void main(String[] args) {
// build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3,
Time.of(5, TimeUnit.MINUTES),
Time.of(10, TimeUnit.SECONDS)
));
// build EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
tEnv.executeSql(KAFKA_TABLE_SOURCE_DDL);
// run the query
Table table = tEnv.sqlQuery("select * from user_test ");
// convert back to a DataStream and print it
DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);
rowDataStream.map(new MapFunction<Row, Row>() {
@Override
public Row map(Row row) throws Exception {
System.out.println(row);
return row;
}
}).print("FlinkKafkaSource==").setParallelism(1);
// submit the job; this line is required!
try {
env.execute("FlinkKafkaSource");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.1.5 Pitfalls
(1)
Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
--> the pom is missing the flink-json dependency.
(2) Using a TIMESTAMP field in the data
flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
--> the SQL definition contains a data type that cannot be deserialized cleanly. For example, MySQL timestamps may not deserialize completely in Flink SQL; if you need them, you must set 'json.timestamp-format.standard' = 'SQL'.
(3) JSON leniency
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
The goal is that a missing JSON field or an empty value does not throw an exception.
(4) When finally executing the SQL from code, you also have to drop the trailing ';'.
I never figured out why; removing the semicolon is simply what works.
1.1.6 Configuration
Note that some of this contradicts the official site, which makes me suspect the people writing the docs are not the committers writing the source.
Here is a brief rundown of the key options (a DDL sketch showing how the key-related options combine follows this list).
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' or 'specific-offsets'
scan.startup.specific-offsets -- start consuming from explicit per-partition offsets
scan.startup.timestamp-millis -- timestamp to start consuming from
scan.topic-partition-discovery.interval -- interval for discovering partitions newly added to the topic
sink.parallelism -- parallelism of the sink operator
sink.partitioner -- how records are partitioned when writing from Flink to Kafka
sink.semantic -- 'at-least-once' / 'exactly-once'
topic
topic-pattern -- only one of topic and topic-pattern may be set
value.fields-include -- 'EXCEPT_KEY' / 'ALL', default 'ALL'; choosing 'EXCEPT_KEY' means the key columns are left out of the value, which requires 'key.fields' (and optionally 'key.fields-prefix') to be set as well
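As a sketch of how the key-related and startup options fit together (this reuses the user_test topic from above, but the key layout and offsets are assumptions for illustration, not part of the original setup): a table whose Kafka message key is JSON carrying user_id, whose value carries only the remaining columns, and which starts reading from an explicit offset.
-- hypothetical table; key/value split and offsets are illustrative
CREATE TABLE user_test_keyed (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_test',
'properties.bootstrap.servers' = 'localhost:9200',
'properties.group.id' = 'local-sql-test',
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:0',
'key.format' = 'json',
'key.fields' = 'user_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);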
1.2 debezium-json
1.2.1 Flink SQL
CREATE TABLE triggers (
`trigger_id` BIGINT,
`trigger_source` STRING,
`modify_time` BIGINT ,
`enc_type` TINYINT ,
PRIMARY KEY (`trigger_id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'azkaban.DEV.azkaban.triggers_test',
'properties.bootstrap.servers' = 'localhost:9200',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json',
'scan.topic-partition-discovery.interval'='10000',
'value.debezium-json.ignore-parse-errors'='true',
'value.debezium-json.schema-include'='true'
);
1.2.2 Preparing the data
(1) You need Debezium, MySQL and Kafka Connect installed.
The repository below walks through building all of this with docker-compose; ping me if you need the details.
https://github.com/leonardBang/flink-sql-etl/tree/master/flink-demo
(2) Once the change records show up in Kafka, preparation is done; each topic name corresponds to one MySQL table.
1.2.3 sql-client
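No extra jars should be needed beyond the ones from 1.1.3: in Flink 1.12 the debezium-json format is bundled in flink-json, so the same flink-json and flink-sql-connector-kafka jars should cover it. After pasting the DDL from 1.2.1 into sql-client, a quick check (assuming the Debezium connector from 1.2.2 is producing changes) is:
SELECT * FROM triggers;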
1.2.4 Code
/**
* @Auther: laraj
* @Date: 2021/1/28
* @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/formats/debezium.html#%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8-debezium-format
*/
public class FlinkKafkaDebeziumSource {
public static void main(String[] args) {
String debeziumKafkaJson = "CREATE TABLE triggers ( \n" +
" `trigger_id` BIGINT,\n" +
" `trigger_source` STRING,\n" +
" `modify_time` BIGINT ,\n" +
" `enc_type` TINYINT ,\n" +
" PRIMARY KEY (`trigger_id`) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'azkaban.DEV.azkaban.triggers_test',\n" +
" 'properties.bootstrap.servers' = 'localhost:9200',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'value.format' = 'debezium-json',\n" +
" 'scan.topic-partition-discovery.interval'='10000',\n" +
" 'value.debezium-json.ignore-parse-errors'='true',\n" +
" 'value.debezium-json.timestamp-format.standard'='SQL' ," +
" 'value.debezium-json.schema-include'='true' " +
" ) ";
String debeziumJsonMetadata = " CREATE TABLE triggers_data ( \n" +
" `schema` STRING NULL METADATA FROM 'value.schema' VIRTUAL,\n" +
" `ingestion_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n" +
" `source_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" +
" `source_database` STRING NULL METADATA FROM 'value.source.database' VIRTUAL,\n" +
" `scource_schema` STRING NULL METADATA FROM 'value.source.schema' VIRTUAL,\n" +
" `souce_table` STRING NULL METADATA FROM 'value.source.table' VIRTUAL,\n" +
" `souce_properties` MAP<STRING, STRING> NULL METADATA FROM 'value.source.properties' VIRTUAL,\n" +
" `trigger_id` BIGINT,\n" +
" `trigger_source` STRING,\n" +
" `modify_time` BIGINT ,\n" +
" `enc_type` TINYINT ,\n" +
" PRIMARY KEY (`trigger_id`) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'azkaban.DEV.azkaban.triggers_test',\n" +
" 'properties.bootstrap.servers' = 'localhost:9200',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'value.format' = 'debezium-json',\n" +
" 'scan.topic-partition-discovery.interval'='10000',\n" +
" 'value.debezium-json.ignore-parse-errors'='true',\n" +
" 'value.debezium-json.timestamp-format.standard'='SQL',\n" +
" 'value.debezium-json.schema-include'='true' " +
" ) ";
// build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
tEnv.executeSql(debeziumKafkaJson);
TableResult tableResult = tEnv.executeSql(debeziumJsonMetadata);
// tableResult.print("--");
// query the columns of the original MySQL table
Table table = tEnv.sqlQuery("select * from triggers ");
// convert to a retract stream and print it
DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(table, Row.class);
dataStream.map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
@Override
public Row map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
// System.out.println(booleanRowTuple2.f0 + "--" + booleanRowTuple2.f1);
if (booleanRowTuple2.f0) {
return booleanRowTuple2.f1;
} else {
}
return null;
}
}).filter(new FilterFunction<Row>() {
@Override
public boolean filter(Row row) throws Exception {
return row != null;
}
}).print("triggers ").setParallelism(1);
Table table_metadata = tEnv.sqlQuery("select schema,souce_properties from triggers_data ");
// convert to a retract stream and print the metadata columns
DataStream<Tuple2<Boolean, Row>> dataStream_metadata = tEnv.toRetractStream(table_metadata, Row.class);
dataStream_metadata.map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
@Override
public Row map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
//System.out.println(booleanRowTuple2.f0 + "--" + booleanRowTuple2.f1);
if (booleanRowTuple2.f0) {
Row value = booleanRowTuple2.f1;
return booleanRowTuple2.f1;
} else {
}
return null;
}
}).filter(new FilterFunction<Row>() {
@Override
public boolean filter(Row row) throws Exception {
return row != null;
}
}).print("triggers_data ").setParallelism(1);
// submit the job; this line is required!
try {
env.execute("FlinkKafkaDebeziumSource");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2.5 Pitfalls
(1) Consistency configuration
Under normal operation Debezium delivers each change exactly-once, but in failure scenarios it can only guarantee at-least-once, so duplicate change events may end up in Kafka. To have Flink deduplicate them:
a. Set the job-level option table.exec.source.cdc-events-duplicate to true:
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.cdc-events-duplicate", "true");
b. Define a PRIMARY KEY on that source table,
e.g. PRIMARY KEY (`trigger_id`) NOT ENFORCED -- NOT ENFORCED means the key's uniqueness is not validated
(2) A note on 'value.debezium-json.schema-include'
In Flink 1.12 this works differently from earlier versions, and the official description is also off. To read debezium-json data correctly, the option that used to be debezium-json.schema-include is now
'value.debezium-json.schema-include' = 'true' (when the format is set via 'value.format'). It declares whether the Debezium JSON messages contain the schema together with the payload.
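For reference, with schema-include set to 'true' each Kafka record value is expected to be the full Debezium envelope, roughly like the sketch below (the field values are made up for illustration and the schema part is abbreviated); with the option left at 'false', only the inner payload object is expected:
{
"schema": { "...": "Debezium's description of the payload types, omitted here" },
"payload": {
"before": null,
"after": { "trigger_id": 1, "trigger_source": "...", "modify_time": 0, "enc_type": 0 },
"source": { "db": "azkaban", "table": "triggers_test", "...": "..." },
"op": "c",
"ts_ms": 1611800000000
}
}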
(3) How do you get the Debezium metadata?
CREATE TABLE triggers_data (
`schema` STRING NULL METADATA FROM 'value.schema' VIRTUAL,
`ingestion_time` TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
`source_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
`source_database` STRING NULL METADATA FROM 'value.source.database' VIRTUAL,
`scourc_schema` STRING NULL METADATA FROM 'value.source.schema' VIRTUAL,
`souce_table` STRING NULL METADATA FROM 'value.source.table' VIRTUAL,
`souce_properties` MAP<STRING, STRING> NULL METADATA FROM 'value.source.properties' VIRTUAL,
`trigger_id` BIGINT,
`trigger_source` STRING,
`modify_time` BIGINT ,
`enc_type` TINYINT ,
PRIMARY KEY (`trigger_id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'azkaban.DEV.azkaban.triggers_test',
'properties.bootstrap.servers' = 'localhost:9200',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json',
'scan.topic-partition-discovery.interval'='10000',
'value.debezium-json.ignore-parse-errors'='true',
'value.debezium-json.timestamp-format.standard'='SQL',
'value.debezium-json.schema-include'='true' )
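Assuming the triggers_data table above has been created (in sql-client or via executeSql), the metadata columns can then be read like ordinary columns:
SELECT `source_database`, `souce_table`, `source_time`, `trigger_id` FROM triggers_data;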
1.2.6 Key configuration for debezium-json
A quick summary of the important options (a sink sketch illustrating the map-null-key options follows this list):
- format
- value.format
- value.debezium-json.ignore-parse-errors -- skip records with parse errors instead of failing the job
- value.debezium-json.map-null-key.literal -- when 'debezium-json.map-null-key.mode' is 'LITERAL', the string constant used to replace null keys in a MAP
- value.debezium-json.map-null-key.mode -- behaviour when a MAP key is null: 'FAIL' throws an exception, 'DROP' drops the entry, 'LITERAL' replaces the key with the literal above
- value.debezium-json.schema-include -- default false; set to true when the messages carry the full envelope with schema and payload
- value.debezium-json.timestamp-format.standard -- this one still feels a bit unreliable to me; 'SQL' expects '2020-12-30 12:13:14.123', 'ISO-8601' expects '2020-12-30T12:13:14.123'
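The map-null-key options only take effect on the write path (serialization). Below is a sketch, not part of the original setup, of a debezium-json sink table that replaces null MAP keys with a literal instead of failing; the topic and columns are invented for illustration:
-- hypothetical sink table for illustrating the map-null-key options
CREATE TABLE triggers_props_sink (
`trigger_id` BIGINT,
`props` MAP<STRING, STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'triggers_props_sink',
'properties.bootstrap.servers' = 'localhost:9200',
'format' = 'debezium-json',
'debezium-json.map-null-key.mode' = 'LITERAL',
'debezium-json.map-null-key.literal' = 'null_key'
);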
1.3 kafka-more-json (nested JSON)
1.3.1 Flink SQL
CREATE TABLE more_json (
`afterColumns` ROW(`created` STRING,
`extra` ROW(canGiving BOOLEAN),
`parameter` ARRAY <INT>),
`beforeColumns` STRING ,
`tableVersion` ROW(`binlogFile` STRING,
`binlogPosition` INT ,
`version` INT) ,
`map` MAP<STRING,BIGINT>,
`mapinmap` MAP<STRING,MAP<STRING,INT>>,
`touchTime` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'more_json',
'properties.group.id' = 'local-sql-test',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9200',
'format' = 'json' ,
'json.timestamp-format.standard' = 'SQL',
'scan.topic-partition-discovery.interval' = '10000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
) ;
1.3.2 Preparing the data
(1) Create the topic
./kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 --topic more_json
(2) Produce data to the topic
./kafka-console-producer.sh \
--broker-list localhost:9200 \
--topic more_json
(3) Data
(The JSON can vary: keys may be missing or use other types such as int; it depends on the data types declared in the DDL.)
{
"afterColumns":{
"created":"1589186680",
"extra":{
"canGiving":false
},
"parameter":[
1,
2,
3,
4
]
},
"map":{
"flink":123
},
"mapinmap":{
"inner_map":{
"key":234,
"key2":"12345"
}
},
"beforeColumns":null,
"tableVersion":{
"binlogFile":null,
"binlogPosition":0,
"version":0
},
"touchTime":1589186680591
}
1.3.3 sql-client
No new configuration or jars are needed here.
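Assuming the more_json table from 1.3.1 has been registered in the same sql-client session, nested ROW and MAP fields can be read directly, for example:
SELECT afterColumns.created AS created_ts, `map`['flink'] AS flink_value, tableVersion.version AS binlog_version FROM more_json;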
1.3.4 Code
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.12.0</version>
</dependency>
/**
* @Auther: laraj
* @Date: 2021/1/28
* @Description: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
*/
public class FlinkKafkaSourceObjectJson implements Serializable {
public static void main(String[] args) {
/**
./kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 --topic more_json
{"afterColumns":{"created":"1589186680","extra":{"canGiving":false},"parameter":[1,2,3,4]},"map":{"flink":123},"mapinmap":{"inner_map":{"key":234,"key2":"12345"}},"beforeColumns":null,"tableVersion":{"binlogFile":null,"binlogPosition":0,"version":0},"touchTime":1589186680591}
*/
String moreJsonSql=" CREATE TABLE more_json ( \n" +
" `afterColumns` ROW(`created` STRING,\n" +
" `extra` ROW(canGiving BOOLEAN),\n" +
" `parameter` ARRAY <INT>),\n" +
" `beforeColumns` STRING ,\n" +
" `tableVersion` ROW(`binlogFile` STRING,\n" +
" `binlogPosition` INT ,\n" +
" `version` INT) ,\n" +
" `map` MAP<STRING,BIGINT>, " +
" `mapinmap` MAP<STRING,MAP<STRING,INT>>, " +
" `touchTime` BIGINT\n" +
" ) WITH ( \n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'more_json',\n" +
" 'properties.group.id' = 'local-sql-test',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'properties.bootstrap.servers' = 'localhost:9200',\n" +
" 'format' = 'json' ,\n" +
" 'json.timestamp-format.standard' = 'SQL',\n" +
" 'scan.topic-partition-discovery.interval' = '10000',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true' \n" +
" ) ";
// build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
tEnv.executeSql(moreJsonSql);
// run the query
// Table table = tEnv.sqlQuery("select * from more_json ");
Table table = tEnv.sqlQuery(" select afterColumns.`parameter`[1] as parameter1, afterColumns.extra['canGiving'] as canGiving , `map`['flink'] as flinkv,`mapinmap`['inner_map']['key2'] as mkey2,tableVersion.binlogFile from more_json ");
// convert back to a DataStream and print it
DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);
rowDataStream.map(new MapFunction<Row, Row>() {
@Override
public Row map(Row row) throws Exception {
System.out.println(row.toString());
return row;
}
});
//.print("sql==").setParallelism(1);
// submit the job; this line is required!
try {
env.execute("test");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.3.5 Pitfalls
(1) When writing queries, remember that arrays in Flink SQL are indexed from 1.
e.g. query 1:
select afterColumns.`parameter`[1] as parameter1, afterColumns.extra['canGiving'] as canGiving , `map`['flink'] as flinkv,`mapinmap`['inner_map']['key2'] as mkey2,tableVersion.binlogFile from more_json
1.3.6 Configuration
(1) Data types
(2) WITH options