Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Airflow 기본 개념 [task,dag,멱등성,concurrency .. ]

Airflow 기본 개념 [task,dag,멱등성,concurrency .. ]

안녕하세요
오늘은 Airflow를 다루며, 필요?사용하였던 개념들에 대해서 다시 작성해보았습니다.

Airflow는 기본적으로 모든 시간을 UTC로 맞춘다. ( KTC와는 9시간 차이 )


ENV


  • ${AIRFLOW_HOME} = dag, config 등이 구성되어 있는 directory
1
$ cd ${AIRFLOW_HOME}/dags && ls -al
  • Script file을 실행


Task&Dag

Task vs Task Instance

  • task = Dag 작성 시 같이 정의 ( 정의 단계 )
  • Task Instance = Dag run이 되어, task가 instance로 실제 동작하는 단계 ( 실제 동작 단계 )
Dag vs Dag_run

  • Dag = 수행해야하는 Task와 Dependency, 설정 값을 Python 코드로 작성
  • Dag_run = 실제 실행되는 시점, Instance로 실제 동작하는 단계


start_date

  • Dag가 시작되는 시점 - 실행되는 의미가 아니다.
  • start_date 기준 다음 날 부터 Batch가 시작된다.
  • 만약 주 단위로 동작한다면, 시작하고 싶은 전주로 starttime 설정
1
2
3
4
# weekly
# 12/26일 시작
START_TIME = datetime(2022, 12, 19)
SCHEDULER = "30 01 * * 1"
Excution_date

  • 실제 실행 datetime과는 의미가 없다.
  • Dag의 id로, Dag instance를 구분 값
  • 실제 Dag가 실행되는 기준으로 무언가를 작업해야 한다면, ds,ds_nodash template value를 사용하여야 한다,.
Scheduling

  • Crontab으로 설정
  • 분(0~59) / 시간(0~23) / 일(1~12) / 월(1~12) / 요일(1~7)
  • E.g) 매일 9시 30분에 월~금요일만 배치를 수행하고 싶다면 → 30 9 * * 1-5
1
2
3
4
5
6
7
8
9
SCHEDULER = "40 17 * * *"

dag = DAG( 
    DAG_NAME,
    default_args = default_args,
    description=DAG_DESCRIPTION, 
    start_date=START_TIME, 
    schedule_interval = SCHEDULER 
)


provide_context

  • Python Operator에서 사용
  • Dictionary와 같은 값 넘겨준다.
  • **Airflow 2에서는 더 이상 provide_context를 사용하지 않는다.** - 링크
  • 자동으로 적용, 대신 **context 파라미터로 전달해야 한다.
  • dag, op_args, op_kwargs 와 같은 예약어들은 설정 불가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def myfunc(**kwargs):
    print(f"{kwargs['my_name']}")

# Airflow 1.x
python_operator = PythonOperator(
    task_id='test',
    python_callable=myfunc,
    params = {"my_name":"kimuksung"},
    provide_context=True,
    dag=dag,
)

# Airflow 2.x
def myfunc(execution_date):
    print(execution_date)

python_operator = PythonOperator(task_id="mytask", python_callable=myfunc, dag=dag)

def myfunc(**context):
    print(context)  # all variables will be provided to context

python_operator = PythonOperator(task_id="mytask", python_callable=myfunc)


concurrency ( 참고 )

Dag Task가 작업하는 resource에 limit을 걸어 한번에 많은 task가 외부 resource와 연결되지 않도록 설정

E.g) update BigQuery table ( 최대 2개 cuncurrency만 허용 ) 시 2개까지만 허용해야한다.

concurrency vs parallelism

concurrency is about **dealing with** lots of things at once but parallelism is about **doing** lots of things at once.

  • Environment-level
    • parallelism : Worker에서 동시에 실행 가능한 Task Instance 수 제어
    • max_active_tasks_per_dag : Dag 당 동시에 실행가능한 Task 수
    • max_active_runs_per_dag : Dag 당 → 한 순간에 가능한 Dag run 수
  • Dag-level
    • Concurrency : 모든 DAG → Run → 동시에 실행할 수 있는 최대 task 인스턴스 수 ( default = max_active_tasks_per_dag )
    • max_active_tasks : 하나의 Dag → run → 최대 task instance 수 ( default = max_active_tasks_per_dag )
    • max_active_runs : Dag runs 수 제한 ( default = max_active_runs_per_dag )
1
2
# Allow a maximum of concurrent 10 tasks across a max of 3 active DAG runs
dag = DAG('my_dag_id', concurrency=10,  max_active_runs=3)
  • Task-level
    • pool : System Resource에서 여러 Task Instance가 한꺼번에 요청을 못내지 않도록 임의의 Task를 하나의 Pool로 정의하여 작업되는 양을 제한 하기 위함
    • task_concurrency = max_ative_tis_per_dag

      concurrency.png


멱등성

  • 간단하게 말하면 동일한 태스크를 여러번 실행되어도 결과가 똑같아야 한다.
  • Task는 원자성을 가져 마지막 결과가 성공,실패(rollback) 처리 되어야 한다.
  • 데이터를 항상 덮어써야 한다. **upsert**
  • 일반적으로, Task는 Append, Replace 중 하나를 실시하게 만들어야 한다.
  • Append → 데이터 중복 가능 / Replace → 반복해도 결과가 변하지 않도록
  • Task Parameter를 이용하여 고유의 이름을 생성 → 여러 번 실행되어도 치환이 되어 동일하게
  • Request를 날리고 Query나 어떠한 함수가 처리된 후 결과를 던져주었는데, 요청한 쪽이 받지 못하고 다시 요청한 경우 중복 제거 프로토콜을 사용하여 방지

E.g) DB의 Transaction / 결제 데이터에서 유저가 결제를 요청했을 때, 네트워크 이슈로 1회가 아닌 여러번 요청했을 때 결과는 한번만 결제되어야 한다.


Template reference

  • jinja template
  • Dag 내부에서 사용 할 때는 문자열 내에 사용하면 된다.
  • 내부에서 변수 처럼 사용하기 위해서는 별도로 정의
  • **ds**
    • yyyy-mm-dd
    • **실제 Dag가 작업하는 날짜 기준 전날 시간**이다
  • tomorrow_ds
    • 실제 Dag가 작업하는 날짜 기준
  • ts
    • yyyy-mm-ddThh:mm:ss
    1
    2
    3
    4
    5
    6
    
      # {[ template_variable }}
      "output/public/mongodb/"
        
      # 별도로 정의
      EXEC_DATE = ''
      "output/public/mongodb/"+EXEC_DATE
    


XCOM

  • 참고
  • Task 사이에서 데이터를 전달하기 위해 사용
  • Variable과 마찬가지로 key-value 형태로 전달.
  • 소량의 데이터만 전달하는 것을 권장
  • 함수 파라미터에 **context를 넘겨주어야한다.
  • context[’task_instance’] 혹은 context[’ti’]로 사용하여 전달 받으면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# PythonOperator return
# return 시 자동으로 push
def return_xcom():
	return "return_xcom"

python_operator = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)

# Xcom push-pull
# key , task_ids
def xcom_push_test(**context):
    xcom_value = "xcom_push_value"
    context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)

    return "xcom_return_value"

def xcom_pull_test(**context):
    xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
    xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
    xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')

    print("xcom_return : {}".format(xcom_return))
    print("xcom_push_value : {}".format(xcom_push_value))
    print("xcom_push_return_value : {}".format(xcom_push_return_value))
    
xcom_push_task = PythonOperator(
    task_id = 'xcom_push_task',
    python_callable = xcom_push_test,
    dag = dag
)

xcom_pull_task = PythonOperator(
    task_id = 'xcom_pull_task',
    python_callable = xcom_pull_test,
    dag = dag
)

# jinja templates
bash_xcom_taskids = BashOperator(
    task_id='bash_xcom_taskids',
    bash_command='echo ""',
    dag=dag
)

bash_xcom_key = BashOperator(
    task_id='bash_xcom_key',
    bash_command='echo ""',
    dag=dag
)

bash_xcom_push = BashOperator(
    task_id='bash_xcom_push',
    bash_command='echo ""',
    dag=dag
)

bash_xcom_pull = BashOperator(
    task_id='bash_xcom_pull',
    bash_command='echo ""',
    dag=dag
)


Branch Operator

  • Task를 분기를 태워 동작하도록 만든다.
  • Default = all_success ( 참고 )
    • Upstream이 모두 성공해야 Task가 실행된다.
    • 예시

      1
      2
      3
      4
      5
      6
      7
      
        ALL_SUCCESS = 'all_success'
        ALL_FAILED = 'all_failed'
        ALL_DONE = 'all_done' # 작업 성공 여부에 관계없이 모두 작동한 경우
        ONE_SUCCESS = 'one_success'
        ONE_FAILED = 'one_failed'
        DUMMY = 'dummy'
        NONE_FAILED = 'none_failed'
      
1
2
3
4
5
6
7
8
from airflow.operators.python import BranchPythonOperator

def random_branch():
    from random import randint

    return "option_1" if randint(1, 2) == 1 else "option_2"

t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)
Slack API

  • SlackAPIPostOperator 는 2.0 버전 이상 부터는 deprecate
  • Airflow version에 따른 의존성 문제가 발생 할 수 있다. → Python slack Sdk로 대체
1
$ pip install slack_sdk
1
2
3
4
5
# slack api
from slack_sdk import WebClient

client = WebClient(token='SLACK_TOKEN')
response = client.chat_postMessage(channel='#random', text="Hello world!")
1
2
3
4
default_args = {
    'owner': '',
    'on_failure_callback': alarm
}

참조