Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Airflow Sensor

Airflow Sensor

안녕하세요
오늘은 Airflow 스케줄링하며, 특정 주기 동안 조건을 만족했을 때 진행하도록 하기 위함
데이터 마트 구축 시 Collection 별로 스케줄링 하는 것이 아닌 순차적으로 진행시키게 하도록 할 때(메모리 이슈로 분할 처리)


정의

  • 특정 주기 동안 조건을 만족한지 확인하는 오퍼레이터
  • BaseSensorOperator를 상속
  • 연속성을 가져 작업을 한번에 처리하는 경우

E.g) 새벽 Datamart 작업의 경우 계속하여 진행 처리하면 된다. 굳이 시간을 5분마다로 두어 동작할 이유가 없다.


Sensor

  • FileSensor
    • 파일이 존재하는지 여부 체크
  • PythonSensor
    • Python_callable func 요청 후 True 응답값을 받을 때 까지 대기
  • ExternalTaskSensor
    • Task가 완료 까지 대기
  • HttpSensor
    • API request가 성공 까지 대기
  • SqlSensor
    • Sql 테이블 존재 까지 대기


Parameter

  • mode
    • Sensor 동작 방식
    • poke
      • Default 방식
      • Sensor 전체 실행동안 worker 슬롯 점유
      • poke 사이에는 sleep 상태 존재
    • reschedule
      • 조건을 만족하지 않았을 때 worker 슬롯 해제
      • 긴 실행시간을 가질 것으로 예상될 때 주로 사용
      • 다른 테스크 실행할 수 있게 해주며, Resource 적게 소모
  • poke_interval
    • poke 방식 사용 시 조건 확인하는 주기
    • Default = 30
  • exponential_backoff
    • True 인 경우 poke 사이의 간격을 지수로 증가
  • timeout
    • Task 확인할 최대 시간
    • 최대시간을 넘으면 실패
  • soft_fail
    • True인 경우 Timeout이 나더라도 실패가 아닌 스킵


결과 및 코드

https://ifh.cc/g/dygAS9.png https://ifh.cc/g/rw4xnd.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# DagA.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def print_execution_date(**kwargs):
    print(kwargs)

default_args = {
    'owner': 'kai',
    'depends_on_past': False,
    'start_date': datetime(2023,1,26),
}

ds = ''
ts = ''

with DAG(
    dag_id='DAG_A', 
    schedule_interval='0 0 * * *',
    default_args=default_args) as dag:

    task_1 = PythonOperator(
        dag=dag,
        task_id='Task_1',
        python_callable=print_execution_date,
        op_kwargs={'ts': ts},
    )
    task_2 = PythonOperator(
        dag=dag,
        task_id='Task_2',
        python_callable=print_execution_date,
        op_kwargs={'ds': ds},
    )
    task_1 >> task_2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#DagB.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python_operator import PythonOperator

def print_execution_date(ds):
    print(ds)

ds = ''
start_date = datetime(2023, 1, 26)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date
}

with DAG(
    dag_id='DAG_B', 
    schedule_interval='0 0 * * *',
    default_args=default_args) as dag:

    sensor = ExternalTaskSensor(
        task_id='wait_for_task_2',
        external_dag_id='DAG_A',
        external_task_id='Task_2',
        start_date=start_date,
        execution_date_fn=lambda x: x,
        mode='reschedule',
        timeout=3600,
    )

    task_3 = PythonOperator(
        dag=dag,
        task_id='Task_3',
        python_callable=print_execution_date,
        op_kwargs={'ds': ds},
    )

    sensor >> task_3