Commit fcbf52ed authored by yansans

init: preprocess

parent 1205736b
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType
spark = SparkSession.builder \
    .appName("Preprocessing") \
    .getOrCreate()
df = spark.read.csv("/opt/data/clean.csv", header=True, inferSchema=True)
# df.printSchema()
# df.count()
# split columns into numerical and categorical by inferred Spark type
column_numerical = []
column_categorical = []
for col_name, col_type in df.dtypes:
    if isinstance(df.schema[col_name].dataType, NumericType):
        column_numerical.append(col_name)
    else:
        column_categorical.append(col_name)
#print(f"Training data count: {data_train.count()}")
#print(f"Testing data count: {data_test.count()}")
# print("Numerical Columns:", column_numerical)
# print("Categorical Columns:", column_categorical)
# encode categorical columns: StringIndexer then OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in column_categorical]
encoders = [
    OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_encoded", dropLast=True)
    for col in column_categorical
]
pipeline = Pipeline(stages=indexers + encoders)
df_encoded = pipeline.fit(df).transform(df)
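# df_encoded now carries, for every categorical column, a numeric {col}_index
# column and a one-hot {col}_encoded vector column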
# df_encoded.show(5)
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
assembler = VectorAssembler(inputCols=column_numerical, outputCol="features")
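# note: only the numeric columns are assembled into "features" here; the
# one-hot encoded categorical vectors are not part of this vector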
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaling_pipeline = Pipeline(stages=[assembler, scaler])
df_scaled = scaling_pipeline.fit(df_encoded).transform(df_encoded)
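# note: the scaler is fit on the full dataset before the train/test split
# below, so test rows influence the observed min/max; fitting on data_train
# alone would avoid that leakage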
# df_scaled.show(5)
data_train, data_test = df_scaled.randomSplit([0.7, 0.3], seed=42)
# print(f"Training data count: {data_train.count()}")
# print(f"Testing data count: {data_test.count()}")
# data_train.show(5)
# balance the training set: downsample the majority class (churn_index == 0)
# to roughly the size of the minority class, sampling with replacement
major_df = data_train.filter(col("churn_index") == 0)
minor_df = data_train.filter(col("churn_index") == 1)
minority_count = minor_df.count()
major_df_downsampled = major_df.sample(withReplacement=True, fraction=minority_count / major_df.count(), seed=42)
data_train_balanced = major_df_downsampled.union(minor_df)
# data_train_balanced.show(5)
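# illustrative sanity check (not in the original commit): class counts after balancing
# data_train_balanced.groupBy("churn_index").count().show()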
df_final = data_train_balanced.select("scaled_features", "churn_encoded")
# df_final.show(5)
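# illustrative next step (the output path is an assumption, not part of this
# commit): persist the preprocessed frame for downstream training
# df_final.write.mode("overwrite").parquet("/opt/data/preprocessed.parquet")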