
Setting up Airflow on Kubernetes (3) with git-sync


In this post, we will sync the Airflow Pods running on Kubernetes with a GitHub repository.

GitSync

  • Create the GitHub repository to sync with.
  • For testing, I took a sample DAG that Airflow can run from the official documentation.
  • https://github.com/Kimuksung/k8s_airflow/blob/main/tutorials.py
# tutorials.py
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # Jinja-templated bash command, filled in from the official Airflow
    # tutorial this file is based on: print the logical date and the date
    # seven days later, five times
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
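
Before pushing tutorials.py to the repository, it is worth confirming the file parses cleanly; a syntax or import error would keep the DAG from ever appearing in the UI. A minimal sanity check, assuming Airflow is installed in the local environment:

# importing the file is enough to surface syntax and import errors
$ python tutorials.py
# with a configured local Airflow, the DAG should then be listed
$ airflow dags list | grep tutorial
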
Creating the Secret

Create a Secret that holds the SSH private key for connecting to GitHub.

$ kubectl create secret generic airflow-ssh-git-secret --from-file=gitSshKey=/Users/wclub/.ssh/id_rsa -n airflow
$ kubectl get secrets -n airflow
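
Note that the Secret stores only the private key; the matching public key (id_rsa.pub) still has to be registered on the GitHub side, for example as a deploy key on the repository. To confirm the Secret was created with the expected gitSshKey entry:

$ kubectl describe secret airflow-ssh-git-secret -n airflow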

Editing values.yaml

  • dags → gitSync
  • Fill in the values for the GitHub repository to connect to:
    • enabled
    • repo
    • branch
    • subPath
    • sshKeySecret
gitSync:
    #enabled: false
    enabled: true
    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: git@github.com:Kimuksung/k8s_airflow.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subPath within the repo where dags are located;
    # should be "" if dags are at repo root
    # change this
    subPath: ""
    sshKeySecret: airflow-ssh-git-secret
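
Depending on the chart version, SSH host key verification may also need to be configured. The chart exposes a knownHosts field in the same gitSync block for this; a sketch, where the key line below is a placeholder to be regenerated locally:

    # placeholder entry; generate the real line with:
    #   ssh-keyscan -t rsa github.com
    knownHosts: |
      github.com ssh-rsa AAAA...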
  • Update the release on the Kubernetes cluster:
$ helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml --debug
$ kubectl get svc -n airflow
$ kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow --context kind-airflow-cluster
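
If a DAG does not show up in the UI, the git-sync sidecar logs are the first place to look. A minimal check, assuming the chart's default sidecar container name git-sync (the scheduler pod name below is a placeholder; use the one printed by the first command):

$ kubectl get pods -n airflow
$ kubectl logs airflow-scheduler-<pod-id> -c git-sync -n airflow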

As shown below, the DAGs from GitHub have been synced and are reflected in the Airflow UI.