해당 글은 12월 중에 작성 완료 예정입니다.

 


from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

 

 

'OpenSource > Apache Airflow' 카테고리의 다른 글

Apache Airflow 클러스터를 구성해보자  (0) 2020.12.09
Apache Airflow 란?  (0) 2020.12.09
Posted by CJ.Ree
,

해당 글은 12월 중에 작성 완료 예정입니다.

 

Installation Scripts

sudo yum update –y
sudo yum install gcc python3-devel -y
sudo yum install python3 -y
sudo yum install -y mysql-devel

// Airflow with Extra Packages 설치 
sudo pip3 install apache-airflow[mysql,celery,redis,crypto,aws]==1.10.12
sudo pip3 install apache-airflow-backport-providers-amazon
// Celery 설치 (Celery 서버)
sudo pip3 install celery

// AWS boto3 설치
sudo pip3 install boto3
// Web Authenticate 기능을 위한 flask-bcrypt 설치
sudo pip3 install flask-bcrypt
// Git 설치
sudo yum install git -y
// amazon-efs-utils 설치
sudo yum install -y amazon-efs-utils   

 


Airflow Directory Structure

AIRFLOW_HOME
├ airflow.config
├ dags     // dag_bags
├ plugins  // plugins
└ logs     // logs
  • AIRFLOW_HOME은 Airflow 설정파일 / DAG 정의 파일 / Airflow 플러그인을 저장하는 디렉토리이다.
  • Dag Bags : DAG 파일들이 저장되는 디렉토리 경로이다.
  • Plugins : Opertor, Sensor, Hook 등의 클래스들을 패키지를 의미한다.
  • Logs : Shceduler, DAG, Task 들의 실행 기록이 로그로 기록된다. 

 


Airflow 실행

데몬 실행 스크립트

'OpenSource > Apache Airflow' 카테고리의 다른 글

Apache Airflow 에서 DAG 를 실행시켜 보자  (0) 2020.12.09
Apache Airflow 란?  (0) 2020.12.09
Posted by CJ.Ree
,

Apache Airflow 는 Python programming 기반의 Workflow Management Tool 이다.

  • Airflow는 Python 기반으로 만들어졌기 때문에 데이터 분석가도 쉽게 코드를 작성 가능
  • Web UI 가 기본적으로 제공되어 DAG 및 Task 관리에 용이함
  • Workflow 의 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용함

 


History

  • Airflow는 2014년 10월 Airbnb의 Maxime Beauchemin에 의해 시작
  • 첫 번째 커밋의 오픈 소스였으며 공식적으로 Airbnb GitHub에 포함되어 2015 년 6 월에 발표함
  • Airflow는 2016년 3월 Apache Software Foundation의 인큐베이터 프로그램에 가입
  • Apache 재단은 2019년 1월에 Apache Airflow를 최상위 프로젝트로 발표함

 


사용목적

  • 데이터 엔지니어링에서는 데이터들을 ETL(Extract, Transform, Load) 과정을 통해 데이터를 가공하며 적재함
    (머신러닝 분야에서도 모델 학습용 데이터 전처리, Train, Prediction시 사용 가능)
  • 위와 같은 경우 앞의 작업의 Output이 다음 작업의 input이 되는 등 순서데로 처리되야할 여러개의 작업이 존재함
  • 관리할 작업이 적다면 Crontab 처리 + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 관리할 작업들이 많아지면 해당 작업들을 관리할 도구가 필요해짐
  • Workflow Management Tool 은 Airflow 외에도 하둡 에코시스템에 우지(Oozie), Luigi 같은 솔루션이 있음

 


Airflow Architecture (기본 구조)

Airflow 는 기본적으로 Webserver, Scheduler, Worker, MetaDB (Database Backend) 으로 구성되며, DAG 로 Workflow 를 구성하여 실행한다.

Airflow Cluster 구조 (Local Executor)

1. Webserver
  - Web UI 제공
  - Workflow의 전반적인 조작 및 기록 관리

2. Scheduler
  - DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함
  - 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행 
  - Task 를 실행할 Worker를 지정하여 작업을 명령

3. MetaDB
  - DAG 정보, 스케쥴링 정보, 작업 실행 결과 등이 저장
  - SQLite, PostgreSQL, MySQL 등의 DB 를 지원

4. Worker
  - Airflow의 Task 가 실행되는 환경
  - Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨

5. DAGs
  - Task 간의 관계와 순서를 정의, Task 는 Operator 로 구현됨

 


Executor

Airflow는 다양한 Executor 를 지원하며, Executor 는 Task Instance 가 실제로 실행되는 메커니즘이다.
대표적인 Executor 는 아래와 같다.

  • Sequential Executor : Worker 가 한번에 하나의 Task만 수행할수 있어 제약적임
  • Local Executor : Worker 가 Scheduler 와 같은 서버에서 실행됨
  • Celery Executor : Worker 노드를 별도 구성, Message Broker 를 통하여 작업을 실행시킨
  • Kubernetes Executor : 컨테이너 환경에서 Worker 가 실행되도록 구성

 


동작 방식

Airflow 는 Scheduler 의 스케쥴링으로 Worker 에서 작업이 실행되며 동작 방식은 아래와 같다. 

  • Airflow 에서 WorkflowDAG 로 만들어지며 DAG Task 들로 구성된다.
  • TaskOperator 클래스를 인스턴스화하여 만든다.
  • 구성한 Operator 인스턴스는 다음과 같이 Task 가 된다. my_task = MyOperator(...)
  • DAG가 실행되면 Airflow 의 Scheduler 는 데이터베이스에 DAG Run 항목을 만든다.
  • DAG Run 에서 Task 를 실행하면 Task Instance 가 만들어져서 Worker 가 작업을 수행한다.

 

Posted by CJ.Ree
,