Airflow SparkOperator
SparkOperator
$ pip install pyspark
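Since the PythonOperator example below runs pyspark directly inside the Airflow worker process, it can be worth confirming the install first. A minimal sanity check (the local[*] master here is only for this check, not the cluster used in the DAG below):

from pyspark.sql import SparkSession

# Throwaway local session just to verify that pyspark imports and can start
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("pyspark-install-check") \
    .getOrCreate()
print(spark.version)
spark.stop()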
Configuring with a PythonOperator
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from pyspark.sql import SparkSession, functions


def process_etl_spark():
    # Build a SparkSession that can read from SQL Server (JDBC) and write to MongoDB
    spark = SparkSession \
        .builder \
        .appName("Extração Documentos a Pagar") \
        .config("spark.jars.packages",
                "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8") \
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/Financeiro") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Financeiro") \
        .config("spark.driver.maxResultSize", "8g") \
        .config("spark.network.timeout", 10000000) \
        .config("spark.executor.heartbeatInterval", 10000000) \
        .config("spark.storage.blockManagerSlaveTimeoutMs", 10000000) \
        .config("spark.executor.memory", "10g") \
        .master("spark://192.168.0.1:7077") \
        .getOrCreate()

    start_time = datetime.now()

    # Extract: read the source table from SQL Server in parallel, partitioned on Id
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:sqlserver://127.0.0.1:1433;databaseName=Teste") \
        .option("user", "Teste") \
        .option("password", "teste") \
        .option("numPartitions", 100) \
        .option("partitionColumn", "Id") \
        .option("lowerBound", 1) \
        .option("upperBound", 488777675) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("dbtable", "(select Id, DataVencimento AS Vencimento, TipoCod AS CodigoTipoDocumento, cast(recsld as FLOAT) AS Saldo from DocumentoPagar \
                 where TipoCod in ('200','17') and RecPag = 'A') T") \
        .load()

    # Transform: sum the outstanding balance per document type and due date
    group = df.select("CodigoTipoDocumento", "Vencimento", "Saldo") \
        .groupby(["CodigoTipoDocumento", "Vencimento"]) \
        .agg(functions.sum("Saldo").alias("Saldo"))

    # Load: overwrite the aggregated result in the MongoDB collection
    group.write.format("com.mongodb.spark.sql.DefaultSource") \
        .mode("overwrite") \
        .option("database", "Financeiro") \
        .option("collection", "Fact_DocumentoPagar") \
        .save()

    end_time = datetime.now()
    print(start_time)
    print(end_time - start_time)


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 10, 23),
    'retries': 10
}

# The DAG must exist before the tasks that reference it
dag = DAG('spark_with_python_operator', default_args=default_args, schedule_interval='@daily')

start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

task1 = PythonOperator(
    task_id="pyspark_example",
    python_callable=process_etl_spark,
    dag=dag
)

start_task >> task1 >> end_task
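For comparison with the title of this post, the same job can also be handed to the cluster with the SparkSubmitOperator from the apache-airflow-providers-apache-spark provider instead of building the SparkSession inside a PythonOperator. The following is only a minimal sketch: it assumes the provider package is installed, that a spark_default connection points at spark://192.168.0.1:7077, and that the ETL body above is saved as a standalone script (the application path below is hypothetical); the hard-coded .master() and .config() calls would then be supplied by spark-submit instead.

from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG('spark_with_spark_submit_operator',
         start_date=datetime(2023, 10, 23),
         schedule_interval='@daily') as dag:

    submit_etl = SparkSubmitOperator(
        task_id='spark_submit_example',
        # Hypothetical path to the ETL above saved as its own script
        application='/opt/airflow/dags/scripts/process_etl_spark.py',
        # Assumed Airflow connection pointing at the standalone master
        conn_id='spark_default',
        packages='org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,'
                 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8',
        conf={
            'spark.mongodb.input.uri': 'mongodb://127.0.0.1:27017/Financeiro',
            'spark.mongodb.output.uri': 'mongodb://127.0.0.1:27017/Financeiro',
            'spark.executor.memory': '10g',
        },
    )

With this layout the DAG file no longer imports pyspark at parse time; the worker only needs the spark-submit binary and the provider package, which is a common reason to prefer this operator over a PythonOperator for Spark jobs.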
References
- https://medium.com/codex/executing-spark-jobs-with-apache-airflow-3596717bbbe3