diff --git a/dags/model_training.py b/dags/model_training.py
index e1cf713cd1a833439bf17fef29cb44338998ea44..f98bab4c16c040746c9e33054709c1085c067f6d 100644
--- a/dags/model_training.py
+++ b/dags/model_training.py
@@ -1,55 +1,72 @@
-import mlflow
-import mlflow.sklearn
-from pyspark.sql import SparkSession
-from pyspark.ml.feature import VectorAssembler
-from pyspark.ml.classification import LogisticRegression
-from pyspark.ml import Pipeline
-from pyspark.ml.feature import StringIndexer
-
-def train_model():
-    spark = SparkSession.builder.appName("CustomerChurn").getOrCreate()
-
-    # Read the data
-    cleaned_data = spark.read.csv("telco_customer_churn_transformed.csv", header=True, inferSchema=True)
-
-    # Display the data to verify the format
-    cleaned_data.show()
-
-    # Define the string and numeric columns
-    categorical_columns = ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
-                           "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
-                           "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
-                           "PaperlessBilling", "PaymentMethod"]
-    numeric_columns = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]
-
-    # Convert the 'Churn' target label to numeric
-    label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
-    indexed_data = label_indexer.fit(cleaned_data).transform(cleaned_data)
-
-    # Convert string columns to numeric indices using StringIndexer
-    indexers = [
-        StringIndexer(inputCol=col, outputCol=col + "_indexed").setHandleInvalid("skip")
-        for col in categorical_columns
-    ]
-
-    # Assemble the feature columns (numeric and StringIndexer outputs) into a single `features` column
-    feature_columns = [col + "_indexed" for col in categorical_columns] + numeric_columns
-    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
-
-    # Logistic Regression model
-    lr = LogisticRegression(labelCol="label", featuresCol="features")
-
-    # Pipeline for data transformation and model training
-    pipeline = Pipeline(stages=indexers + [assembler, lr])
-
-    # Train the model
-    model = pipeline.fit(indexed_data)
-
-    # Save the model
-    model.write().overwrite().save("logistic_regression_model")
-
-    print("Model trained and saved successfully!")
-
-if __name__ == "__main__":
-    train_model()
-
+import mlflow
+import mlflow.spark
+from pyspark.sql import SparkSession
+from pyspark.ml.feature import VectorAssembler
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml import Pipeline
+from pyspark.ml.feature import StringIndexer
+
+def train_model():
+    # Set the tracking URI to the MLflow server
+    mlflow.set_tracking_uri('http://localhost:5000')
+
+    # Start the MLflow run
+    mlflow.start_run()
+
+    spark = SparkSession.builder.appName("CustomerChurn").getOrCreate()
+
+    # Read the data
+    cleaned_data = spark.read.csv("telco_customer_churn_transformed.csv", header=True, inferSchema=True)
+
+    # Display the data to verify the format
+    cleaned_data.show()
+
+    # Define the string and numeric columns
+    categorical_columns = ["gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
+                           "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
+                           "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
+                           "PaperlessBilling", "PaymentMethod"]
+    numeric_columns = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]
+
+    # Convert the 'Churn' target label to numeric
+    label_indexer = StringIndexer(inputCol="Churn", outputCol="label")
+    indexed_data = label_indexer.fit(cleaned_data).transform(cleaned_data)
+
+    # Convert string columns to numeric indices using StringIndexer
+    indexers = [
+        StringIndexer(inputCol=col, outputCol=col + "_indexed").setHandleInvalid("skip")
+        for col in categorical_columns
+    ]
+
+    # Assemble the feature columns (numeric and StringIndexer outputs) into a single `features` column
+    feature_columns = [col + "_indexed" for col in categorical_columns] + numeric_columns
+    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
+
+    # Logistic Regression model
+    lr = LogisticRegression(labelCol="label", featuresCol="features")
+
+    # Pipeline for data transformation and model training
+    pipeline = Pipeline(stages=indexers + [assembler, lr])
+
+    # Train the model
+    model = pipeline.fit(indexed_data)
+
+    # Log the model with MLflow
+    mlflow.spark.log_model(model, "logistic_regression_model")
+
+    # Log parameters and metrics
+    mlflow.log_param("model_type", "Logistic Regression")
+    mlflow.log_param("num_features", len(feature_columns))
+
+    # To log a metric such as accuracy, it must be computed first
+    predictions = model.transform(indexed_data)
+    accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
+    mlflow.log_metric("accuracy", accuracy)
+
+    print("Model trained and saved successfully!")
+
+    # End the MLflow run
+    mlflow.end_run()
+
+if __name__ == "__main__":
+    train_model()
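
Note: a minimal sketch of loading the logged model back for scoring, assuming the
same tracking server at http://localhost:5000; "<run_id>" is a placeholder for the
run ID that MLflow assigns to the training run above, not a value from this diff:

    import mlflow
    import mlflow.spark
    from pyspark.sql import SparkSession

    mlflow.set_tracking_uri("http://localhost:5000")
    spark = SparkSession.builder.appName("CustomerChurnScoring").getOrCreate()

    # Load the pipeline logged as "logistic_regression_model" under the given run.
    # The URI scheme "runs:/<run_id>/<artifact_path>" is standard MLflow.
    model = mlflow.spark.load_model("runs:/<run_id>/logistic_regression_model")

    # Score rows with the same schema as the training CSV; the loaded pipeline
    # re-applies the StringIndexer and VectorAssembler stages before the classifier.
    new_data = spark.read.csv("telco_customer_churn_transformed.csv",
                              header=True, inferSchema=True)
    model.transform(new_data).select("prediction", "probability").show(5)

One caveat on the logged "accuracy" metric: it is computed over indexed_data, i.e.
the training set itself rather than a held-out split, so it measures training
accuracy only.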