diff --git a/jobs/python/datacleanup.py b/jobs/python/datacleanup.py
index 8b5ca0a0a51482eb25061f2374bce98fd41a3c28..c924c1f8ca9ae618ad619ba96521745501795950 100644
--- a/jobs/python/datacleanup.py
+++ b/jobs/python/datacleanup.py
@@ -28,26 +28,18 @@ schema = StructType([
     StructField("TotalCharges", StringType(), True),
     StructField("Churn", StringType(), True)
 ])
-with open('dataset/telco_customer_churn.csv', 'r') as file:
-    reader = csv.reader(file)
-    header = next(reader)
-    data = [row for row in reader]
-df = spark.createDataFrame(data, schema=schema)
+raw_data_path = 'dataset/telco_customer_churn.csv'
+df = spark.read.csv(raw_data_path, header=True, schema=schema)
 
 
-
-missing_values = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
-
-df = df.withColumn('Churn', trim(col('Churn')))
-df = df.withColumn('Churn', when(col('Churn') == 'Yes', 1).otherwise(0))
-
-df = df.withColumn('TotalCharges', col('TotalCharges').cast(DoubleType()))
-df.describe('TotalCharges').show()
-
-df = df.dropna(subset=['TotalCharges'])
-
-with open('dataset/cleaned_data.csv', 'w') as file:
-    writer = csv.writer(file)
-    writer.writerow(header)
-    for row in df.collect():
-        writer.writerow(row)
\ No newline at end of file
+missing_values = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
+
+df = df.withColumn('Churn', trim(col('Churn')))
+df = df.withColumn('Churn', when(col('Churn') == 'Yes', 1).otherwise(0))
+df = df.withColumn('TotalCharges', col('TotalCharges').cast(DoubleType()))
+df = df.dropna(subset=['TotalCharges'])
+
+cleaned_data_path = 'dataset/cleaned_data'
+df.write.csv(cleaned_data_path, mode='overwrite', header=True)
+
+spark.stop()
\ No newline at end of file