Flink1.11.2集成CDH6.0.1
- 1. 集群规划
- 2. 版本选择
- 3. Flink集成CDH编译
- 3.1 准备maven环境
- 3.2 编译flink-shaded 版本
- 3.3 编译flink源码
- 3.4 制作parcel安装包
- 3.5 CDH集成Flink
- 3.6 搭建过程中问题汇总
- 3.7 Flink界面验证
- 3.8 运行Flink例子程序
- 3.9 提交Flink任务到yarn参数说明
1. 集群规划
普通分布式集群
一个简单的表格是这么创建的:
节点名称 | master | slave1 | slave2 |
JobManager | √ | ||
TaskManager | √ | √ | √ |
2. 版本选择
- CentOS 7.5
- CDH 6.0.1
- Scala 2.11
- Flink 1.11.2
3. Flink集成CDH编译
3.1 准备maven环境
- 获取maven安装包
wget https://mirror.bit.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
- 解压文件
tar -zxvf apache-maven-3.6.3-bin.tar.gz
- 配置环境变量 /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.6.3
export PATH=$MAVEN_HOME/bin:$PATH
- 使环境变量生效
source /etc/profile
- 修改setting.xml:
vim apache-maven-3.6.3/conf/settings.xml
添加如下配置
<mirrors>
<mirror>
<id>alimaven</id>
<mirrorOf>central</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
</mirror>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>central</id>
<name>Maven Repository Switchboard</name>
<url>http://repo1.maven.org/maven2/</url>
<mirrorOf>central</mirrorOf>
</mirror>
<mirror>
<id>repo2</id>
<mirrorOf>central</mirrorOf>
<name>Human Readable Name for this Mirror.</name>
<url>http://repo2.maven.org/maven2/</url>
</mirror>
<mirror>
<id>ibiblio</id>
<mirrorOf>central</mirrorOf>
<name>Human Readable Name for this Mirror.</name>
<url>http://mirrors.ibiblio.org/pub/mirrors/maven2/</url>
</mirror>
<mirror>
<id>jboss-public-repository-group</id>
<mirrorOf>central</mirrorOf>
<name>JBoss Public Repository Group</name>
<url>http://repository.jboss.org/nexus/content/groups/public</url>
</mirror>
<mirror>
<id>google-maven-central</id>
<name>Google Maven Central</name>
<url>https://maven-central.storage.googleapis.com
</url>
<mirrorOf>central</mirrorOf>
</mirror>
<!-- 中央仓库在中国的镜像 -->
<mirror>
<id>maven.net.cn</id>
<name>oneof the central mirrors in china</name>
<url>http://maven.net.cn/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
3.2 编译flink-shaded 版本
- 获取安装包编译flink-shaded 版本
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-shaded-12.0/flink-shaded-12.0-src.tgz
- 解压jar包:tar -zxvf flink-shaded-12.0-src.tgz
- 修改pom.xml
<profile>
<id>vendor-repos</id>
<activation>
<property>
<name>vendor-repos</name>
</property>
</activation>
<!-- Add vendor maven repositories -->
<repositories>
<!-- Cloudera -->
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<!-- Hortonworks -->
<repository>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>https://repo.hortonworks.com/content/repositories/releases/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<repository>
<id>HortonworksJettyHadoop</id>
<name>HDP Jetty</name>
<url>https://repo.hortonworks.com/content/repositories/jetty-hadoop</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<!-- MapR -->
<repository>
<id>mapr-releases</id>
<url>https://repository.mapr.com/maven/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
</repositories>
</profile>
- 进入flink-shaded-12.0文件夹
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=3.0.0-cdh6.0.1 -Dscala-2.11 -Drat.skip=true -T10C
3.3 编译flink源码
- 获取flink源码包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.2/flink-1.11.2-src.tgz
- 解压jar包:tar -zxvf flink-1.11.2-src.tgz
- 修改pom文件
<profile>
<id>vendor-repos</id>
<activation>
<property>
<name>vendor-repos</name>
</property>
</activation>
<!-- Add vendor maven repositories -->
<repositories>
<!-- Cloudera -->
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<!-- Hortonworks -->
<repository>
<id>HDPReleases</id>
<name>HDP Releases</name>
<url>https://repo.hortonworks.com/content/repositories/releases/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<repository>
<id>HortonworksJettyHadoop</id>
<name>HDP Jetty</name>
<url>https://repo.hortonworks.com/content/repositories/jetty-hadoop</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
<!-- MapR -->
<repository>
<id>mapr-releases</id>
<url>https://repository.mapr.com/maven/</url>
<snapshots><enabled>false</enabled></snapshots>
<releases><enabled>true</enabled></releases>
</repository>
</repositories>
</profile>
- 进入flink-1.11.2文件夹
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=3.0.0-cdh6.0.1 -Dscala-2.11 -Drat.skip=true -T10C
- 打包编译好的文件
#进入目录:
cd flink-dist/target/flink-1.11.2-bin/
#注意:我们是基于scala2.11编译的,压缩包的名称必须是:flink-1.11.2-bin-scala_2.11.tgz
#打包命令:
tar -zcf flink-1.11.2-bin-scala_2.11.tgz flink-1.11.2/
- cp flink-1.11.2-bin-scala_2.11.tgz /usr/local/flink-parcel/
3.4 制作parcel安装包
- 下载项目
git clone https://github.com/pkeropen/flink-parcel.git
- cd flink-parcel
- 修改配置文件
vim flink-parcel.properties
#FLINK 下载地址
FLINK_URL=https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz
#flink版本号
FLINK_VERSION=1.11.2
#扩展版本号
EXTENS_VERSION=BIN-SCALA_2.11
#操作系统版本,以centos为例
OS_VERSION=7
#CDH 小版本
CDH_MIN_FULL=5.16.1
CDH_MAX_FULL=6.3.2
#CDH大版本
CDH_MIN=5
CDH_MAX=6
- 生成parcel文件,build.sh文件不能运行,需要先变更权限
chmod -R 755 build.sh
./build.sh parcel
- 生成的文件都在FLINK-1.10.0-BIN-SCALA_2.11_build目录下
- 生成csd文件,生成文件FLINK_ON_YARN-1.11.2.jar
a) on yarn 版本:
./build.sh csd_on_yarn
b) standalone版本:
./build.sh csd_standalone - 配置flink环境变量
export FLINK_HOME=/usr/local/flink-parcel/FLINK-1.11.2-BIN-SCALA_2.11/lib/flink
export PATH=$PATH:$FLINK_HOME/bin
3.5 CDH集成Flink
- 将生成的csd的jar包文件放入指定目录中
cp FLINK_ON_YARN-1.11.2.jar /opt/cloudera/csd/
chown cloudera-scm:cloudera-scm /opt/cloudera/csd/FLINK_ON_YARN-1.11.2.jar
systemctl restart cloudera-scm-server
- 将生成的parcel三个文件通过httpd服务配置下载路径
mkdir /var/www/html/flink-1.11.2
cp /usr/local/flink-parcel/FLINK-1.11.2-BIN-SCALA_2.11_build/* /var/www/html/flink-1.11.2
- 测试访问
- 登录cdh管理界面,点击集群->Parcel
- 点击配置
- 增加一行url,并填入配置的parcel文件内网下载路径
- 点击检查新parcel后出现flink,点击下载–>分配–>激活
- 添加服务
- 自定义角色分配
- 如果没有使用k8s需要把这个置空
3.6 搭建过程中问题汇总
- 问题一:
- 文件的哈希值不对,如果不一样就sha1sum后的值更新到.sha文件中
sha1sum FLINK-1.11.2-BIN-SCALA_2.11-el7.parcel
cat FLINK-1.11.2-BIN-SCALA_2.11-el7.parcel.sha
2.如果上图hash值是对的依然上面的错
命令: vim /etc/httpd/conf/httpd.conf
注意:修改后一定要重启httpd服务
命令:systemctl restart httpd
- 问题二:
在 Flink-yarn 服务所在的节点添加 flink 用户和角色:
groupadd flink
useradd flink -g flink
问题三:
如果启动时报缺少hadoop的相关jar包,则需要下载:
wegt https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
下载后将包放至/opt/cloudera/parcels/FLINK/lib/flink/lib
并分发到所有flink节点上
环境变量新增行:export HADOOP_CLASSPATH=/opt/cloudera/parcels/FLINK/lib/flink/lib问题四:
如果启动的过程中一直报错,可以查看flink日志,在/var/log/flink目录下
3.7 Flink界面验证
http://IP:8081
3.8 运行Flink例子程序
- 执行命令
$FLINK_HOME/bin/flink run -m 10.0.30.12:8081
$FLINK_HOME/examples/batch/WordCount.jar
–input hdfs://10.0.30.12/syt/input/
–output hdfs://10.0.30.12/syt/flinkoutput - 执行结果
- HDFS输出文件内容:
- Flink界面
3.9 提交Flink任务到yarn参数说明
- 方式一:同时启动Yarn application和Flink task
flink run -m yarn-cluster -ys 8 -ynm myapp -yn 4 -yjm 1024 -ytm 4096 -d -c com.paultech.MyApp ./myapp.jar
参数 | 说明 |
-m | 运行模式,这里使用yarn-cluster,即yarn集群模式。 |
-ys | slot个数。 |
-ynm | Yarn application的名字。 |
-yn | task manager 数量。 |
-yjm | job manager 的堆内存大小。 |
-ytm | task manager 的堆内存大小。 |
-d | detach模式。可以运行任务后无需再控制台保持连接。 |
-c | 指定jar包中class全名。 |
- 方式二:先启动Yarn application,再在指定的application内运行Flink task
启动yarn session的命令如下:
yarn-session.sh -d -n 4 -nm riskmanater -jm 1024 -tm 4096 -t relative/path/to/file
参数 | 说明 |
-d | Detach模式 |
-nm | Application名称 |
-jm | Job Manager 容器的内存 |
-tm | Task Manager 容器的内存 |
-t | 传送文件至集群,使用相对路径。程序中读取文件仍使用相对路径 |
这里的参数和上面flink的类似,只不过少了前缀y
在特定Flink yarn session上提交Flink任务:
flink run -yid application_12345678 --class MainClassFullPath riskmanager.jar
这里使用-yid参数来指定任务运行于那个Flink yarn session之上。