diff --git a/dags/spark_airflow.py b/dags/spark_airflow.py
index b36c37004a60496f1b271ceac07cc57584074fae..7eca5d1b5642e6183ac1382fa85aec38da11c83c 100644
--- a/dags/spark_airflow.py
+++ b/dags/spark_airflow.py
@@ -25,7 +25,12 @@ cleanup_job = SparkSubmitOperator(
     dag=dag,
 )
 
-
+logistic_regression_job = SparkSubmitOperator(
+    task_id="logistic_regression",
+    conn_id="spark_conn",
+    application="jobs/python/logistic_regression.py",
+    dag=dag,
+)
 
 end = PythonOperator(
     task_id="end",
diff --git a/jobs/python/logistic_regression.py b/jobs/python/logistic_regression.py
new file mode 100644
index 0000000000000000000000000000000000000000..697c65bf36ee319a41267e8c77ad4031b96f9db3
--- /dev/null
+++ b/jobs/python/logistic_regression.py
@@ -0,0 +1,49 @@
+from pyspark.sql import SparkSession
+from pyspark.ml.feature import VectorAssembler, StringIndexer
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.classification import LogisticRegressionModel
+import os
+import shutil
+
+spark = SparkSession.builder.master("local").appName("LogisticRegressionJob").getOrCreate()
+
+cleaned_data_path = "dataset/cleaned_data.csv"
+df = spark.read.csv(cleaned_data_path, header=True, inferSchema=True)
+
+categorical_columns = ['gender', 'Partner', 'Dependents', 'PhoneService',
+                       'MultipleLines', 'InternetService', 'OnlineSecurity',
+                       'OnlineBackup', 'DeviceProtection', 'TechSupport',
+                       'StreamingTV', 'StreamingMovies', 'Contract',
+                       'PaperlessBilling', 'PaymentMethod']
+
+for column in categorical_columns:
+    indexer = StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
+    df = indexer.fit(df).transform(df)
+
+feature_columns = [col + "_index" for col in categorical_columns] + ['tenure', 'MonthlyCharges', 'TotalCharges']
+assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
+data = assembler.transform(df)
+
+train_data, test_data = data.randomSplit([0.80, 0.20], seed=135)
+
+lr = LogisticRegression(featuresCol='features', labelCol='Churn', maxIter=2000)
+model = lr.fit(train_data)
+
+model_path = 'logistic_regression_model'
+
+if os.path.exists(model_path):
+    shutil.rmtree(model_path)
+
+model.save(model_path)
+
+loaded_model = LogisticRegressionModel.load(model_path)
+
+predictions = loaded_model.transform(test_data)
+
+evaluator = MulticlassClassificationEvaluator(labelCol='Churn', predictionCol='prediction', metricName='accuracy')
+accuracy = evaluator.evaluate(predictions)
+
+print(f"Accuracy: {accuracy * 100:.2f}%")
+
+spark.stop()