From 4e5232b16e10b7233fdfd881482415beb435d9e8 Mon Sep 17 00:00:00 2001 From: yansans <66671259+yansans@users.noreply.github.com> Date: Wed, 8 Jan 2025 14:45:51 +0700 Subject: [PATCH] init: airflow --- dags/workflow.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 dags/workflow.py diff --git a/dags/workflow.py b/dags/workflow.py new file mode 100644 index 0000000..3eaaaf7 --- /dev/null +++ b/dags/workflow.py @@ -0,0 +1,37 @@ +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from datetime import timedelta + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 'telco_customer_churn_data_pipeline', + default_args=default_args, + description='DAG to clean Telco Customer Churn dataset', + schedule_interval=None, + catchup=False, +) + +spark_clean = SparkSubmitOperator( + task_id='spark_data_cleaning', + application='../spark/clean.py', + name='telco_customer_churn_cleaning', + conn_id='spark_default', + dag=dag, +) + + +spark_preprocess = SparkSubmitOperator( + task_id='spark_data_preprocessing', + application='../spark/preprocess.py', + name='telco_customer_churn_preprocessing', + conn_id='spark_default', + dag=dag, +) + +spark_clean >> spark_preprocess \ No newline at end of file -- GitLab