Commit 2ca112e3 authored by Muhammad Habibi Husni

feat: preprocess task

parent 8d032477
@@ -22,4 +22,10 @@ logs/
 data/example
 .ipynb_checkpoints/
-*.ipynb
\ No newline at end of file
+*.ipynb
+data/clean*.csv
+data/train_data*.csv
+data/test_data*.csv
+.python-version
\ No newline at end of file

+# Spark imports
 from pyspark.sql import SparkSession
-from pyspark.sql.types import NumericType
+from pyspark.sql.types import NumericType, FloatType
+from pyspark.sql.functions import udf, col
+from pyspark.ml.feature import (
+    MinMaxScaler,
+    VectorAssembler,
+    StringIndexer,
+    OneHotEncoder,
+    VectorSlicer
+)
+from pyspark.ml.functions import vector_to_array
+from pyspark.ml import Pipeline
+
+# Other libraries
+from imblearn.over_sampling import SMOTE
+import pandas as pd
 
 spark = SparkSession.builder \
     .appName("Preprocessing") \
@@ -20,58 +35,109 @@ for col_name, col_type in df.dtypes:
     else:
         column_categorical.append(col_name)
+column_categorical.remove("churn")
 
 # print("Numerical Columns:", column_numerical)
 # print("Categorical Columns:", column_categorical)
 
-from pyspark.ml.feature import StringIndexer, OneHotEncoder
-from pyspark.ml import Pipeline
-indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in column_categorical]
+data_train, data_test = df.randomSplit([0.7, 0.3], seed=42)
+
+indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in column_categorical + ["churn"]]
+index_features = [f"{col}_index" for col in column_categorical]
 encoders = [
     OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_encoded", dropLast=True) for col in column_categorical
 ]
-pipeline = Pipeline(stages=indexers + encoders)
-df_encoded = pipeline.fit(df).transform(df)
-# df_encoded.show(5)
+numerical_assembler = VectorAssembler(inputCols=column_numerical, outputCol="numerical_features")
+scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")
+slicers = [
+    VectorSlicer(inputCol="scaled_numerical_features", outputCol=f"scaled_{col}", indices=[i])
+    for i, col in enumerate(column_numerical)
+]
+
+pipeline = Pipeline(stages=indexers + encoders + [numerical_assembler, scaler] + slicers)
+pipeline_fit = pipeline.fit(data_train)
+data_train_processed = pipeline_fit.transform(data_train)
+data_test_processed = pipeline_fit.transform(data_test)
+
+def vector_to_float(vector):
+    return float(vector[0]) if vector else None
+
+vector_to_float_udf = udf(vector_to_float, FloatType())
-from pyspark.ml.feature import MinMaxScaler
-from pyspark.ml.feature import VectorAssembler
-assembler = VectorAssembler(inputCols=column_numerical, outputCol="features")
-df_with_features = assembler.transform(df_encoded)
+def postprocess_data(data):
+    encoded_features = []
+    for col_name in column_categorical:
+        encoded_col = f"{col_name}_encoded"
+        # Convert the one-hot SparseVector into an array column
+        data = data.withColumn(f"{encoded_col}_array", vector_to_array(col(encoded_col)))
+        # Read one row to find out how many positions the encoded vector has,
+        # then expand the array into one column per category
+        num_categories = len(data.select(f"{encoded_col}_array").first()[0])
+        for i in range(num_categories):
+            new_col_name = f"{col_name}_category_{i}"
+            data = data.withColumn(new_col_name, col(f"{encoded_col}_array")[i])
+            encoded_features.append(new_col_name)
+    for col_name in column_numerical:
+        # Unwrap the single-element vectors produced by VectorSlicer into plain floats
+        data = data.withColumn(f"scaled_{col_name}_tmp", vector_to_float_udf(f"scaled_{col_name}"))
+        data = data.drop(f"scaled_{col_name}").withColumnRenamed(f"scaled_{col_name}_tmp", f"scaled_{col_name}")
+    scaled_features = [f"scaled_{col}" for col in column_numerical]
+    all_columns = scaled_features + encoded_features + ["churn_index"]
+    data = data.select(*all_columns).withColumnRenamed("churn_index", "churn")
+    return data
+
+# Postprocess the training and test data
+data_train_processed = postprocess_data(data_train_processed)
+data_test_processed = postprocess_data(data_test_processed)
-scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
-scaling_pipeline = Pipeline(stages=[assembler, scaler])
-df_scaled = scaling_pipeline.fit(df_encoded).transform(df_encoded)
-# df_scaled.show(5)
-data_train, data_test = df_scaled.randomSplit([0.7, 0.3], seed=42)
-# print(f"Training data count: {data_train.count()}")
-# print(f"Testing data count: {data_test.count()}")
-# data_train.show(5)
-# major_df = data_train.filter(col("churn_index") == 0)
-# minor_df = data_train.filter(col("churn_index") == 1)
-# minority_count = minor_df.count()
-# major_df_oversampled = major_df.sample(withReplacement=True, fraction=(minority_count / major_df.count()))
-# data_train_balanced = major_df_oversampled.union(minor_df)
-# # data_train_balanced.show(5)
-# df_final = data_train_balanced.select("scaled_features", "churn_encoded")
-# df_final.show(5)
+# data_train_processed.groupBy('churn').count().show()
+
+# oversampling train data with SMOTE (done in pandas, then converted back to a Spark DataFrame)
+pandas_df_train = data_train_processed.toPandas()
+X_train = pandas_df_train.drop(columns=["churn"])
+Y_train = pandas_df_train["churn"]
+smote = SMOTE(random_state=1)
+X_train_smote, y_train_smote = smote.fit_resample(X_train, Y_train)
+X_train_smote_df = pd.DataFrame(X_train_smote, columns=X_train.columns)
+y_train_smote_df = pd.DataFrame(y_train_smote, columns=['churn'])
+data_smote = pd.concat([X_train_smote_df, y_train_smote_df], axis=1)
+data_train_processed = spark.createDataFrame(data_smote)
+
+# data_train_processed.groupBy('churn').count().show()
+# data_train_processed.show(5)
+
+# output to csv file
+import shutil
+import os
+
+def output_data(data, file_name):
+    output_dir = "/opt/data/" + file_name
+    if os.path.exists(output_dir):
+        shutil.rmtree(output_dir)
+    # Write a single part file, move it to <file_name>.csv, then drop the temporary directory
+    data.coalesce(1).write.option("header", "true").csv(output_dir)
+    final_file = os.path.join("/opt/data/", f"{file_name}.csv")
+    for file in os.listdir(output_dir):
+        if file.startswith("part-"):
+            shutil.move(os.path.join(output_dir, file), final_file)
+    shutil.rmtree(output_dir)
+
+output_data(data_train_processed, "train_data")
+output_data(data_test_processed, "test_data")
\ No newline at end of file
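A note on the scale-then-slice pattern added above: MinMaxScaler operates on a single vector column, so the script assembles all numerical columns into one vector, scales it, and then uses one VectorSlicer per column to pull each scaled value back out. A minimal sketch of that idea on toy data (the column names a and b, the toy values, and the app name are illustrative assumptions, not part of the commit):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, VectorSlicer
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("scale_then_slice_demo").getOrCreate()
toy = spark.createDataFrame([(1.0, 10.0), (2.0, 20.0), (3.0, 30.0)], ["a", "b"])

assembler = VectorAssembler(inputCols=["a", "b"], outputCol="numerical_features")
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")
slicers = [
    VectorSlicer(inputCol="scaled_numerical_features", outputCol=f"scaled_{c}", indices=[i])
    for i, c in enumerate(["a", "b"])
]

demo = Pipeline(stages=[assembler, scaler] + slicers).fit(toy).transform(toy)
# Each scaled_* column is still a one-element vector at this point, which is why
# postprocess_data() above unwraps it with a UDF before writing CSVs.
demo.select("scaled_a", "scaled_b").show()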
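For a quick sanity check of the exported files, a hedged sketch (assuming the /opt/data/train_data.csv and /opt/data/test_data.csv paths written by output_data above) that reloads the single-file CSVs and confirms the class balance produced by SMOTE:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("check_preprocessed_data").getOrCreate()

train = spark.read.option("header", "true").option("inferSchema", "true").csv("/opt/data/train_data.csv")
test = spark.read.option("header", "true").option("inferSchema", "true").csv("/opt/data/test_data.csv")

# The train split should be roughly 50/50 after SMOTE; the test split keeps the original imbalance.
train.groupBy("churn").count().show()
test.groupBy("churn").count().show()

# Columns should be the scaled_* numerical features, the *_category_* one-hot
# columns, and the churn label produced by postprocess_data().
print(train.columns)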