diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000000000000000000000000000000000000..6cd7f5a810a9d3cfea7567cc0f28b960589f0149
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,5 @@
+venv
+__pycache__
+*.pyo
+.git
+*.pyc
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 36665759009171030e900d0bbd78fa9eb0576089..7ce59779b78a8c442b57304628ec9dd048a4721b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-model/mlruns
\ No newline at end of file
+model/mlruns
+dags/__pycache__
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..9b78b2221dca5db6843fa2bf6d6440aa8b7358a9
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,13 @@
+FROM apache/airflow:2.7.1-python3.11
+
+USER root
+RUN apt-get update && \
+    apt-get install -y gcc python3-dev openjdk-11-jdk && \
+    apt-get clean
+
+# Set JAVA_HOME environment variable
+ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+
+USER airflow
+
+RUN pip install apache-airflow-providers-apache-spark==2.1.1 pyspark==3.5.0 "apache-airflow-providers-openlineage>=1.8.0"
diff --git a/airflow.env b/airflow.env
new file mode 100644
index 0000000000000000000000000000000000000000..a987697c2c5d5ca5aa64fffe31ef960d900d9177
--- /dev/null
+++ b/airflow.env
@@ -0,0 +1,5 @@
+AIRFLOW__CORE__LOAD_EXAMPLES=False
+AIRFLOW__CORE__EXECUTOR=LocalExecutor
+AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
+AIRFLOW__WEBSERVER__BASE_URL=http://localhost:8080
+AIRFLOW__WEBSERVER__SECRET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
\ No newline at end of file
diff --git a/dags/spark_dag.py b/dags/spark_dag.py
new file mode 100644
index 0000000000000000000000000000000000000000..06cd607ddd5fec1ee8c5060b38d0120ea7fbc02a
--- /dev/null
+++ b/dags/spark_dag.py
@@ -0,0 +1,33 @@
+import airflow
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+dag = DAG(
+    dag_id="spark_dag",
+    default_args={
+        "start_date": airflow.utils.dates.days_ago(1)
+    },
+    schedule_interval="@daily"
+)
+
+start = PythonOperator(
+    task_id="start",
+    python_callable=lambda: print("Jobs started"),
+    dag=dag
+)
+
+test = SparkSubmitOperator(
+    task_id="test",
+    conn_id="spark-conn",
+    application="jobs/spark/test.py",
+    dag=dag
+)
+
+end = PythonOperator(
+    task_id="end",
+    python_callable=lambda: print("Jobs completed successfully"),
+    dag=dag
+)
+
+start >> test >> end
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000000000000000000000000000000000000..2e1aa7e4f84b31f46985a29aebc492b427a6dded
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,66 @@
+version: '3'
+
+x-spark-common: &spark-common
+  image: bitnami/spark:latest
+  volumes:
+    - ./jobs:/opt/bitnami/spark/jobs
+  networks:
+    - airflow-xops
+
+x-airflow-common: &airflow-common
+  build:
+    context: .
+    dockerfile: Dockerfile
+  env_file:
+    - airflow.env
+  volumes:
+    - ./jobs:/opt/airflow/jobs
+    - ./dags:/opt/airflow/dags
+    - ./logs:/opt/airflow/logs
+  depends_on:
+    - postgres
+  networks:
+    - airflow-xops
+
+services:
+  spark-master:
+    <<: *spark-common
+    command: bin/spark-class org.apache.spark.deploy.master.Master
+    ports:
+      - "9090:8080"
+      - "7077:7077"
+
+  spark-worker:
+    <<: *spark-common
+    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
+    depends_on:
+      - spark-master
+    environment:
+      SPARK_MODE: worker
+      SPARK_WORKER_CORES: 2
+      SPARK_WORKER_MEMORY: 1g
+      SPARK_MASTER_URL: spark://spark-master:7077
+
+  postgres:
+    image: postgres:14.0
+    environment:
+      - POSTGRES_USER=airflow
+      - POSTGRES_PASSWORD=airflow
+      - POSTGRES_DB=airflow
+    networks:
+      - airflow-xops
+
+  webserver:
+    <<: *airflow-common
+    command: webserver
+    ports:
+      - "8080:8080"
+    depends_on:
+      - scheduler
+
+  scheduler:
+    <<: *airflow-common
+    command: bash -c "airflow db migrate && airflow users create --username admin --firstname admin --lastname admin --role Admin --email admin@gmail.com --password admin && airflow scheduler"
+
+networks:
+  airflow-xops:
\ No newline at end of file
diff --git a/jobs/data/data.csv b/jobs/data/data.csv
new file mode 100644
index 0000000000000000000000000000000000000000..c0e7442405761163e01dd54261e4767fccec7dac
--- /dev/null
+++ b/jobs/data/data.csv
@@ -0,0 +1,11 @@
+date,product,category,quantity,unit_price,total_sales,customer_id,region
+2024-01-01,Laptop,Electronics,2,999.99,1999.98,C001,North
+2024-01-01,Mouse,Electronics,5,29.99,149.95,C002,South
+2024-01-02,Keyboard,Electronics,3,89.99,269.97,C003,East
+2024-01-02,Monitor,Electronics,1,299.99,299.99,C001,North
+2024-01-03,Headphones,Electronics,4,79.99,319.96,C004,West
+2024-01-03,SSD Drive,Electronics,2,159.99,319.98,C005,South
+2024-01-04,Webcam,Electronics,3,69.99,209.97,C002,South
+2024-01-04,USB Cable,Accessories,10,9.99,99.90,C003,East
+2024-01-05,Power Bank,Accessories,5,49.99,249.95,C004,West
+2024-01-05,Phone Case,Accessories,8,19.99,159.92,C005,South
\ No newline at end of file
diff --git a/jobs/spark/test.py b/jobs/spark/test.py
new file mode 100644
index 0000000000000000000000000000000000000000..82679e2d6d548492c1dd0d982b9eade61703bf1d
--- /dev/null
+++ b/jobs/spark/test.py
@@ -0,0 +1,10 @@
+from pyspark.sql import SparkSession
+
+def main():
+    # Minimal smoke test for the Airflow -> Spark submission path.
+    spark = SparkSession.builder.appName("spark_dag_test").getOrCreate()
+    print("Spark job submitted from Airflow is running")
+    spark.stop()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
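
Note: spark_dag.py submits through conn_id="spark-conn", but no step in this change registers that connection in Airflow. One way to create it (a sketch, assuming the spark-master service name and port 7077 from docker-compose.yml above) is to run the Airflow CLI once inside the webserver or scheduler container:

    # Register the Spark connection the DAG expects; host/port follow the compose services above.
    airflow connections add spark-conn \
        --conn-type spark \
        --conn-host spark://spark-master \
        --conn-port 7077

An equivalent connection could also be created from the Airflow UI under Admin > Connections.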