当前位置: 首页>编程语言>正文

flink里面的state flink-dist

背景

纯小白新手入门flink,由于自身基础差底子薄,启动个源码各种查资料找资源,终于启动好了 值得记录一下,本文源码启动是基于idea+jdk8+maven在windows上启动flink1.16源码。

上车

(1)下载源码

源码地址:https://github.com/apache/flink/tree/release-1.16

(2)项目编译

首先在idea中打开命令终端Terminal,然后进行编译

在flink目录中执行:

mvn clean install -DskipTests -Dmaven.javadoc.skip=false -T 1C

进入flink-dist目录(cd flink-dist)执行:

mvn install -DskipTests

这个过程较慢,需要耐心等一会。。。

如果在此过程中报错:Too many files with unapproved license, 需要加入-Drat.skip=true 跳过许可证发行检查。

检验是否成功:进入flink-dist目录查看target文件下是否有flink-1.16-SNAPSHOT-bin文件夹(启动配置时需要用),如下图:

flink里面的state flink-dist,flink里面的state flink-dist_windows,第1张

 (3) 启动StandaloneSessionClusterEntrypoint

a.基础配置:

在flink-runtime模块下的StandaloneSessionClusterEntrypoint类中,不修改直接启动会报错。

flink里面的state flink-dist,flink里面的state flink-dist_intellij idea_02,第2张

 这是一个help信息,要求我们用-c参数输入一个配置文件所在目录。

这个配置文件目录就是flink编译后的conf目录。即:flink-dist模块下target文件下(上第一步编译后的target文件),所以我们在idea的application执行界面的program arguments中填入编译后的conf目录:

-c E:\myStudy\flink1.16\flink-dist\target\flink-1.16-SNAPSHOT-bin\flink-1.16-SNAPSHOT\conf

#注意:-c 是固定格式,后面的路径是项目本身的target文件下的具体地址,需要根据自己项目动态调整

flink里面的state flink-dist,flink里面的state flink-dist_flink里面的state_03,第3张

b.主类缺失:

这时候再次点击main方法启动时,还是报错:Exception in thread "main" java.lang.NullPointerException,

所以我们需要依赖其他的jar包,即:在IDEA的File -> project structure->Modules中给flink-runtime-添加依赖,依赖jar包来源于flink-dist模块下target文件下(上第一步编译后的target文件)下\flink-1.16-SNAPSHOT-bin\flink-1.16-SNAPSHOT\lib目录下的所有jar包都添加进去。

flink里面的state flink-dist,flink里面的state flink-dist_配置文件_04,第4张

c.启动:

重新启动该类的main方法,就启动起来了,可以打开localhost:8081可以出现flink界面。

flink里面的state flink-dist,flink里面的state flink-dist_windows_05,第5张

d.日志配置:

能够启动项目了但是无法看到日志,需要在jvm启动命令中指定log4j配置文件。

-Dlog4j.configurationFile=file:/E:/myStudy/flink1.16/flink-dist/target/flink-1.16-SNAPSHOT-bin/flink-1.16-SNAPSHOT/conf/log4j-console.properties

#注意:-Dlog4j.configurationFile=file:  是固定格式,后面的路径是项目本身的target文件下的具体地址,需要根据自己项目动态调整

flink里面的state flink-dist,flink里面的state flink-dist_intellij idea_06,第6张

 在log4j-console.properties文件中,日志目录被配置为$sys:log.file$,即从系统环境变量中取log.file,如果没有配置的话仍然会无法显示日志或者报错,所以需要提前配置好。

我这边不打算采用系统环境变量的方式,而是用自定义的环境变量log.file=target/log/flink.log,并把log4j-console.properties中的$sys:log.file$都改成$env:log.file$,这样的好处是可以把JobManager和TaskManager的日志分开。

(4) 启动TaskManagerRunner

同StandaloneSessionClusterEntrypoint类启动大体一致,都需要配置a,b两步,但是由于b步在StandaloneSessionClusterEntrypoint中已经配置过,所以在此不用再次配置,只需要配置步骤a。

a.基础配置:

在idea的application执行界面的program arguments中填入编译后的conf目录,同StandaloneSessionClusterEntrypoint类中配置相同,可直接拷贝过来。

b.创建临时目录:

启动时会创建一个临时目录,这个目录的名字格式是:tm_localhost:63324-413063,其中冒号在windows里是不被允许的,所以需要修改TaskManagerRunner类中getTaskManagerResourceID()方法,把ip和端口的连接符号从冒号改成下划线,如下图:

flink里面的state flink-dist,flink里面的state flink-dist_flink_07,第7张

 c.配置项添加

进入flink-dist模块下找target文件,找到flink-1.16-SNAPSHOT-bin下的conf配置文件夹下的flink-conf.yaml文件(即路径为:flink-dist\target\flink-1.16-SNAPSHOT-bin\flink-1.16-SNAPSHOT\conf\flink-conf.yaml),进行配置文件的修改:

## ================== 单独启动时添加 ===================
taskmanager.cpu.cores: 2

taskmanager.memory.task.heap.size: 750mb
taskmanager.memory.task.off-heap.size: 0mb

taskmanager.memory.managed.size: 2048mb

taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 128mb

taskmanager.memory.framework.heap.size: 128mb
taskmanager.memory.framework.off-heap.size: 128mb
taskmanager.log.path: target/log/jobmanager/flink.log

# jvm
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.max: 256mb
taskmanager.memory.jvm-overhead.min: 256mb
## ==================================================

 其中taskmanager.log.path是task的日志文件,根据自己项目情况动态调整。

d.启动:

刷新localhost:8081界面,Taskmaanger启动成功后会自动注册到JobManager,所以在flink界面能够看到TaskManager的数量从0变成了1。

flink里面的state flink-dist,flink里面的state flink-dist_flink里面的state_08,第8张

e.日志配置

同同StandaloneSessionClusterEntrypoint类的日志配置相同。

结束

至此,flink源码启动完成,如有错误之处,还望各位大佬体谅与指教!


https://www.xamrdz.com/lan/5m71951248.html

相关文章: