Setting Up Airflow on Kubernetes, Part 3: Git Sync
In this post, we sync the Airflow pods running on Kubernetes with a GitHub repository.
GitSync
- Create the GitHub repository that Airflow will sync with.
- For testing, I took a sample DAG that Airflow can run from the official documentation.
- https://github.com/Kimuksung/k8s_airflow/blob/main/tutorials.py
# tutorials.py
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    # Jinja-templated command, as in the official Airflow tutorial
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
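Since git-sync pulls whatever is pushed to the repository, it can help to check a DAG file locally before pushing. The following is a minimal sketch, not part of Airflow: `find_dag_ids` is a hypothetical helper that uses only the standard-library `ast` module, so it runs without Airflow installed. It only confirms that the file parses and that a `DAG(...)` call with a literal ID is present; it does not validate operators or imports.

```python
import ast


def find_dag_ids(source: str) -> list[str]:
    """Return string dag_ids found in DAG(...) calls (hypothetical helper)."""
    tree = ast.parse(source)  # raises SyntaxError if the file cannot be parsed
    dag_ids = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles both DAG(...) and airflow.DAG(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "DAG":
            continue
        # dag_id given as the first positional argument
        if node.args and isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str):
            dag_ids.append(node.args[0].value)
        # dag_id given as a keyword argument
        for kw in node.keywords:
            if kw.arg == "dag_id" and isinstance(kw.value, ast.Constant):
                dag_ids.append(kw.value.value)
    return dag_ids


# A shortened stand-in for tutorials.py
sample = (
    "from airflow import DAG\n"
    'with DAG("tutorial", schedule=None) as dag:\n'
    "    pass\n"
)
print(find_dag_ids(sample))
```

In practice the source would come from reading the file, for example `find_dag_ids(open("tutorials.py").read())`.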
Creating the Secret
Create a Secret that holds the SSH private key used to connect to GitHub.
$ kubectl create secret generic airflow-ssh-git-secret --from-file=gitSshKey=/Users/wclub/.ssh/id_rsa -n airflow
$ kubectl get secrets -n airflow
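To make clear what the `kubectl create secret generic ... --from-file=gitSshKey=...` command stores, here is a rough Python sketch that builds an equivalent Secret manifest. `build_ssh_secret` is a hypothetical helper, and the key bytes are a placeholder rather than a real private key. The point it illustrates is that the file contents end up base64-encoded under the `gitSshKey` data key.

```python
import base64
import json


def build_ssh_secret(name: str, namespace: str, private_key: bytes) -> dict:
    """Build a Secret manifest holding an SSH key (hypothetical helper)."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        # Secret data values are base64-encoded strings
        "data": {"gitSshKey": base64.b64encode(private_key).decode("ascii")},
    }


# Placeholder bytes stand in for the contents of the real key file
manifest = build_ssh_secret("airflow-ssh-git-secret", "airflow", b"PLACEHOLDER-KEY\n")
print(json.dumps(manifest, indent=2))
```

The key name matters: the Airflow Helm chart looks for `gitSshKey` inside the Secret referenced by `sshKeySecret`.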
Editing values.yaml
- dags → gitSync
- Fill in the values for the GitHub repository to connect.
- enabled
- repo
- branch
- subPath
- sshKeySecret
dags:
  gitSync:
    #enabled: false
    enabled: true
    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: git@github.com:Kimuksung/k8s_airflow.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    # change this
    subPath: ""
    sshKeySecret: airflow-ssh-git-secret
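Before running the upgrade, the settings above can be sanity-checked. The sketch below is a hypothetical pre-flight check, not something the Helm chart provides: `check_git_sync` takes the `gitSync` section as a plain dict (typed in by hand here, to avoid assuming a YAML library is installed) and reports the mistakes that are easiest to make, such as using an SSH clone URL without setting `sshKeySecret`.

```python
def check_git_sync(git_sync: dict) -> list[str]:
    """Return a list of likely problems in a gitSync section (hypothetical helper)."""
    problems = []
    if not git_sync.get("enabled"):
        problems.append("enabled is not true, so git-sync will not run")
    repo = git_sync.get("repo", "")
    if not repo:
        problems.append("repo is empty")
    # SSH clone URLs need a Secret holding the private key
    uses_ssh = repo.startswith("git@") or repo.startswith("ssh://")
    if uses_ssh and not git_sync.get("sshKeySecret"):
        problems.append("SSH repo URL given but sshKeySecret is not set")
    if not git_sync.get("branch"):
        problems.append("branch is empty")
    return problems


# The values from this post, written out as a dict
settings = {
    "enabled": True,
    "repo": "git@github.com:Kimuksung/k8s_airflow.git",
    "branch": "main",
    "subPath": "",
    "sshKeySecret": "airflow-ssh-git-secret",
}
print(check_git_sync(settings))
```

An empty list means none of the checked mistakes were found; it does not guarantee that the key is authorized on GitHub.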
- Apply the updated values to the Kubernetes cluster.
$ helm upgrade --install airflow apache-airflow/airflow -n airflow -f values.yaml --debug
$ kubectl get svc -n airflow
$ kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow --context kind-airflow-cluster
Once the upgrade completes, the DAG synced from GitHub appears in the Airflow web UI.