From a7969dcd2beb26411a21121f91779d18c0612e8b Mon Sep 17 00:00:00 2001
From: afnanramadhan <13521011@std.stei.itb.ac.id>
Date: Fri, 10 Jan 2025 16:12:25 +0700
Subject: [PATCH] fix: add Airflow Spark connection

---
 .gitlab-ci.yml                | 14 ++++++++++++++
 dags/data_drift_simulation.py | 10 +++++-----
 dags/spark_airflow.py         |  2 +-
 docker-compose.yml            | 23 ++++++++++++++++-------
 4 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c34599d..2a55093 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,6 +1,7 @@
 stages:
   - build
   - run
+  - clean-up
 
 build:
   stage: build
@@ -23,3 +24,16 @@ run:
   script:
     - echo "Run Docker Container"
     - docker compose up -d
+
+
+clean-up:  # Despite its name, this job triggers the train_model DAG in the running webserver container.
+  stage: clean-up
+  tags:
+    - macos
+  only:
+    - main
+    - develop
+  script:
+    - echo "Clean up "
+    - CONTAINER_ID=$(docker ps -q --filter "name=webserver-1")  # ID of the running container matching webserver-1
+    - docker exec "$CONTAINER_ID" airflow dags trigger train_model  # no -it here, CI jobs have no TTY to attach
diff --git a/dags/data_drift_simulation.py b/dags/data_drift_simulation.py
index 8b6dc1c..e7f1964 100644
--- a/dags/data_drift_simulation.py
+++ b/dags/data_drift_simulation.py
@@ -31,7 +31,7 @@ def check_accuracy(**kwargs):
     accuracy_data = Variable.get("data_drift_accuracy", deserialize_json=True)  
     accuracy = accuracy_data.get("accuracy", 0)  
     if accuracy < 0.70:  
-        return 'trigger_sparking_flow'  
+        return 'trigger_train_model'  
     else:  
         return 'end'  
   
@@ -42,9 +42,9 @@ branch_task = PythonOperator(
     dag=dag,  
 )  
   
-trigger_sparking_flow = TriggerDagRunOperator(  
-    task_id="trigger_sparking_flow",  
-    trigger_dag_id="sparking_flow",  
+trigger_train_model = TriggerDagRunOperator(  
+    task_id="trigger_train_model",  
+    trigger_dag_id="train_model",  
     dag=dag,  
 )  
   
@@ -55,5 +55,5 @@ end = PythonOperator(
 )    
   
 start >> data_drift_job >> branch_task  
-branch_task >> trigger_sparking_flow >> end  
+branch_task >> trigger_train_model >> end  
 branch_task >> end  
diff --git a/dags/spark_airflow.py b/dags/spark_airflow.py
index 1c2ab23..2fa6e02 100644
--- a/dags/spark_airflow.py
+++ b/dags/spark_airflow.py
@@ -4,7 +4,7 @@ from airflow.operators.python import PythonOperator
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
 dag = DAG(
-    dag_id = "sparking_flow",
+    dag_id = "train_model",
     default_args = {
         "owner": "admin",
         "start_date": airflow.utils.dates.days_ago(1)
diff --git a/docker-compose.yml b/docker-compose.yml
index daf5b8a..99fccc1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -40,11 +40,11 @@ services:
     depends_on:
       - spark-master
     environment:
-      SPARK_MODE: worker
-      SPARK_WORKER_CORES: 2
-      SPARK_WORKER_MEMORY: 1g
-      SPARK_MASTER_URL: spark://spark-master:7077
-      SPARK_MASTER: spark://spark-master:7077
+      - SPARK_MODE=worker
+      - SPARK_WORKER_CORES=2
+      - SPARK_WORKER_MEMORY=1g
+      - SPARK_MASTER_URL=spark://spark-master:7077
+      - SPARK_MASTER=spark://spark-master:7077
 
   postgres:
     image: postgres:14.0
@@ -57,7 +57,15 @@ services:
 
   webserver:
     <<: *airflow-common
-    command: bash -c "airflow db init && airflow webserver"
+    command: >
+      bash -c "
+      airflow db init &&
+      if airflow connections get spark_conn; then
+        airflow connections delete spark_conn;
+      fi;
+      airflow connections add 'spark_conn' --conn-type 'spark' --conn-host 'spark://spark-master' --conn-port '7077' --conn-extra '{\"queue\": \"\", \"deploy_mode\": \"client\", \"spark_binary\": \"spark-submit\"}';
+      rm -f /opt/airflow/airflow-webserver.pid;
+      airflow webserver"
     ports:
       - "8080:8080"
     depends_on:
@@ -68,4 +76,5 @@ services:
     command: bash -c "airflow db init && airflow db migrate && airflow users create --username admin --firstname Admin --lastname Admin --role Admin --email admin@example.com --password admin && airflow scheduler"
 
 networks:
-  airflow:
\ No newline at end of file
+  airflow:  # network named "airflow", declared explicitly with the bridge driver
+    driver: bridge
-- 
GitLab