Skip to content
Snippets Groups Projects
Commit 0eb6fb6d authored by Naufal-Nalendra's avatar Naufal-Nalendra
Browse files

feat: create model dag

parent cf619bc8
No related merge requests found
Pipeline #66354 canceled with stage
model/mlruns model/mlruns
model/*.db model/*.db
dags/_pycache_ logs/
\ No newline at end of file
dags/_pycache_/
*.cpython-311.pyc
/dags/__pycache__
dags/__pycache__/spark_dag.cpython-311.pyc
...@@ -10,4 +10,9 @@ ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 ...@@ -10,4 +10,9 @@ ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
USER airflow USER airflow
RUN pip install apache-airflow-providers-apache-spark==2.1.1 pyspark==3.5.0 apache-airflow-providers-openlineage>=1.8.0 RUN pip install apache-airflow-providers-apache-spark==2.1.1 \
pyspark==3.5.0 \
"apache-airflow-providers-openlineage>=1.8.0" \
pandas \
mlflow \
scikit-learn
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import mlflow
import mlflow.sklearn
import os
# Task-level defaults that the DAG hands to every operator it contains.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # One retry, attempted five minutes after the failed try.
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def train_and_log_model():
    """Train a soft-voting churn classifier and record the run in MLflow.

    Reads the CSV named by the ``DATA_PATH`` environment variable
    (default ``../data/churn.csv``), uses the ``Churn`` column as the
    target (``'Yes'`` -> 1, ``'No'`` -> 0) and every other column as a
    feature, holds out 20% of the rows for evaluation, fits a
    ``VotingClassifier`` over logistic regression, random forest and
    gradient boosting, then logs parameters, metrics and the fitted model
    to the "Customer Churn Model" MLflow experiment.

    The MLflow server is taken from ``MLFLOW_TRACKING_URI`` and falls back
    to ``http://127.0.0.1:5000`` when that variable is unset.

    Raises:
        ValueError: if ``Churn`` contains a value other than ``'Yes'`` or
            ``'No'`` (including missing values).
    """
    # Shared hyper-parameters; the same values are logged to MLflow below,
    # so the tracked params cannot drift from what the models actually use.
    n_estimators = 100
    random_state = 42

    # Load data. A relative default is resolved against the working
    # directory of the process executing the task.
    data_path = os.getenv("DATA_PATH", "../data/churn.csv")
    data = pd.read_csv(data_path)

    # Split into features and target
    # NOTE(review): every non-target column is passed to the estimators
    # unchanged - this assumes they are all numeric; confirm against the
    # dataset or add an encoding step.
    X = data.drop(columns=['Churn'])
    y = data['Churn'].map({'Yes': 1, 'No': 0})

    # Series.map() turns unmapped labels into NaN; fail here with a clear
    # message instead of deep inside fit() after a run has been opened.
    unmapped = y.isna()
    if unmapped.any():
        bad_labels = sorted({repr(v) for v in data.loc[unmapped, 'Churn']})
        raise ValueError(
            "Column 'Churn' must contain only 'Yes'/'No'; "
            f"found: {', '.join(bad_labels)}"
        )

    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state
    )

    # Define individual models
    logistic_model = LogisticRegression(max_iter=1000, random_state=random_state)
    random_forest_model = RandomForestClassifier(
        n_estimators=n_estimators, random_state=random_state
    )
    gradient_boosting_model = GradientBoostingClassifier(
        n_estimators=n_estimators, random_state=random_state
    )

    # Ensemble model
    ensemble_model = VotingClassifier(
        estimators=[
            ('logistic', logistic_model),
            ('random_forest', random_forest_model),
            ('gradient_boosting', gradient_boosting_model),
        ],
        voting='soft',
    )

    # Set up MLflow. Honour MLFLOW_TRACKING_URI so a deployment can point
    # the task at its tracking server without editing the DAG file.
    mlflow.set_tracking_uri(
        os.getenv("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000")
    )
    mlflow.set_experiment("Customer Churn Model")

    # Start MLflow run
    with mlflow.start_run():
        # Train the model
        ensemble_model.fit(X_train, y_train)

        # Make predictions
        y_pred = ensemble_model.predict(X_test)

        # Evaluate model performance
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)

        # Log parameters
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("random_state", random_state)

        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("recall", recall)
        mlflow.log_metric("f1_score", f1)

        # Log the model
        mlflow.sklearn.log_model(ensemble_model, "ensemble_model")

    print("Model training and logging complete.")
    print(f"Metrics - Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1}")


# Define the DAG
with DAG(
    'model_training_and_tracking',
    default_args=default_args,
    description='Train and log a model using MLflow',
    schedule_interval=None,  # Run manually
    start_date=days_ago(1),
    catchup=False,
) as dag:
    # Define the task; it is the DAG's only task, so no dependencies are set.
    train_and_log_task = PythonOperator(
        task_id='train_and_log_model',
        python_callable=train_and_log_model,
    )
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment