Commit 9cd73e50 authored by Haziq Abiyyu Mahdy

checkpoint: preprocess

parent 203ddbf6
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    dag_id='experiment_dag_minio',
    schedule_interval=None,
    start_date=datetime(2025, 1, 8),
    catchup=False,
) as dag:
    ingest_training_data_task = SparkSubmitOperator(
        task_id='ingest_training_data',
        application='/opt/airflow/functions/ingest_training_data.py',
        conn_id='spark_default',
        name='ingest_training_data_job',
        application_args=[
            '/opt/workspace/dataset.csv',  # Input path
            'my-bucket',                   # MinIO bucket name
            'ingested_data.csv',           # MinIO object key
        ],
        conf={
            'spark.executor.memory': '512m',
        },
        verbose=True,
    )

    preprocess_task = SparkSubmitOperator(
        task_id='preprocess',
        application='/opt/airflow/functions/preprocess.py',
        conn_id='spark_default',
        name='preprocess_job',
        application_args=[
            'my-bucket',              # MinIO bucket name
            'ingested_data.csv',      # Input object key
            'preprocessed_data.csv',  # Output object key
        ],
        conf={
            'spark.executor.memory': '512m',
        },
        verbose=True,
    )

    ingest_training_data_task >> preprocess_task
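Both tasks assume the 'my-bucket' bucket already exists: the scripts below upload to it but never create it. A minimal bootstrap sketch, reusing the same minioadmin credentials and endpoint that appear throughout this commit (the ensure_bucket helper itself is hypothetical, not part of the repository):

import boto3
from botocore.exceptions import ClientError

def ensure_bucket(bucket_name="my-bucket"):
    # Same endpoint and credentials as the ingest/preprocess scripts below
    s3 = boto3.client(
        's3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin',
    )
    try:
        s3.head_bucket(Bucket=bucket_name)  # Raises ClientError if the bucket is missing
    except ClientError:
        s3.create_bucket(Bucket=bucket_name)

ensure_bucket()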
 from airflow import DAG
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+from airflow.operators.bash import BashOperator
 from datetime import datetime
@@ -11,13 +12,17 @@ with DAG(
     spark_submit_task = SparkSubmitOperator(
         task_id='submit_spark_job',
-        application='/opt/workspace/wordcount.py',
+        application='/opt/airflow/functions/wordcount.py',
         conn_id='spark_default',
         name='wordcount_job',
         application_args=['/opt/workspace/input.txt', '/opt/workspace/output/'],
-        conf={'spark.executor.memory': '512m'},
+        conf={
+            'spark.executor.memory': '512m',
+            'spark.hadoop.fs.local.block.size': '134217728',
+            'spark.hadoop.fs.permissions.umask-mode': '000',
+        },
         verbose=True,
     )
 
-    spark_submit_task
+    # Define the task sequence
+    spark_submit_task
\ No newline at end of file
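For context on the two Hadoop settings added above: 134217728 bytes is 128 × 1024 × 1024, i.e. 128 MiB (the customary HDFS block size), and a umask-mode of '000' masks no permission bits, so output files on the shared workspace volume come out world-readable and world-writable, presumably to avoid permission clashes between the Airflow and Spark containers.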
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import boto3
import sys


def ingest_training_data(input_path, bucket_name, object_key):
    # Create a Spark session
    spark = SparkSession.builder.appName("Data Ingestion").getOrCreate()

    # Read the dataset into a pandas-on-Spark DataFrame
    df = ps.read_csv(input_path)

    # Collect to a single local CSV. pandas-on-Spark's to_csv writes a
    # directory of part files (and takes no `index` argument), so convert
    # to plain pandas first to get one file that boto3 can upload.
    temp_file = "/tmp/ingested_data.csv"
    df.to_pandas().to_csv(temp_file, index=False)

    # Upload the file to MinIO
    s3 = boto3.client(
        's3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin',
    )
    s3.upload_file(temp_file, bucket_name, object_key)
    print(f"Ingested data uploaded to MinIO: {bucket_name}/{object_key}")

    # Stop the Spark session
    spark.stop()


if __name__ == "__main__":
    input_path = sys.argv[1]
    bucket_name = sys.argv[2]
    object_key = sys.argv[3]
    ingest_training_data(input_path, bucket_name, object_key)
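The script stages data through /tmp and uploads it with boto3. An alternative worth noting, sketched here only as an assumption since it requires the hadoop-aws and matching AWS SDK jars on the Spark classpath (which this commit does not configure), is to let Spark write to MinIO directly over the S3A connector:

from pyspark.sql import SparkSession

# Assumes hadoop-aws is available on the Spark classpath; endpoint,
# credentials, and bucket mirror the DAG arguments above.
spark = (
    SparkSession.builder
    .appName("Data Ingestion S3A")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")  # MinIO needs path-style URLs
    .getOrCreate()
)

df = spark.read.csv("/opt/workspace/dataset.csv", header=True, inferSchema=True)
df.write.mode("overwrite").csv("s3a://my-bucket/ingested_data", header=True)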
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import boto3
import sys


def preprocess(bucket_name, object_key, output_key):
    # Create a Spark session
    spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

    # Download the ingested file from MinIO
    temp_file = "/tmp/ingested_data.csv"
    s3 = boto3.client(
        's3',
        endpoint_url='http://minio:9000',
        aws_access_key_id='minioadmin',
        aws_secret_access_key='minioadmin',
    )
    s3.download_file(bucket_name, object_key, temp_file)

    # Load the data
    df = ps.read_csv(temp_file)

    # Drop duplicate rows and rows with missing values
    df = df.drop_duplicates()
    df = df.dropna()

    # Encode gender as 0/1
    gender_mapping = {"Male": 0, "Female": 1}
    df["gender"] = df["gender"].map(gender_mapping)

    # Encode Yes/No columns as 1/0
    binary_mapping = {"No": 0, "Yes": 1}
    binary_cols = ["Partner", "Dependents", "PhoneService", "PaperlessBilling"]
    for col_name in binary_cols:
        df[col_name] = df[col_name].map(binary_mapping)

    # Integer-encode the remaining categorical columns
    categorical_cols = [
        "MultipleLines",
        "InternetService",
        "OnlineSecurity",
        "OnlineBackup",
        "DeviceProtection",
        "TechSupport",
        "StreamingTV",
        "StreamingMovies",
        "Contract",
        "PaymentMethod",
    ]
    for col_name in categorical_cols:
        df[col_name] = df[col_name].astype("category").cat.codes

    # Save the preprocessed data back to MinIO. As in the ingest script,
    # collect to plain pandas so a single local file is produced.
    preprocessed_file = "/tmp/preprocessed_data.csv"
    df.to_pandas().to_csv(preprocessed_file, index=False)
    s3.upload_file(preprocessed_file, bucket_name, output_key)
    print(f"Preprocessed data uploaded to MinIO: {bucket_name}/{output_key}")

    spark.stop()


if __name__ == "__main__":
    bucket_name = sys.argv[1]
    object_key = sys.argv[2]
    output_key = sys.argv[3]
    preprocess(bucket_name, object_key, output_key)
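The encoding logic can be smoke-tested without the cluster by running the same transformations on a toy frame in plain pandas (a hypothetical check, not part of the commit). One caveat it exposes: .map leaves NaN for any value outside the mapping, and the dropna above runs before the mappings, so such rows survive with NaN values.

import pandas as pd

sample = pd.DataFrame({
    "gender": ["Male", "Female", "Other"],
    "Partner": ["Yes", "No", "Yes"],
    "Contract": ["Month-to-month", "Two year", "One year"],
})

sample["gender"] = sample["gender"].map({"Male": 0, "Female": 1})
sample["Partner"] = sample["Partner"].map({"No": 0, "Yes": 1})
sample["Contract"] = sample["Contract"].astype("category").cat.codes

# "Other" becomes NaN because it is not in the gender mapping;
# Contract codes follow alphabetical category order, so the rows
# come out 0 (Month-to-month), 2 (Two year), 1 (One year).
print(sample)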
@@ -45,7 +45,7 @@ def main():
     # Collect results and save to the output file
-    word_counts.saveAsTextFile(output_path)
+    # word_counts.saveAsTextFile(output_path)
     print("=============================================================")
     print("Word counts saved")
@@ -14,6 +14,7 @@ services:
     depends_on:
       - spark-master
     command: bash -c "rm -f /opt/airflow/airflow-webserver.pid && airflow db init && (airflow scheduler & airflow webserver)"
 
   spark-master:
     image: bitnami/spark:latest
     container_name: spark-master
@@ -55,8 +56,24 @@ services:
     depends_on:
       - spark-master
 
+  minio:
+    image: quay.io/minio/minio:latest
+    container_name: minio
+    command: server /data --console-address ":9001"
+    environment:
+      - MINIO_ROOT_USER=minioadmin
+      - MINIO_ROOT_PASSWORD=minioadmin
+    ports:
+      - "9000:9000"  # MinIO API
+      - "9001:9001"  # MinIO Console
+    networks:
+      - spark-cluster
+    volumes:
+      - minio-data:/data
+
 volumes:
   shared-workspace:
+  minio-data:
 
 networks:
   spark-cluster:
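With the stack up, the MinIO console is reachable on localhost:9001 and the S3 API on localhost:9000, per the ports mapping above. A quick liveness probe (a hypothetical helper, using MinIO's documented /minio/health/live endpoint):

import urllib.request

# HTTP 200 from the liveness endpoint means the MinIO server is up
with urllib.request.urlopen("http://localhost:9000/minio/health/live") as resp:
    print("MinIO live:", resp.status == 200)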
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum
Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?
\ No newline at end of file