Apache Airflow 는 Python programming 기반의 Workflow Management Tool 이다.

  • Airflow는 Python 기반으로 만들어졌기 때문에 데이터 분석가도 쉽게 코드를 작성 가능
  • Web UI 가 기본적으로 제공되어 DAG 및 Task 관리에 용이함
  • Workflow 의 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용함

 


History

  • Airflow는 2014년 10월 Airbnb의 Maxime Beauchemin에 의해 시작
  • 첫 번째 커밋의 오픈 소스였으며 공식적으로 Airbnb GitHub에 포함되어 2015 년 6 월에 발표함
  • Airflow는 2016년 3월 Apache Software Foundation의 인큐베이터 프로그램에 가입
  • Apache 재단은 2019년 1월에 Apache Airflow를 최상위 프로젝트로 발표함

 


사용목적

  • 데이터 엔지니어링에서는 데이터들을 ETL(Extract, Transform, Load) 과정을 통해 데이터를 가공하며 적재함
    (머신러닝 분야에서도 모델 학습용 데이터 전처리, Train, Prediction시 사용 가능)
  • 위와 같은 경우 앞의 작업의 Output이 다음 작업의 input이 되는 등 순서데로 처리되야할 여러개의 작업이 존재함
  • 관리할 작업이 적다면 Crontab 처리 + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 관리할 작업들이 많아지면 해당 작업들을 관리할 도구가 필요해짐
  • Workflow Management Tool 은 Airflow 외에도 하둡 에코시스템에 우지(Oozie), Luigi 같은 솔루션이 있음

 


Airflow Architecture (기본 구조)

Airflow 는 기본적으로 Webserver, Scheduler, Worker, MetaDB (Database Backend) 으로 구성되며, DAG 로 Workflow 를 구성하여 실행한다.

Airflow Cluster 구조 (Local Executor)

1. Webserver
  - Web UI 제공
  - Workflow의 전반적인 조작 및 기록 관리

2. Scheduler
  - DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함
  - 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행 
  - Task 를 실행할 Worker를 지정하여 작업을 명령

3. MetaDB
  - DAG 정보, 스케쥴링 정보, 작업 실행 결과 등이 저장
  - SQLite, PostgreSQL, MySQL 등의 DB 를 지원

4. Worker
  - Airflow의 Task 가 실행되는 환경
  - Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨

5. DAGs
  - Task 간의 관계와 순서를 정의, Task 는 Operator 로 구현됨

 


Executor

Airflow는 다양한 Executor 를 지원하며, Executor 는 Task Instance 가 실제로 실행되는 메커니즘이다.
대표적인 Executor 는 아래와 같다.

  • Sequential Executor : Worker 가 한번에 하나의 Task만 수행할수 있어 제약적임
  • Local Executor : Worker 가 Scheduler 와 같은 서버에서 실행됨
  • Celery Executor : Worker 노드를 별도 구성, Message Broker 를 통하여 작업을 실행시킨
  • Kubernetes Executor : 컨테이너 환경에서 Worker 가 실행되도록 구성

 


동작 방식

Airflow 는 Scheduler 의 스케쥴링으로 Worker 에서 작업이 실행되며 동작 방식은 아래와 같다. 

  • Airflow 에서 WorkflowDAG 로 만들어지며 DAG Task 들로 구성된다.
  • TaskOperator 클래스를 인스턴스화하여 만든다.
  • 구성한 Operator 인스턴스는 다음과 같이 Task 가 된다. my_task = MyOperator(...)
  • DAG가 실행되면 Airflow 의 Scheduler 는 데이터베이스에 DAG Run 항목을 만든다.
  • DAG Run 에서 Task 를 실행하면 Task Instance 가 만들어져서 Worker 가 작업을 수행한다.

 

Posted by CJ.Ree
,