From 1a448ef58710028ba0fff3f578f93f0cdaef33e9 Mon Sep 17 00:00:00 2001
From: Hidayatullah Wildan Ghaly Buchary <wildanghaly1@gmail.com>
Date: Thu, 9 Jan 2025 19:55:50 +0700
Subject: [PATCH] feat: add logistic regression job code

---
 dags/spark_airflow.py              |  7 ++++-
 jobs/python/logistic_regression.py | 49 ++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)
 create mode 100644 jobs/python/logistic_regression.py

diff --git a/dags/spark_airflow.py b/dags/spark_airflow.py
index b36c370..7eca5d1 100644
--- a/dags/spark_airflow.py
+++ b/dags/spark_airflow.py
@@ -25,7 +25,13 @@ cleanup_job = SparkSubmitOperator(
     dag=dag,
 )
 
-
+# Submit the churn logistic-regression training job (see
+# jobs/python/logistic_regression.py) through the spark_conn connection.
+logistic_regression_job = SparkSubmitOperator(
+    task_id="logistic_regression",
+    conn_id="spark_conn",
+    application="jobs/python/logistic_regression.py",
+    dag=dag,
+)
 
 end = PythonOperator(
     task_id="end",
diff --git a/jobs/python/logistic_regression.py b/jobs/python/logistic_regression.py
new file mode 100644
index 0000000..697c65b
--- /dev/null
+++ b/jobs/python/logistic_regression.py
@@ -0,0 +1,56 @@
+"""Train and evaluate a logistic-regression churn model.
+
+Reads the cleaned Telco dataset, string-indexes the categorical columns,
+assembles a feature vector, fits a LogisticRegression classifier, persists
+the model, and prints test-set accuracy. Launched via spark-submit (the
+Airflow SparkSubmitOperator's spark_conn supplies the master).
+"""
+from pyspark.sql import SparkSession
+from pyspark.ml.feature import VectorAssembler, StringIndexer
+from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+
+# No .master() here: a hard-coded "local" in the driver would override the
+# master configured on the spark-submit side (Airflow's spark_conn).
+spark = SparkSession.builder.appName("LogisticRegressionJob").getOrCreate()
+
+cleaned_data_path = "dataset/cleaned_data.csv"
+df = spark.read.csv(cleaned_data_path, header=True, inferSchema=True)
+
+categorical_columns = ['gender', 'Partner', 'Dependents', 'PhoneService',
+                       'MultipleLines', 'InternetService', 'OnlineSecurity',
+                       'OnlineBackup', 'DeviceProtection', 'TechSupport',
+                       'StreamingTV', 'StreamingMovies', 'Contract',
+                       'PaperlessBilling', 'PaymentMethod']
+
+# Index each categorical column; handleInvalid="keep" maps unseen labels to
+# an extra bucket at transform time instead of raising.
+for column in categorical_columns:
+    indexer = StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
+    df = indexer.fit(df).transform(df)
+
+feature_columns = [col + "_index" for col in categorical_columns] + ['tenure', 'MonthlyCharges', 'TotalCharges']
+assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
+data = assembler.transform(df)
+
+train_data, test_data = data.randomSplit([0.80, 0.20], seed=135)
+
+lr = LogisticRegression(featuresCol='features', labelCol='Churn', maxIter=2000)
+model = lr.fit(train_data)
+
+model_path = 'logistic_regression_model'
+
+# Overwrite through the ML writer API instead of os.path.exists + rmtree +
+# save, which raced and needed the extra os/shutil imports.
+model.write().overwrite().save(model_path)
+
+# Round-trip through disk to verify the persisted model before evaluating.
+loaded_model = LogisticRegressionModel.load(model_path)
+
+predictions = loaded_model.transform(test_data)
+
+evaluator = MulticlassClassificationEvaluator(labelCol='Churn', predictionCol='prediction', metricName='accuracy')
+accuracy = evaluator.evaluate(predictions)
+
+print(f"Accuracy: {accuracy * 100:.2f}%")
+
+spark.stop()
-- 
GitLab