C’est quoi Apache AirFlow ?

Apache AirFlow est un outil d’orchestration et de monitoring de WorkFlows open source développé par Maxime Beauchemin au sein de Airbnb en 2014. En 2016, ce projet rejoint la fondation Apache.

Ses concepts

AirFlow est développé en Python. L’outil propose plusieurs concepts à connaître pour mieux gérer ces WorkFlows et tâches :

a – DAGs & Tasks

AirFlow est basé principalement sur le concept de DAG (Directed Acyclic Graph). DAG est simplement un graph orienté sans retour possible qui présente un des points forts de cet outil face à ses concurrents en lui permettant l’exécution des tâches dans un ordre précis, en parallèle ou à la suite.

Un DAG est constitué d’un ensemble de Tasks liées les unes aux autres, qui présentent logiquement les étapes du WorkFlow souhaité à orchestrer et à monitorer.

Un DAG est lancé de manière régulière ou via un trigger. Un DAG lancé ou en cours d’exécution est appelé une DagInstance, ses tâches étant donc des TaskInstances.

b – Operators

La création d’une tâche passe techniquement selon le concept de AirFlow par un opérateur. A chaque besoin son opérateur :
BashOperator : Permet d’executer une Commande Bash.
PythonOperator : Permet d’éxecuter une fonction Python.
EmailOperator : Permet d’envoyer un E-mail.
HttpOperator : Permet d’effectuer une requête HTTP.
Etc…
Pour l’aspect technique, un opérateur est tout simplement une classe Python héritant de BaseOperator.

c – Executors

Lorsqu’une tâche est appelée, elle va s’exécuter quelque part : ce quelque part est géré par Airflow via les executors.

Installation

Au niveau des distributions CentOS, l’installation d’Apache AirFlow est un peu compliqué et spécifique suite à l’inexistence du gestionnaire du paquet ‘APT’ qui simplifie l’installation des dépendances nécessaires ainsi que la documentation officielle de Airflow adopte uniquement ce dernier(APT-GET sur Ubuntu). Dans notre cas, nous allons passer par le gestionnaire ‘YUM’.

#L’installation des librairies requises
sudo yum groupinstall "Development tools"
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel python-devel wget cyrus-sasl-devel.x86_64

#L’installation de Python 2.7.X (Préférable 2.7.15)
cd /opt
sudo wget --no-check-certificate https://www.python.org/ftp/python/2.7.15/Python-2.7.15.tar.xz
tar xf Python-2.7.15.tar.xz
cd Python-2.7.15
sudo ./configure --prefix=/usr/local
sudo make
sudo make altinstall
ls -ltr /usr/local/bin/python*
vi ~/.bashrc

#Ajouter cette ligne d’alias python='/usr/local/bin/python2.7'
source ~/.bashrc

#L’installation de PIP(Gestionnaire des paquets Python) :
cd /tmp/
sudo curl "https://bootstrap.pypa.io/get-pip.py" -o "get-pip.py"
sudo python get-pip.py

#L’installation d’Apache AirFlow
pip install apache-airflow --ignore-installed
#Vérifier si tout va bien par :
airflow version

Passons à la pratique !

Pour accomplir ce chapitre, je propose de faire un petit exercice qui nous permettra de parcourir l’ensemble des règles métier de Airflow. Ça vous convient ?
L’objectif de l’exercice sera le développement d’un workflow permettant de lancer un Script Bash localement (Machine AirFlow), après sa finalisation et sa réussite une deuxième tâche sera lancée en exécutant un script SH à distance dans une autre machine via un tunnel SSH.
Décortiquons ensemble le code qui va répondre à cette problématique !

Importation des modules d’airflow et librairies Python si nécessaire

“””
Premièrement nous allons importer le Framework airflow via Python.
Depuis les models de airflow nous allons importer DAG.
Depuis les opérateurs de airflow nous allons importer BashOperator et SSHOpertor.
Depuis les connecteurs (hooks) de airflow nous allons importer SSHHook.
“””
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from datetime import datetime, timedelta

Instanciation d’un tunnel SSH

“”” 
Instanciation d’un tunnel sshHook qui permet de créer une connection SSH vers la machine "ec2-54-93-68-31.eu-central-1.compute.amazonaws.com"
Cet objet introduit plusieurs paramètres dont les indispensables sont : 
remote_host : Adresse ou nom d’hôte
username : Nom d’utilisateur
password : Mot de passe (devient facultatif si une clé RSA est générée)
key_file : clé RSA pour l'authentification SSH
“””

sshHook = SSHHook(ssh_conn_id=None,remote_host="machine-airflow.eu-central-1.compute.amazonaws.com",username="centos",password=None,key_file="/home/centos/ssh_key.pem",timeout=None,keepalive_interval=None,port=None)

Instanciation d’un objet DAG

“”” 
Arguments par défaut d’un DAG
“””
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 05, 14),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
“””
Instanciation d’un objet dag, qui adopte les paramètres suivants :
dag_id : Id unique d’un DAG
dafault_args : les arguments par défauts
schedule_interval : Fréquence de planification de l'exécution du DAG (Syntaxe CRON préférable)
“””

dag = DAG(dag_id='hello_world', default_args=default_args, schedule_interval="30 13 * * *")

Instanciation d’un objet t1, qui sera l’opérateur de la tâche SSH

“””
Ci-après les paramétres  :
task_id : Id unique d’une task
command: La commande bash ou emplacement du script SH à exécuter
ssh_hook: Le hook SSH, qui est le tunnel créé en haut (ssh_hook)
dag : dag propriétaire de la tâche (exécuteur)
“””

t1 = SSHOperator(
    task_id='task1',
    command="/home/centos/command.sh ",
    ssh_hook=sshHook,
    dag=dag)

Instanciation d’un objet t2, qui sera l’opérateur de la tâche BASH

“””
Ci-après les paramétres  :
task_id : Id unique d’une task
bash_command: La commande bash ou emplacement du script SH à exécuter
ssh_hook: Le hook SSH, qui est le tunnel créé en haut (ssh_hook)
dag : dag propriétaire de la tâche (exécuteur)
“””

t2 = BashOperator(
    task_id='task2',
    dag=dag,
    bash_command="python /home/centos/PythonLines.py",
    owner='airflow',)

Ordonnancement et priorisation de l’exécution des tâches

“””La tâche t2 s’executera en premier, une fois c’est términé le dag lancera la deuxiéme tâche t1“””
t2 >> t1

J'espère que cet exercice était bénéfique pour vous !

Airflow GUI

Airflow dispose d’une interface graphique très pratique et offrant une UX soit disant correcte.
L’application web permet de suivre l’avancement de nos tâches et de les monitorer graphiquement, ainsi que la disposition d’un dashboard visualisant l’état des DAGs et des tâches et l’exécution de l’ensemble des actions Airflow (Exécuter un dag, une tâche…) en un seul clic.
Pour démarrer l’application web, il est nécessaire d’initialiser la base de donnée (Sqlite par défaut) et démarrer le serveur web.
A ne pas oublier de lancer le Scheduler, qui est le garant de l’automatisation et le déclenchements des DAGs et des tâches…
Pour cela, 3 commandes simples sont sollicitées :

# L’initialisation de la base de donnée :
airflow initdb

# Démarrage du serveur web :
airflow webserver (par défaut est lancé dans le port 8080, un argument -p est disponible pour spécifier un port)

# Démarrage du Scheduler :
airflow scheduler

Dashborad & DAGs : Vue d’ensemble de tous les DAGs de votre environnement

Tree View: Représentation arborescente d’un DAG qui s’étend dans le temps

Graph view: Visualisation des dépendances d’un DAG et de leur statut actuel pour une exécution spécifique.

Task duration: Temps total consacré à différentes tâches au fil du temps.

Gantt view: durée et chevauchement d’un DAG

Code view : Moyen rapide pour afficher le code source d’un DAG

J’ai remarqué que l’article devient un peu ennuyant pour vous ! mais il nous reste un chapitre important concernant le parallélisme… Bon d’accord, laissons cette partie à un autre article qui sera considéré comme une continuité.
J’espère que ce premier jet était déjà bénéfique pour vous, mais retenez bien que Airflow devient de plus en plus un outil très demandé et sollicité dans les marché Big Data. à présent il est adopté par des centaines des compagnies mondiales (AirBnb, Paypal, Ubisoft, Groupon, Adobe, Bloomberg, GitLab, HBO, OVH, Reddit, société générale, Tesla, United AirLines et même Tinder pour les intéressés LOL, etc)