From b4d4aaedb4edc8cccc2208c1a4398018344c4234 Mon Sep 17 00:00:00 2001 From: Agilham <13521118@std.stei.itb.ac.id> Date: Wed, 8 Jan 2025 17:40:31 +0700 Subject: [PATCH] init: setup mlflow --- .gitignore | 3 - Dockerfile => airflow/Dockerfile | 0 dags/ppt.py => airflow/dags/example_dag.py | 0 .../dags/example_spark_dag.py | 4 +- {dags => airflow/dags}/workflow.py | 0 data/{input.csv => example.csv} | 0 docker-compose.yaml | 79 ++++++-- mlflow/Dockerfile | 4 + mlflow/requirements.txt | 4 + .../example_spark_job.py | 0 wait-for-it.sh | 182 ++++++++++++++++++ 11 files changed, 260 insertions(+), 16 deletions(-) rename Dockerfile => airflow/Dockerfile (100%) rename dags/ppt.py => airflow/dags/example_dag.py (100%) rename dags/spark_dag.py => airflow/dags/example_spark_dag.py (82%) rename {dags => airflow/dags}/workflow.py (100%) rename data/{input.csv => example.csv} (100%) create mode 100644 mlflow/Dockerfile create mode 100644 mlflow/requirements.txt rename dags/spark_job.py => spark/example_spark_job.py (100%) create mode 100644 wait-for-it.sh diff --git a/.gitignore b/.gitignore index 271ef1b..ecfb54c 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,3 @@ __pycache__/ # Ignore logs logs/ *.log - -# Ignore output -data/output/ diff --git a/Dockerfile b/airflow/Dockerfile similarity index 100% rename from Dockerfile rename to airflow/Dockerfile diff --git a/dags/ppt.py b/airflow/dags/example_dag.py similarity index 100% rename from dags/ppt.py rename to airflow/dags/example_dag.py diff --git a/dags/spark_dag.py b/airflow/dags/example_spark_dag.py similarity index 82% rename from dags/spark_dag.py rename to airflow/dags/example_spark_dag.py index 2689a21..2ebac1e 100644 --- a/dags/spark_dag.py +++ b/airflow/dags/example_spark_dag.py @@ -23,7 +23,7 @@ with DAG( run_spark_job = SparkSubmitOperator( task_id='run_spark_job', - application="/opt/airflow/dags/spark_job.py", # Path to the script - conn_id="spark_default", # Connection configured in Airflow + application="/opt/airflow/dags/spark_job.py", + conn_id="spark_default", verbose=True, ) diff --git a/dags/workflow.py b/airflow/dags/workflow.py similarity index 100% rename from dags/workflow.py rename to airflow/dags/workflow.py diff --git a/data/input.csv b/data/example.csv similarity index 100% rename from data/input.csv rename to data/example.csv diff --git a/docker-compose.yaml b/docker-compose.yaml index d025039..bfa9f9f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -50,7 +50,7 @@ x-airflow-common: # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3} - build: . + build: ./airflow environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor @@ -73,11 +73,9 @@ x-airflow-common: # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' volumes: - - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags - - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs - - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins - - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data + - ${AIRFLOW_PROJ_DIR:-./airflow}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-./airflow}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-./airflow}/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on @@ -255,7 +253,7 @@ services: _PIP_ADDITIONAL_REQUIREMENTS: '' user: "0:0" volumes: - - ${AIRFLOW_PROJ_DIR:-.}:/sources + - ${AIRFLOW_PROJ_DIR:-./airflow}:/sources cli: <<: *airflow-common @@ -306,8 +304,6 @@ services: - SPARK_USER=spark ports: - '8081:8080' - volumes: - - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data spark-worker: image: docker.io/bitnami/spark:3.5 @@ -322,8 +318,69 @@ services: - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no - SPARK_SSL_ENABLED=no - SPARK_USER=spark + + minio: + restart: always + image: minio/minio@sha256:d051d800a3025588f37f69f132bb5ef718547a9a4ee95ddee44e04ad952a0a96 + container_name: mlflow_s3 + ports: + - "9000:9000" + - "9001:9001" + command: server /data --console-address ':9001' --address ':9000' + environment: + - MINIO_ROOT_USER=${AWS_ACCESS_KEY_ID} + - MINIO_ROOT_PASSWORD=${AWS_SECRET_ACCESS_KEY} volumes: - - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data + - minio_data:/data + + mc: + image: minio/mc@sha256:4ccb0c7437bda19ed232603eab10cc246c8f9ac7ccd282943ebb7657a2df4820 + depends_on: + - minio + container_name: mc + env_file: + - .env + entrypoint: > + /bin/sh -c " + /tmp/wait-for-it.sh minio:9000 && + /usr/bin/mc alias set minio http://minio:9000 ${AWS_ACCESS_KEY_ID} ${AWS_SECRET_ACCESS_KEY} && + /usr/bin/mc mb minio/mlflow; + exit 0; + " + volumes: + - ./wait-for-it.sh:/tmp/wait-for-it.sh + + db: + restart: always + image: mysql/mysql-server@sha256:5b40d96b11333570143d98d3a74100fefadb9abb17b27a95dbc9ad33544ec142 + container_name: mlflow_db + ports: + - "3306:3306" + environment: + - MYSQL_DATABASE=${MYSQL_DATABASE} + - MYSQL_USER=${MYSQL_USER} + - MYSQL_PASSWORD=${MYSQL_PASSWORD} + - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD} + volumes: + - dbdata:/var/lib/mysql + + web: + restart: always + build: ./mlflow + image: mlflow_server + container_name: mlflow_server + depends_on: + - mc + - db + ports: + - "5000:5000" + environment: + - MLFLOW_S3_ENDPOINT_URL=http://minio:9000 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --default-artifact-root s3://mlflow/ --host 0.0.0.0 volumes: - postgres: \ No newline at end of file + postgres: + dbdata: + minio_data: diff --git a/mlflow/Dockerfile b/mlflow/Dockerfile new file mode 100644 index 0000000..ae8109a --- /dev/null +++ b/mlflow/Dockerfile @@ -0,0 +1,4 @@ +FROM python:3.11-slim-buster +# Install python packages +COPY requirements.txt /tmp +RUN pip install -r /tmp/requirements.txt diff --git a/mlflow/requirements.txt b/mlflow/requirements.txt new file mode 100644 index 0000000..4cf5258 --- /dev/null +++ b/mlflow/requirements.txt @@ -0,0 +1,4 @@ +cryptography==43.0.3 +boto3==1.35.63 +mlflow==2.18.0 +pymysql==1.1.1 diff --git a/dags/spark_job.py b/spark/example_spark_job.py similarity index 100% rename from dags/spark_job.py rename to spark/example_spark_job.py diff --git a/wait-for-it.sh b/wait-for-it.sh new file mode 100644 index 0000000..3974640 --- /dev/null +++ b/wait-for-it.sh @@ -0,0 +1,182 @@ +#!/usr/bin/env bash +# Use this script to test if a given TCP host/port are available + +WAITFORIT_cmdname=${0##*/} + +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } + +usage() +{ + cat << USAGE >&2 +Usage: + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] + -h HOST | --host=HOST Host or IP under test + -p PORT | --port=PORT TCP port under test + Alternatively, you specify the host and port as host:port + -s | --strict Only execute subcommand if the test succeeds + -q | --quiet Don't output any status messages + -t TIMEOUT | --timeout=TIMEOUT + Timeout in seconds, zero for no timeout + -- COMMAND ARGS Execute command with args after the test finishes +USAGE + exit 1 +} + +wait_for() +{ + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + else + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" + fi + WAITFORIT_start_ts=$(date +%s) + while : + do + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? + else + (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? + fi + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" + break + fi + sleep 1 + done + return $WAITFORIT_result +} + +wait_for_wrapper() +{ + # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + else + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & + fi + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" + fi + return $WAITFORIT_RESULT +} + +# process arguments +while [[ $# -gt 0 ]] +do + case "$1" in + *:* ) + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} + shift 1 + ;; + --child) + WAITFORIT_CHILD=1 + shift 1 + ;; + -q | --quiet) + WAITFORIT_QUIET=1 + shift 1 + ;; + -s | --strict) + WAITFORIT_STRICT=1 + shift 1 + ;; + -h) + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi + shift 2 + ;; + --host=*) + WAITFORIT_HOST="${1#*=}" + shift 1 + ;; + -p) + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi + shift 2 + ;; + --port=*) + WAITFORIT_PORT="${1#*=}" + shift 1 + ;; + -t) + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi + shift 2 + ;; + --timeout=*) + WAITFORIT_TIMEOUT="${1#*=}" + shift 1 + ;; + --) + shift + WAITFORIT_CLI=("$@") + break + ;; + --help) + usage + ;; + *) + echoerr "Unknown argument: $1" + usage + ;; + esac +done + +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then + echoerr "Error: you need to provide a host and port to test." + usage +fi + +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} + +# Check to see if timeout is from busybox? +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) + +WAITFORIT_BUSYTIMEFLAG="" +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + # Check if busybox timeout uses -t flag + # (recent Alpine versions don't support -t anymore) + if timeout &>/dev/stdout | grep -q -e '-t '; then + WAITFORIT_BUSYTIMEFLAG="-t" + fi +else + WAITFORIT_ISBUSY=0 +fi + +if [[ $WAITFORIT_CHILD -gt 0 ]]; then + wait_for + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT +else + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + wait_for_wrapper + WAITFORIT_RESULT=$? + else + wait_for + WAITFORIT_RESULT=$? + fi +fi + +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT + fi + exec "${WAITFORIT_CLI[@]}" +else + exit $WAITFORIT_RESULT +fi \ No newline at end of file -- GitLab