当前位置: 首页>后端>正文

Airflow集群搭建

部署节点

cdhnode1:webserver scheduler worker
cdhnode2:worker flower
cdhnode3:worker

访问地址

Airflow:xxx

Flower:xxx

同步物料

1.python3安装包:/opt/Python-3.7.15.tgz

2.pip3安装脚本:/opt/pip3

3.openssl安装包:/opt/openssl-1.0.2r.tar.gz

4.sqlite3安装包:/opt/sqlite-autoconf-3290000.tar.gz

安装步骤

1.基础环境准备

1.1 安装软件包依赖

yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib libffi-devel sqlite-devel

1.2 openssl安装

tar zxvf openssl-1.0.2r.tar.gz

./config --prefix=/opt/openssl1.0.2r --openssldir=/opt/openssl1.0.2r/openssl no-zlib

make && make install

echo "/opt/openssl1.0.2r/lib" >> /etc/ld.so.conf

1.3 安装python3.7.15环境

tar -zxvf Python-3.7.15.tgz

./configure

vim Modules/Setup #配置python3支持openssl

 *SSL=/opt/openssl1.0.2r*
 *_ssl _ssl.c \*
 *        -DUSE_SSL -I$(SSL)/include -I$(SSL)/include/openssl \*
 *        -L$(SSL)/lib -lssl -lcrypto*

make && make install

1.4 设置环境变量

vim /etc/profile

 export PYTHON_HOME=/opt/Python-3.7.15
 export PATH=$PYTHON_HOME/bin:$PATH

1.5 将python3.7.15设置成默认python版本

mv /usr/bin/python /usr/bin/python.bak
ln -s /usr/local/bin/python3.7 /usr/bin/python

1.6 安装pip3,并设置成默认pip版本

sudo python /opt/pip3/python3.7/get-pip.py

mv /usr/bin/pip /usr/bin/pip.bak
ln -s /usr/local/bin/pip3.7 /usr/bin/pip

1.7 sqlite3安装,替换系统默认低版本

tar -zxvf sqlite-autoconf-3290000.tar.gz

./configure --prefix=/usr/local

make && make install

mv /usr/bin/sqlite3 /usr/bin/sqlite3_old

ln -s /usr/local/bin/sqlite3 /usr/bin/sqlite3

echo "/usr/local/lib" > /etc/ld.so.conf.d/sqlite3.conf

ldconfig

2. pip依赖包安装

2.1 环境变量设置

export SLUGIFY_USES_TEXT_UNIDECODE=yes

vim /etc/profile

export AIRFLOW_HOME=/opt/airflow

2.2 pip安装

(1)airflow基础包

pip install apache-airflow[password]
pip install flask_bcrypt
pip install apache-airflow[mysql]
pip install apache-airflow[celery]
pip install pymysql

(2)安装分布式任务队列

pip install celery

(3)安装 python 的 redis 包,为启动 worker 作准备

pip install redis
pip install flower

(4)安装airflow后,单节点启动,在~/目录下生成airflow配置文件夹,默认启动使用sqlite3作为db启动,后续改成mysql

AIRFLOW_VERSION=2.4.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow standalone

airflow db init
airflow standalone

3.修改airflow.cfg配置文件,~/airflow/airflow.cfg

3.1 注释掉默认单线程队列,设置为分布式队列与配置redis

#executor = SequentialExecutor
executor = CeleryExecutor
broker_url = [redis://xxx](redis://xxx)
celery_result_backend = [redis://xxx](redis://xxx)

3.2 注释掉默认数据库配置,设置为Mysql配置

#sql_alchemy_conn = sqlite:////root/airflow/airflow.db
sql_alchemy_conn =mysql://airflow:xxx@xxx:3306/airflow?charset=utf8

3.3 修改[celery]队列配置

broker_url =redis://xxx:6379/x
 
result_backend = db+mysql://airflow:xxx@xxx:3306/airflow

3.4 Mysql创建airflow用户

CREATE DATABASE airflow;
CREATE USER 'airflow'@'%' IDENTIFIED BY 'xxx';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'%' IDENTIFIED BY 'xxx';
select User, host from mysql.user;
flush privileges;

4.集群部署

将三台集群几点都安装上airflow环境并初始化,将cdhnode1的配置文件分发到各节点。

cdhnode1:

airflow webserver -p 8080 -D

airflow scheduler -D

airflow celery worker -D

cdhnode2:

airflow celery worker -D

cdhnode3:

airflow celery worker -D

5.查看集群各节点状态

ps aux |grep webserver

ps aux |grep scheduler

ps aux|grep "celery worker"

6.airflow命令

6.1 创建用户

airflow users create
--username admin
--firstname admin
--lastname admin
--role Admin
--email xx@xx.com
--password admin

6.2 删除用户

airflow users delete --username admin


https://www.xamrdz.com/backend/32s1941332.html

相关文章: