diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c34599d36a4ff68547981aa7d202e15540a97f92..2a55093d37a3beba98da68b5152ab89bac29a394 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,6 +1,7 @@
 stages:
   - build
   - run
+  - clean-up
 
 build:
   stage: build
@@ -23,3 +24,16 @@ run:
   script:
     - echo "Run Docker Container"
     - docker compose up -d
+
+
+clean-up:
+  stage: clean-up
+  tags:
+    - macos
+  only:
+    - main
+    - develop
+  script:
+    - echo "Clean up"
+    - CONTAINER_ID=$(docker ps -q --filter "name=webserver-1")
+    - docker exec $CONTAINER_ID airflow dags trigger train_model
diff --git a/dags/data_drift_simulation.py b/dags/data_drift_simulation.py
index 8b6dc1cd80a73a1895740acf77f2d0468f9f5363..e7f1964e39be4b98a7e6461a1c45f37ea0b56e5e 100644
--- a/dags/data_drift_simulation.py
+++ b/dags/data_drift_simulation.py
@@ -31,7 +31,7 @@ def check_accuracy(**kwargs):
     accuracy_data = Variable.get("data_drift_accuracy", deserialize_json=True)
     accuracy = accuracy_data.get("accuracy", 0)
     if accuracy < 0.70:
-        return 'trigger_sparking_flow'
+        return 'trigger_train_model'
     else:
         return 'end'
 
@@ -42,9 +42,9 @@ branch_task = PythonOperator(
     dag=dag,
 )
 
-trigger_sparking_flow = TriggerDagRunOperator(
-    task_id="trigger_sparking_flow",
-    trigger_dag_id="sparking_flow",
+trigger_train_model = TriggerDagRunOperator(
+    task_id="trigger_train_model",
+    trigger_dag_id="train_model",
     dag=dag,
 )
 
@@ -55,5 +55,5 @@ end = PythonOperator(
 )
 
 start >> data_drift_job >> branch_task
-branch_task >> trigger_sparking_flow >> end
+branch_task >> trigger_train_model >> end
 branch_task >> end
diff --git a/dags/spark_airflow.py b/dags/spark_airflow.py
index 1c2ab23fae8bf05f633b73b179e9f5fd9844a7ed..2fa6e02007325de5e022f110e429a49f96ac4c9f 100644
--- a/dags/spark_airflow.py
+++ b/dags/spark_airflow.py
@@ -4,7 +4,7 @@ from airflow.operators.python import PythonOperator
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
 
 dag = DAG(
-    dag_id = "sparking_flow",
+    dag_id = "train_model",
     default_args = {
         "owner": "admin",
         "start_date": airflow.utils.dates.days_ago(1)
diff --git a/docker-compose.yml b/docker-compose.yml
index daf5b8a82367dc7b5646124f613ad39879b39728..99fccc190503ded1f0b50ac4b474a7ba7a8abce4 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -40,11 +40,11 @@ services:
     depends_on:
       - spark-master
     environment:
-      SPARK_MODE: worker
-      SPARK_WORKER_CORES: 2
-      SPARK_WORKER_MEMORY: 1g
-      SPARK_MASTER_URL: spark://spark-master:7077
-      SPARK_MASTER: spark://spark-master:7077
+      - SPARK_MODE=worker
+      - SPARK_WORKER_CORES=2
+      - SPARK_WORKER_MEMORY=1g
+      - SPARK_MASTER_URL=spark://spark-master:7077
+      - SPARK_MASTER=spark://spark-master:7077
 
   postgres:
     image: postgres:14.0
@@ -57,7 +57,15 @@ services:
 
   webserver:
     <<: *airflow-common
-    command: bash -c "airflow db init && airflow webserver"
+    command: >
+      bash -c "
+      airflow db init &&
+      if airflow connections get spark_conn; then
+        airflow connections delete spark_conn;
+      fi;
+      airflow connections add 'spark_conn' --conn-type 'spark' --conn-host 'spark://spark-master' --conn-port '7077' --conn-extra '{\"queue\": \"\", \"deploy_mode\": \"client\", \"spark_binary\": \"spark-submit\"}';
+      rm -f /opt/airflow/airflow-webserver.pid;
+      airflow webserver"
     ports:
       - "8080:8080"
     depends_on:
@@ -68,4 +76,5 @@ services:
     command: bash -c "airflow db init && airflow db migrate && airflow users create --username admin --firstname Admin --lastname Admin --role Admin --email admin@example.com --password admin && airflow scheduler"
 
 networks:
-  airflow:
\ No newline at end of file
+  airflow:
+    driver: bridge
\ No newline at end of file