Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Data Pipeline with apache airflow Chatper5 (Task dependency)

Data Pipeline with apache airflow Chatper5 (Task dependency)
Task 의존성이란?

Linear chain dependency

  • >> 연산자를 사용하여 의존성

fan-out/fan-in dependency

  • 복잡한 의존성 관계
  • [] 연산자를 사용하여 의존성 표현
  • fan-in = 1 Task → 여러 Upstream Task에 의존
    1
    
      [clean_weather, clean_sales] >> join_datasets
    
  • E.g) (날씨 데이터→날씨 데이터 정제) + (판매 데이터→판매 데이터 정제) → 데이터 세트 구성 → ML 학습 → ML 배포
  • start 이후에 fetch_sales,fetch_weather task가 병렬로 실행

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
      import airflow
        
      from airflow import DAG
      from airflow.operators.dummy import DummyOperator
        
      start = DummyOperator(task_id="start")
        
      fetch_sales = DummyOperator(task_id="fetch_sales")
      clean_sales = DummyOperator(task_id="clean_sales")
        
      fetch_weather = DummyOperator(task_id="fetch_weather")
      clean_weather = DummyOperator(task_id="clean_weather")
        
      start >> [fetch_sales, fetch_weather]
      fetch_sales >> clean_sales
      fetch_weather >> clean_weather
    


Branch

  • 유사한 테스크의 경우에는 조건절을 활용하여 Task 파이프라인 구성 가능
  • 하지만, 테스크 자체의 용도가 많이 다른 2가지의 Downstream이 구성된다면?
  • 이 과정에서 Task를 명확하게 분리하지 않는다면, Flow를 한번에 알아보는 것은 쉽지 않다.
  • BranchPythonOperator = 작업 결과를 Task id를 반환 → 이를 활용하여 선택 기능 구성
1
2
3
4
5
6
7
8
9
10
11
12
def _pick_erp_system(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"

pick_erp_system = BranchPythonOperator(
  task_id="pick_erp_system", 
	python_callable=_pick_erp_system
)

pick_erp_system >> [fetch_sales_old, fetch_sales_new]
  • 위와 같이 구성하면 분기타서 실행은 되지만, 지정된 Upstream Task가 전부 완료되지 않았기에 Task는 동작하지 않습니다.


Trigger

  • 아래 그림과 같이 하나의 Task에서 분기처리 되어 Flow를 실행하다보면, 앞써 배운바를 통해 join_datasets이 동작하지 않는 것을 볼 수 있습니다.

    https://ifh.cc/g/Vsco3D.png

  • 이를 해결하기 위해서 Task가 시작하기 위한 조건을 설정할 수 있습니다.
  • trigger_rule
    • all_access = 모든 parent task가 성공해야 해당 Task를 실행
    • none_failed = parent task가 실행 완료 및 실패가 없는 경우 실행
    • all_done = 의존성 Task가 완료되는 즉시 실행
    • one_failed, one_success = 하나의 Upstream Task가 실패,성공하면 모든 Task 를 더 이상 동작하고 싶지 않는 경우(eager rules)

      1
      
        join_erp = PythonOperator(task_id="join_erp_branch", trigger_rule="none_failed")
      

조금 더 명확하게 하기 위해서 Branch가 끝난 뒤에 DummyOperator로 완료된 Task를 적용하여 표현한다.

1
2
3
4
5
from airflow.operators.dummy import DummyOperator

join_erp = DummyOperator(task_id="join_erp_branch", trigger_rule="none_failed")
[clean_sales_old, clean_sales_new] >> join_erp_branch
[join_erp_branch, clean_weather] >> join_datasets

https://ifh.cc/g/jXrpZr.png


조건부 태스크

  • 특정 조건에 따라 DAG에서 특정 Task를 건너뛸 수 있는 다른 방법
  • 위 그림에서 deploy_model Task를 보고 모델이 실제로 배포되었는지 알 수 있을까요? → NO
  • AirflowSkipException() = 해당 Task와 모든 다운스트림 태스크를 Skip 처리
1
2
3
4
5
6
7
8
9
def _latest_only(**context):
    now = pendulum.now("UTC")
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)

    if not left_window < now <= right_window:
        raise AirflowSkipException()

latest_only >> deploy_model

https://ifh.cc/g/rpsXkV.png

가장 최근에 실행한 DAG만 실행 할 수 있도록 내부 Operator를 지원하여 줍니다.

LatestOnlyOperator = 조건부 배포를 하기 위해 만들어진 기능

1
2
3
4
from airflow.operators.latest_only import LatestOnlyOperator

latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
latest_only >> deploy_model

Task 실행 과정

  • Airflow → DAG 실행 → 지속적으로 Task를 확인 → 실행 가능하다고 판단되면 즉시 스케줄러에 의해 선택 및 예약 → Slot이 남아있으면 즉시 실행

Airflow Task 실행 시기를 어떻게 결정하는가?

  • 의존적인 Task가 모두 성공적으로 처리되어야 한다.
  • Trigger = all_access인 경우에는 의존성이 모두 해결 → 실행 가능 준비 → 다음 DAG 의존성 삭제 → 전체 DAG Task 실행 될 때까지 실행
    • 위 처럼 Propagation = Upstream Task가 DownStream Task에 영향을 줍니다.
    • 의존성 Task들이 실패,스킵됨으로써 의존성이 있는 Task에 영향을 끼친다.
  • Trigger = None_failed 인 경우 Upstream Task 완료 여부만 확인
  • 실패는 Propagation이지만, 스킵은 Propagation X


XCOM

  • Task간 데이터 공유 ( 작은 데이터 )
  • 일반적으로 Message(state)를 교환
  • xcom_push
  • xcom_pull
  • 일부 오퍼레이터는 Xcom 값을 자동으로 게시 ( PythonOperator-Return )

    1
    2
    3
    
      def _train_model(**context):
          model_id = str(uuid.uuid4())
          return model_id
    
1
2
3
4
5
6
7
8
9
10
11
def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

train_model >> deploy_model

단점

  • Xcom을 활용하여 값을 사용하게 된다면, Task간 의존성이 나타나지 않아 묵시정 의존성이 나타난다.
  • Operator 원자성을 무너뜨리는 패턴이 발생 할 수 있다.
  • E.g) API Token 발급 후 Xcom을 통하여 전달 - 이 경우 시간이 만료된다면 원자성이 깨진다.
  • 모든 값을 Serialize 하여 처리 → Lambda, Multi process 관련 Class는 불가능
  • Airflow metastore에 저장되며, 사용되는 DB에 따라 크기 제한
    • SQLite - 2GB / Postgresql - 1GB / Mysql - 64KB
  • Airflow 2부터는 커스텀 백엔드에 저장 할 수 있도록 추가
  • BaseXcom class가 상속 처리 + 값을 Serialize,deserialzie 처리 하기 위한 Method를 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from typing import Any
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3XComBackend(BaseXCom):
	 PREFIX = "xcom_s3"
	 BUCKET_NAME = os.environ.get("S3_XCOM_BUCKET_NAME")
		@staticmethod
		def serialize_value(value: Any):
		    if isinstance(value, pd.DataFrame):
		        hook = S3Hook()
		        key = f"{str(uuid.uuid4())}.pickle"
		        filename = f"{key}.pickle"
		        value.to_pickle(filename, index=False)
		        hook.load_file(
		            filename=filename,
		            key=key,
		            bucket_name=S3XComBackend.BUCKET_NAME,
		            replace=True
		        )
		        value = f"{S3XComBackend.PREFIX}://{S3XComBackend.BUCKET_NAME}/{key}"
		
		    return BaseXCom.serialize_value(value)

		@staticmethod
		def deserialize_value(result) -> Any:
		    result = BaseXCom.deserialize_value(result)
		
		    if isinstance(result, str) and result.startswith(S3XComBackend.PREFIX):
		        hook = S3Hook()
		        key = result.replace(f"{S3XComBackend.PREFIX}://{S3XComBackend.BUCKET_NAME}/", "")
		        filename = hook.download_file(
		            key=key,
		            bucket_name=S3XComBackend.BUCKET_NAME,
		            local_path="/tmp"
		        )
		        result = pd.read_csv(filename)
		
		    return result


Taskflow API

  • Task 및 의존성을 정의하기 위해서 decorator 기반 Api 지원
  • @task decorator를 활용하여 변환
  • 앞에서 본것과 같이 Airflow의 Xcom을 전달하기 위해 Task를 일일히 설정 및 구현해줘야 한다.
  • 간단하면서도 확실하게 표현하는 방법
  • 하지만 항상 사용하는 것은 아니다
    • PythonOperator를 사용하여 구현되는 Python Task로 제한
    • 다른 Operator와 연결할 때에는 Task 및 Dependency 정의해야 한다.
    • 혼용하며게 되면 직관적이지 않기 때문에 PythonOperator만 모여있는 경우 사용하는게 좋다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import uuid
import airflow
from airflow import DAG
from airflow.decorators import task

	@task
  def train_model():
      model_id = str(uuid.uuid4())
      return model_id

  @task
  def deploy_model(model_id: str):
      print(f"Deploying model {model_id}")

  model_id = train_model()
  deploy_model(model_id)