"""Clean the Telco Customer Churn dataset with PySpark.

Pipeline:
  1. Load the raw CSV with an inferred schema.
  2. Drop ``customerID`` (a unique identifier, not a feature).
  3. Cast ``TotalCharges`` from string to double; blank strings become
     null, and those nulls are filled with 0.
  4. Collapse the ``"No phone service"`` / ``"No internet service"``
     categories into plain ``"No"`` for the dependent service columns.

The cleaned DataFrame is left bound to the module-level name ``df`` so
downstream code can ``from clean import df``.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Path to the raw dataset, relative to this script's directory.
INPUT_PATH = "../data/TelcoCustomerChurn.csv"

# Service add-on columns whose "No * service" value is implied by the
# PhoneService / InternetService columns and therefore redundant.
DEPENDENT_SERVICE_COLUMNS = [
    "MultipleLines", "OnlineSecurity", "OnlineBackup",
    "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies",
]

spark = (
    SparkSession.builder
    .appName("TelcoCustomerChurn")
    .getOrCreate()
)

df = spark.read.csv(INPUT_PATH, header=True, inferSchema=True)

# customerID uniquely identifies each row and carries no predictive signal.
df = df.drop("customerID")

# TotalCharges is inferred as a string because some rows are blank; the
# cast to double maps those blanks to null.
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))

# Exploration showed the null TotalCharges rows are exactly the rows with
# tenure == 0 (customers not yet billed), so 0 is the correct fill value.
df = df.fillna({"TotalCharges": 0})

# Normalize "No phone service" / "No internet service" to "No": the
# information is already captured by PhoneService / InternetService.
for column in DEPENDENT_SERVICE_COLUMNS:
    df = df.withColumn(
        column,
        when(
            col(column).isin("No phone service", "No internet service"),
            "No",
        ).otherwise(col(column)),
    )