"""Airflow DAG: data pipeline for the Telco Customer Churn dataset.

Runs two Spark jobs in sequence via spark-submit:
  1. spark_data_cleaning      -> spark/clean.py
  2. spark_data_preprocessing -> spark/preprocess.py

The DAG is manually triggered (schedule_interval=None) and does not backfill.
"""
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Resolve the Spark applications relative to this DAG file rather than the
# scheduler's working directory — a bare '../spark/...' path depends on the
# process CWD and breaks when the scheduler/worker is launched from elsewhere.
SPARK_DIR = Path(__file__).resolve().parent.parent / "spark"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'telco_customer_churn_data_pipeline',
    default_args=default_args,
    description='DAG to clean Telco Customer Churn dataset',
    # start_date is required for Airflow to treat the DAG's tasks as
    # runnable, even for manually-triggered (schedule_interval=None) DAGs.
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
)

# Step 1: raw-data cleaning job.
spark_clean = SparkSubmitOperator(
    task_id='spark_data_cleaning',
    application=str(SPARK_DIR / 'clean.py'),
    name='telco_customer_churn_cleaning',
    conn_id='spark_default',
    dag=dag,
)

# Step 2: feature preprocessing job, consumes the cleaned output.
spark_preprocess = SparkSubmitOperator(
    task_id='spark_data_preprocessing',
    application=str(SPARK_DIR / 'preprocess.py'),
    name='telco_customer_churn_preprocessing',
    conn_id='spark_default',
    dag=dag,
)

# Cleaning must complete before preprocessing starts.
spark_clean >> spark_preprocess