Airflow 기본 개념 [task,dag,멱등성,concurrency .. ]
안녕하세요
오늘은 Airflow를 다루며, 필요?사용하였던 개념들에 대해서 다시 작성해보았습니다.
Airflow는 기본적으로 모든 시간을 UTC로 맞춘다. ( KTC와는 9시간 차이 )
ENV
${AIRFLOW_HOME}
= dag, config 등이 구성되어 있는 directory
1
$ cd ${AIRFLOW_HOME}/dags && ls -al
- Script file을 실행
Task&Dag
Task vs Task Instance
- task = Dag 작성 시 같이 정의 ( 정의 단계 )
- Task Instance = Dag run이 되어, task가 instance로 실제 동작하는 단계 ( 실제 동작 단계 )
Dag vs Dag_run
- Dag = 수행해야하는 Task와 Dependency, 설정 값을 Python 코드로 작성
- Dag_run = 실제 실행되는 시점, Instance로 실제 동작하는 단계
start_date
- Dag가 시작되는 시점 - 실행되는 의미가 아니다.
- start_date 기준 다음 날 부터 Batch가 시작된다.
- 만약 주 단위로 동작한다면, 시작하고 싶은 전주로 starttime 설정
1
2
3
4
# weekly
# 12/26일 시작
START_TIME = datetime(2022, 12, 19)
SCHEDULER = "30 01 * * 1"
Excution_date
- 실제 실행 datetime과는 의미가 없다.
- Dag의 id로, Dag instance를 구분 값
- 실제 Dag가 실행되는 기준으로 무언가를 작업해야 한다면, ds,ds_nodash template value를 사용하여야 한다,.
Scheduling
- Crontab으로 설정
- 분(0~59) / 시간(0~23) / 일(1~12) / 월(1~12) / 요일(1~7)
- E.g) 매일 9시 30분에 월~금요일만 배치를 수행하고 싶다면 →
30 9 * * 1-5
1
2
3
4
5
6
7
8
9
SCHEDULER = "40 17 * * *"
dag = DAG(
DAG_NAME,
default_args = default_args,
description=DAG_DESCRIPTION,
start_date=START_TIME,
schedule_interval = SCHEDULER
)
provide_context
- Python Operator에서 사용
- Dictionary와 같은 값 넘겨준다.
**Airflow 2에서는 더 이상 provide_context를 사용하지 않는다.**
- 링크- 자동으로 적용, 대신 **context 파라미터로 전달해야 한다.
dag
,op_args
,op_kwargs
와 같은 예약어들은 설정 불가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def myfunc(**kwargs):
print(f"{kwargs['my_name']}")
# Airflow 1.x
python_operator = PythonOperator(
task_id='test',
python_callable=myfunc,
params = {"my_name":"kimuksung"},
provide_context=True,
dag=dag,
)
# Airflow 2.x
def myfunc(execution_date):
print(execution_date)
python_operator = PythonOperator(task_id="mytask", python_callable=myfunc, dag=dag)
def myfunc(**context):
print(context) # all variables will be provided to context
python_operator = PythonOperator(task_id="mytask", python_callable=myfunc)
concurrency ( 참고 )
Dag Task가 작업하는 resource에 limit을 걸어 한번에 많은 task가 외부 resource와 연결되지 않도록 설정
E.g) update BigQuery table ( 최대 2개 cuncurrency만 허용 ) 시 2개까지만 허용해야한다.
concurrency vs parallelism
concurrency is about **dealing with** lots of things at once but parallelism is about **doing** lots of things at once.
- Environment-level
parallelism
: Worker에서 동시에 실행 가능한 Task Instance 수 제어max_active_tasks_per_dag
: Dag 당 동시에 실행가능한 Task 수max_active_runs_per_dag
: Dag 당 → 한 순간에 가능한 Dag run 수
- Dag-level
Concurrency
: 모든 DAG → Run → 동시에 실행할 수 있는 최대 task 인스턴스 수 ( default = max_active_tasks_per_dag )max_active_tasks
: 하나의 Dag → run → 최대 task instance 수 ( default = max_active_tasks_per_dag )max_active_runs
: Dag runs 수 제한 ( default = max_active_runs_per_dag )
1
2
# Allow a maximum of concurrent 10 tasks across a max of 3 active DAG runs
dag = DAG('my_dag_id', concurrency=10, max_active_runs=3)
- Task-level
- pool : System Resource에서 여러 Task Instance가 한꺼번에 요청을 못내지 않도록 임의의 Task를 하나의 Pool로 정의하여 작업되는 양을 제한 하기 위함
-
task_concurrency = max_ative_tis_per_dag
멱등성
- 간단하게 말하면
동일한 태스크를 여러번 실행되어도 결과가 똑같아야 한다.
- Task는
원자성
을 가져 마지막 결과가 성공,실패(rollback) 처리 되어야 한다. - 데이터를 항상 덮어써야 한다.
**upsert**
- 일반적으로, Task는 Append, Replace 중 하나를 실시하게 만들어야 한다.
- Append → 데이터 중복 가능 / Replace → 반복해도 결과가 변하지 않도록
- Task Parameter를 이용하여 고유의 이름을 생성 → 여러 번 실행되어도 치환이 되어 동일하게
- Request를 날리고 Query나 어떠한 함수가 처리된 후 결과를 던져주었는데, 요청한 쪽이 받지 못하고 다시 요청한 경우 중복 제거 프로토콜을 사용하여 방지
E.g) DB의 Transaction / 결제 데이터에서 유저가 결제를 요청했을 때, 네트워크 이슈로 1회가 아닌 여러번 요청했을 때 결과는 한번만 결제되어야 한다.
Template reference
- jinja template
- Dag 내부에서 사용 할 때는 문자열 내에 사용하면 된다.
- 내부에서 변수 처럼 사용하기 위해서는 별도로 정의
**ds**
- yyyy-mm-dd
**실제 Dag가 작업하는 날짜 기준 전날 시간**
이다
- tomorrow_ds
- 실제 Dag가 작업하는 날짜 기준
- ts
- yyyy-mm-ddThh:mm:ss
1 2 3 4 5 6
# {[ template_variable }} "output/public/mongodb/" # 별도로 정의 EXEC_DATE = '' "output/public/mongodb/"+EXEC_DATE
XCOM
- 참고
- Task 사이에서 데이터를 전달하기 위해 사용
- Variable과 마찬가지로 key-value 형태로 전달.
- 소량의 데이터만 전달하는 것을 권장
- 함수 파라미터에 **context를 넘겨주어야한다.
- context[’task_instance’] 혹은 context[’ti’]로 사용하여 전달 받으면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# PythonOperator return
# return 시 자동으로 push
def return_xcom():
return "return_xcom"
python_operator = PythonOperator(
task_id = 'return_xcom',
python_callable = return_xcom,
dag = dag
)
# Xcom push-pull
# key , task_ids
def xcom_push_test(**context):
xcom_value = "xcom_push_value"
context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)
return "xcom_return_value"
def xcom_pull_test(**context):
xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')
print("xcom_return : {}".format(xcom_return))
print("xcom_push_value : {}".format(xcom_push_value))
print("xcom_push_return_value : {}".format(xcom_push_return_value))
xcom_push_task = PythonOperator(
task_id = 'xcom_push_task',
python_callable = xcom_push_test,
dag = dag
)
xcom_pull_task = PythonOperator(
task_id = 'xcom_pull_task',
python_callable = xcom_pull_test,
dag = dag
)
# jinja templates
bash_xcom_taskids = BashOperator(
task_id='bash_xcom_taskids',
bash_command='echo ""',
dag=dag
)
bash_xcom_key = BashOperator(
task_id='bash_xcom_key',
bash_command='echo ""',
dag=dag
)
bash_xcom_push = BashOperator(
task_id='bash_xcom_push',
bash_command='echo ""',
dag=dag
)
bash_xcom_pull = BashOperator(
task_id='bash_xcom_pull',
bash_command='echo ""',
dag=dag
)
Branch Operator
- Task를 분기를 태워 동작하도록 만든다.
- Default = all_success ( 참고 )
- Upstream이 모두 성공해야 Task가 실행된다.
-
예시
1 2 3 4 5 6 7
ALL_SUCCESS = 'all_success' ALL_FAILED = 'all_failed' ALL_DONE = 'all_done' # 작업 성공 여부에 관계없이 모두 작동한 경우 ONE_SUCCESS = 'one_success' ONE_FAILED = 'one_failed' DUMMY = 'dummy' NONE_FAILED = 'none_failed'
1
2
3
4
5
6
7
8
from airflow.operators.python import BranchPythonOperator
def random_branch():
from random import randint
return "option_1" if randint(1, 2) == 1 else "option_2"
t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)
Slack API
SlackAPIPostOperator
는 2.0 버전 이상 부터는 deprecate- Airflow version에 따른 의존성 문제가 발생 할 수 있다. → Python slack Sdk로 대체
1
$ pip install slack_sdk
1
2
3
4
5
# slack api
from slack_sdk import WebClient
client = WebClient(token='SLACK_TOKEN')
response = client.chat_postMessage(channel='#random', text="Hello world!")
1
2
3
4
default_args = {
'owner': '',
'on_failure_callback': alarm
}
참조