diff --git a/.gitignore b/.gitignore
index 4fd0b266e44ceab9664a96ffbb62d0405f975802..716b8281872d67fd2d5120f066922487c78064c4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 .venv/
-logs/
\ No newline at end of file
+logs/
+**/__pycache__/
diff --git a/airflow/airflow-webserver.pid b/airflow/airflow-webserver.pid
index 2bd5a0a98a36cc08ada88b804d3be047e6aa5b8a..409940768f2a684935a7d15a29f96e82c487f439 100644
--- a/airflow/airflow-webserver.pid
+++ b/airflow/airflow-webserver.pid
@@ -1 +1 @@
-22
+23
diff --git a/airflow/airflow.db b/airflow/airflow.db
index bd96b9c564403f0d4cb01ff659a0d67e9a289703..907a8e95c536a310efee44d72a78275f219ca44d 100644
Binary files a/airflow/airflow.db and b/airflow/airflow.db differ
diff --git a/airflow/dags/__pycache__/experiment_dag.cpython-312.pyc b/airflow/dags/__pycache__/experiment_dag.cpython-312.pyc
index 20d71eefdb96d206749c569fdfbee039a5daf6b3..0a335d91dc930fb7723e36493a510775ead5c82f 100644
Binary files a/airflow/dags/__pycache__/experiment_dag.cpython-312.pyc and b/airflow/dags/__pycache__/experiment_dag.cpython-312.pyc differ
diff --git a/airflow/dags/experiment_dag.py b/airflow/dags/experiment_dag.py
index 3b7607688990119438614c0f045dcbc7ef8b35b3..4b80fc1a1a9b6796bba35d826f84b01b05fd3a44 100644
--- a/airflow/dags/experiment_dag.py
+++ b/airflow/dags/experiment_dag.py
@@ -1,39 +1,35 @@
+from datetime import datetime
 from airflow import DAG
+from airflow.operators.python import PythonOperator
 from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
-from datetime import datetime
+from functions.ingest_clean_data import ingest_clean_data
+from functions.generate_filename import generate_filename
+from functions.train_model import train_model
+
+SOURCE_DATASET_FILE = '/opt/workspace/dataset.csv'
+DATASET_BUCKET_NAME = 'datasets'
 
 with DAG(
-    dag_id='experiment_dag_minio',
+    dag_id='experiment_dag',
     schedule_interval=None,
     start_date=datetime(2025, 1, 8),
     catchup=False,
 ) as dag:
-
-    ingest_training_data_task = SparkSubmitOperator(
-        task_id='ingest_training_data',
-        application='/opt/airflow/functions/ingest_training_data.py',
-        conn_id='spark_default',
-        name='ingest_training_data_job',
-        application_args=[
-            '/opt/workspace/dataset.csv',  # Input path
-            'my-bucket',  # MinIO bucket name
-            'ingested_data.csv',  # MinIO object key
-        ],
-        conf={
-            'spark.executor.memory': '512m',
-        },
-        verbose=True,
+    generate_filename_task = PythonOperator(
+        task_id='generate_filename',
+        python_callable=generate_filename,
+        dag=dag
     )
 
     preprocess_task = SparkSubmitOperator(
-        task_id='preprocess',
-        application='/opt/airflow/functions/preprocess.py',
+        task_id='preprocess_training_data',
+        application='/opt/airflow/dags/functions/preprocess_training_data.py',
         conn_id='spark_default',
         name='preprocess_job',
         application_args=[
-            'my-bucket',  # MinIO bucket name
-            'ingested_data.csv',  # Input object key
-            'preprocessed_data.csv',  # Output object key
+            SOURCE_DATASET_FILE,
+            DATASET_BUCKET_NAME,
+            "{{ task_instance.xcom_pull(task_ids='generate_filename', key='filename') }}"
         ],
         conf={
             'spark.executor.memory': '512m',
@@ -41,4 +37,26 @@ with DAG(
         verbose=True,
     )
 
-    ingest_training_data_task >> preprocess_task
+    ingest_clean_data_task = PythonOperator(
+        task_id='ingest_clean_data',
+        python_callable=ingest_clean_data,
+        op_args=[
+            'datasets',
+            "{{ task_instance.xcom_pull(task_ids='generate_filename', key='filename') }}"
+        ],
+        provide_context=True,
+        dag=dag
+    )
+
+    train_model_task = PythonOperator(
+        task_id='train_model',
+        python_callable=train_model,
+        op_args=[
+            "{{ task_instance.xcom_pull(task_ids='ingest_clean_data', key='cleaned_data') }}"
+        ],
+        provide_context=True,
+        dag=dag
+    )
+
+    generate_filename_task >> preprocess_task >> ingest_clean_data_task >> train_model_task
+
diff --git a/airflow/dags/functions/generate_filename.py b/airflow/dags/functions/generate_filename.py
new file mode 100644
index 0000000000000000000000000000000000000000..996fd899bd25c725e7b5692b59e535baf8903ea0
--- /dev/null
+++ b/airflow/dags/functions/generate_filename.py
@@ -0,0 +1,7 @@
+from datetime import datetime
+
+def generate_filename(**context):
+    timestamp = datetime.now()
+    filename = f'{timestamp.isoformat()}.csv'
+    context['task_instance'].xcom_push(key='filename', value=filename)
+    return filename
\ No newline at end of file
diff --git a/airflow/dags/functions/ingest_clean_data.py b/airflow/dags/functions/ingest_clean_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..7f18768220c3dbcbe23f4aeed9a6cb59bc04bd45
--- /dev/null
+++ b/airflow/dags/functions/ingest_clean_data.py
@@ -0,0 +1,39 @@
+import pandas as pd
+import boto3
+from io import StringIO
+
+def ingest_clean_data(bucket_name, object_key, **kwargs):
+    print("=============================================================")
+    print("bucket_name: ", bucket_name)
+    print("object_key: ", object_key)
+    print("=============================================================")
+
+    print("=============================================================")
+    print("Create S3 client")
+    print("=============================================================")
+
+    s3 = boto3.client(
+        's3',
+        endpoint_url='http://minio:9000',
+        aws_access_key_id='minioadmin',
+        aws_secret_access_key='minioadmin'
+    )
+
+    print("=============================================================")
+    print("Getting object")
+    print("=============================================================")
+
+    csv_object = s3.get_object(Bucket=bucket_name, Key=object_key)
+    csv_data = csv_object['Body'].read().decode('utf-8')
+
+    print("=============================================================")
+    print("Read csv")
+    print("=============================================================")
+
+    df = pd.read_csv(StringIO(csv_data))
+
+    kwargs['ti'].xcom_push(key='cleaned_data', value=df.to_dict())
+
+    print("=============================================================")
+    print("Data successfully ingested and placed in XCom.")
+    print("=============================================================")
diff --git a/airflow/dags/functions/preprocess_training_data.py b/airflow/dags/functions/preprocess_training_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..382905e28d67591bc4de26bef6fbfd7047a127b8
--- /dev/null
+++ b/airflow/dags/functions/preprocess_training_data.py
@@ -0,0 +1,100 @@
+from pyspark.sql import SparkSession
+import pyspark.pandas as ps
+import uuid
+import boto3
+import sys
+
+def ensure_bucket_exists(s3_client, bucket_name):
+    """Create bucket if it doesn't exist"""
+    try:
+        s3_client.head_bucket(Bucket=bucket_name)
+    except:
+        try:
+            s3_client.create_bucket(Bucket=bucket_name)
+            print(f"Created bucket: {bucket_name}")
+        except Exception as e:
+            print(f"Error creating bucket: {str(e)}")
+            raise
+
+def preprocess(input_path, bucket_name, output_object_key):
+    print("=============================================================")
+    print("input_path: ", input_path)
+    print("bucket_name: ", bucket_name)
+    print("output_object_key: ", output_object_key)
+    print("=============================================================")
+
+    spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()
+
+
+    df = ps.read_csv(input_path)
+    print("=============================================================")
+    print("Read csv")
+    print("=============================================================")
+
+
+    df = df.drop_duplicates()
+    df = df.dropna()
+    df = df.drop(columns=['customerID'])
+
+    gender_mapping = {"Male": 0, "Female": 1}
+    df["gender"] = df["gender"].map(gender_mapping)
+
+    binary_mapping = {"No": 0, "Yes": 1}
+    binary_col = ["Partner", "Dependents", "PhoneService", "PaperlessBilling", "Churn"]
+    for col_name in binary_col:
+        df[col_name] = df[col_name].map(binary_mapping)
+
+    categorical_col = [
+        "MultipleLines",
+        "InternetService",
+        "OnlineSecurity",
+        "OnlineBackup",
+        "DeviceProtection",
+        "TechSupport",
+        "StreamingTV",
+        "StreamingMovies",
+        "Contract",
+        "PaymentMethod",
+    ]
+    for col_name in categorical_col:
+        df[col_name] = df[col_name].astype("category").cat.codes
+
+    print("=============================================================")
+    print("Done preprocessing")
+    print(df.head())
+    print("=============================================================")
+
+    pandas_df = df.to_pandas()
+    temp_file = f"/tmp/preprocessed_data_{uuid.uuid4()}.csv"
+    pandas_df.to_csv(temp_file, index=False)
+
+    print("=============================================================")
+    print("Create S3 client")
+    print("=============================================================")
+
+    s3 = boto3.client(
+        's3',
+        endpoint_url='http://minio:9000',
+        aws_access_key_id='minioadmin',
+        aws_secret_access_key='minioadmin',
+    )
+
+    ensure_bucket_exists(s3, bucket_name)
+
+    print("=============================================================")
+    print("Uploading file to S3")
+    print("=============================================================")
+
+    s3.upload_file(temp_file, bucket_name, output_object_key)
+
+    print("=============================================================")
+    print("File uploaded to S3")
+    print("=============================================================")
+
+    spark.stop()
+
+if __name__ == "__main__":
+    input_path = sys.argv[1]
+    bucket_name = sys.argv[2]
+    output_object_key = sys.argv[3]
+    preprocess(input_path, bucket_name, output_object_key)
\ No newline at end of file
diff --git a/airflow/dags/functions/train_model.py b/airflow/dags/functions/train_model.py
new file mode 100644
index 0000000000000000000000000000000000000000..c08b5eaec1f964ff9614128798077473c8dcc7c9
--- /dev/null
+++ b/airflow/dags/functions/train_model.py
@@ -0,0 +1,14 @@
+import pandas as pd
+import ast
+
+def train_model(df_dict_str):
+    df_dict = ast.literal_eval(df_dict_str)
+    df = pd.DataFrame(df_dict)
+
+    print("=============================================================")
+    print(df.head())
+    print("=============================================================")
+
+    print("=============================================================")
+    print("Train model")
+    print("=============================================================")
\ No newline at end of file
diff --git a/airflow/functions/ingest_training_data.py b/airflow/functions/ingest_training_data.py
deleted file mode 100644
index 45e818004f53c336a91e2f0d247c4630fcda4fae..0000000000000000000000000000000000000000
--- a/airflow/functions/ingest_training_data.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from pyspark.sql import SparkSession
-import pyspark.pandas as ps
-import boto3
-import sys
-
-def ingest_training_data(input_path, bucket_name, object_key):
-    # Create a Spark session
-    spark = SparkSession.builder.appName("Data Ingestion").getOrCreate()
-
-    # Read the dataset into a pandas-on-Spark DataFrame
-    df = ps.read_csv(input_path)
-
-    # Save the ingested data to a local file
-    temp_file = "/tmp/ingested_data.csv"
-    df.to_csv(temp_file, index=False)
-
-    # Upload the file to MinIO
-    s3 = boto3.client(
-        's3',
-        endpoint_url='http://minio:9000',
-        aws_access_key_id='minioadmin',
-        aws_secret_access_key='minioadmin',
-    )
-    s3.upload_file(temp_file, bucket_name, object_key)
-    print(f"Ingested data uploaded to MinIO: {bucket_name}/{object_key}")
-
-    # Stop the Spark session
-    spark.stop()
-
-if __name__ == "__main__":
-    input_path = sys.argv[1]
-    bucket_name = sys.argv[2]
-    object_key = sys.argv[3]
-    ingest_training_data(input_path, bucket_name, object_key)
diff --git a/airflow/functions/preprocess.py b/airflow/functions/preprocess.py
deleted file mode 100644
index c6bf7d29344b0a3a317671a0c182d8a4a8235b45..0000000000000000000000000000000000000000
--- a/airflow/functions/preprocess.py
+++ /dev/null
@@ -1,62 +0,0 @@
-from pyspark.sql import SparkSession
-import pyspark.pandas as ps
-import boto3
-import sys
-
-def preprocess(bucket_name, object_key, output_key):
-    # Create a Spark session
-    spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()
-
-    # Download the file from MinIO
-    temp_file = "/tmp/ingested_data.csv"
-    s3 = boto3.client(
-        's3',
-        endpoint_url='http://minio:9000',
-        aws_access_key_id='minioadmin',
-        aws_secret_access_key='minioadmin',
-    )
-    s3.download_file(bucket_name, object_key, temp_file)
-
-    # Load the data
-    df = ps.read_csv(temp_file)
-
-    # Perform preprocessing
-    df = df.drop_duplicates()
-    df = df.dropna()
-
-    gender_mapping = {"Male": 0, "Female": 1}
-    df["gender"] = df["gender"].map(gender_mapping)
-
-    binary_mapping = {"No": 0, "Yes": 1}
-    binary_col = ["Partner", "Dependents", "PhoneService", "PaperlessBilling"]
-    for col_name in binary_col:
-        df[col_name] = df[col_name].map(binary_mapping)
-
-    categorical_col = [
-        "MultipleLines",
-        "InternetService",
-        "OnlineSecurity",
-        "OnlineBackup",
-        "DeviceProtection",
-        "TechSupport",
-        "StreamingTV",
-        "StreamingMovies",
-        "Contract",
-        "PaymentMethod",
-    ]
-    for col_name in categorical_col:
-        df[col_name] = df[col_name].astype("category").cat.codes
-
-    # Save the preprocessed data back to MinIO
-    preprocessed_file = "/tmp/preprocessed_data.csv"
-    df.to_csv(preprocessed_file, index=False)
-    s3.upload_file(preprocessed_file, bucket_name, output_key)
-    print(f"Preprocessed data uploaded to MinIO: {bucket_name}/{output_key}")
-
-    spark.stop()
-
-if __name__ == "__main__":
-    bucket_name = sys.argv[1]
-    object_key = sys.argv[2]
-    output_key = sys.argv[3]
-    preprocess(bucket_name, object_key, output_key)
diff --git a/airflow/functions/wordcount.py b/airflow/functions/wordcount.py
deleted file mode 100644
index a092bc2deb9b4718617a7d92e8a767af69b7d5d0..0000000000000000000000000000000000000000
--- a/airflow/functions/wordcount.py
+++ /dev/null
@@ -1,59 +0,0 @@
-from pyspark.sql import SparkSession
-import sys
-
-def main():
-    # Check if input and output paths are provided
-    if len(sys.argv) != 3:
-        print("Usage: wordcount.py <input_path> <output_path>")
-        sys.exit(-1)
-
-    input_path = sys.argv[1]
-    output_path = sys.argv[2]
-    print("=============================================================")
-    print("input_path: ", input_path)
-    print("output_path: ", output_path)
-    print("=============================================================")
-
-    # Initialize SparkSession
-    spark = SparkSession.builder \
-        .appName("WordCount") \
-        .getOrCreate()
-
-    print("=============================================================")
-    print("Spark Session Created")
-    print("=============================================================")
-
-
-    # Read input file
-    text_file = spark.read.text(input_path).rdd
-
-    print("=============================================================")
-    print("Text File Read")
-    print("=============================================================")
-
-
-    # Count words
-    word_counts = (
-        text_file.flatMap(lambda line: line.value.split())  # Split lines into words
-        .map(lambda word: (word, 1))  # Create (word, 1) tuples
-        .reduceByKey(lambda a, b: a + b)  # Reduce by key to sum word counts
-    )
-
-    print("=============================================================")
-    print("Word count: ", word_counts)
-    print("=============================================================")
-
-
-    # Collect results and save to the output file
-    # word_counts.saveAsTextFile(output_path)
-
-    print("=============================================================")
-    print("Word counts saved")
-    print("=============================================================")
-
-
-    # Stop the SparkSession
-    spark.stop()
-
-if __name__ == "__main__":
-    main()