From 98a56790c2cae9561c68c103bbb94639697f27e4 Mon Sep 17 00:00:00 2001 From: Sulthan <13521159@std.stei.itb.ac.id> Date: Thu, 9 Jan 2025 08:23:00 +0700 Subject: [PATCH] add: aiflow --- .dockerignore | 5 ++ .gitignore | 3 +- Dockerfile | 13 ++++ airflow.env | 5 ++ dags/__pycache__/spark_dag.cpython-311.pyc | Bin 0 -> 1309 bytes dags/spark_dag.py | 33 +++++++++++ docker-compose.yml | 66 +++++++++++++++++++++ jobs/data/data.csv | 11 ++++ jobs/spark/test.py | 7 +++ 9 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 airflow.env create mode 100644 dags/__pycache__/spark_dag.cpython-311.pyc create mode 100644 dags/spark_dag.py create mode 100644 docker-compose.yml create mode 100644 jobs/data/data.csv create mode 100644 jobs/spark/test.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..6cd7f5a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +venv +__pycache__ +*.pyo +.git +*.pyc \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3666575..7ce5977 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -model/mlruns \ No newline at end of file +model/mlruns +dags/_pycache_ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9b78b22 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM apache/airflow:2.7.1-python3.11 + +USER root +RUN apt-get update && \ + apt-get install -y gcc python3-dev openjdk-11-jdk && \ + apt-get clean + +# Set JAVA_HOME environment variable +ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 + +USER airflow + +RUN pip install apache-airflow-providers-apache-spark==2.1.1 pyspark==3.5.0 apache-airflow-providers-openlineage>=1.8.0 diff --git a/airflow.env b/airflow.env new file mode 100644 index 0000000..a987697 --- /dev/null +++ b/airflow.env @@ -0,0 +1,5 @@ +AIRFLOW__CORE__LOAD_EXAMPLES=False +AIRFLOW__CORE__EXECUTOR=LocalExecutor +AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow +AIRFLOW__WEBSERVER_BASE_URL=http://localhost:8080 +AIRFLOW__WEBSERVER__SECRET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= \ No newline at end of file diff --git a/dags/__pycache__/spark_dag.cpython-311.pyc b/dags/__pycache__/spark_dag.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e7c5931b854a77b4519137befe6a52708a2325ee GIT binary patch literal 1309 zcmbtTy^9-16o0GL?rNo#?nBDn#U|ox+^EQjSX>uOd;|h1E`cMB8ZoDtllMH*E;B0! zsR(Ou;nIYFtAMKuCNA=)=)#38TNzTh2q@f<!d2c|NyZB4GMf3#+xIo|d-L>DwOR%o zzkUDR;LifUAM~*r&QiGg8-?e<fg2LKv=<}{jjmykY|6sW?3%e;l*OUdwG1eLFu28S z0cZ9q-PtN1yeO@7@9*3>HvoRdwHSfHORwB>6Lq*2qw8X3lb3nr2?+O<_b+Q1troBH z8s9+c=6MlgaV=vjdF&S7{?AxXltpFg0tk-~`54F0s=uM&qn44GpWpuix7yc}_(3@O zDiVstp+ZglIAZGI@%UsI#4B~?YEt$%8)Ri2GZmv4i#J4WYJbLAASW#|v(erQc;<0& z%EmJ8F*VTHmhL|gd@MyT7{x+8VluPx9Q_561yW6DUF{(}hxlFH0B^otN#s#DntES` zCwebWB=}1svm%B0U&0l}!`o&%jN)Ts==ZR5Rk`yGY-X!5r7sbs?w&hDu?x^Lpr zHG~9y6fcf0nw`sLCjqsED&*zFOg|)I8&O`ZUY|+HPNc|8sz@;wI-ZvE{2%wj(P+AT zhId;W*d`;M*ea?9X3HFMpj;IeMKb8KI0#3|CzB~g{QuSDW?qwiIE<u-1>e)-eqU&P zI+k*xh@T>+IW4tK5|Pj0BNX%o0##sk=l(_W@UnTBEK5+$oMkB$y-~Ki{MrwfeWQ^f z^^v0X%nnxONQI9AE|lJ95yLLn&ujEv)Z$Ha-fYUESZf>yQfC(Rm(EI@O>~b9!n{Jt zLl3n>aD!l-48G|wq&~Z+ZljeNNaOz)=B8m7Db$kum%`3$eWuWvt<My`nXS*)wwv_R z>h9}O<JsVc;icdD-M{;*fA^P9F8qT_|6pzw8ur|V4{yyYwo~{E5UvRlbG`}g&eK}z z)l<JS^E+v&GIxqD`XF2rB-Y%6(#@H3BlWgZZ#Uh%Lr$($LMMc4f}}XFf%8E!O1-Vr iKb-l8i$ru-kq$$^!#ptTr;W$eXY~tc%vLjh@qYldtxE3z literal 0 HcmV?d00001 diff --git a/dags/spark_dag.py b/dags/spark_dag.py new file mode 100644 index 0000000..06cd607 --- /dev/null +++ b/dags/spark_dag.py @@ -0,0 +1,33 @@ +import airflow +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator + +dag = DAG( + dag_id = "spark_dag", + default_args = { + "start_date": airflow.utils.dates.days_ago(1) + }, + schedule_interval = "@daily" +) + +start = PythonOperator( + task_id="start", + python_callable = lambda: print("Jobs started"), + dag=dag +) + +test = SparkSubmitOperator( + task_id="test", + conn_id="spark-conn", + application="jobs/spark/test.py", + dag=dag +) + +end = PythonOperator( + task_id="end", + python_callable = lambda: print("Jobs completed successfully"), + dag=dag +) + +start >> test >> end \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2e1aa7e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +x-spark-common: &spark-common + image: bitnami/spark:latest + volumes: + - ./jobs:/opt/bitnami/spark/jobs + networks: + - airflow-xops + +x-airflow-common: &airflow-common + build: + context: . + dockerfile: Dockerfile + env_file: + - airflow.env + volumes: + - ./jobs:/opt/airflow/jobs + - ./dags:/opt/airflow/dags + - ./logs:/opt/airflow/logs + depends_on: + - postgres + networks: + - airflow-xops + +services: + spark-master: + <<: *spark-common + command: bin/spark-class org.apache.spark.deploy.master.Master + ports: + - "9090:8080" + - "7077:7077" + + spark-worker: + <<: *spark-common + command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077 + depends_on: + - spark-master + environment: + SPARK_MODE: worker + SPARK_WORKER_CORES: 2 + SPARK_WORKER_MEMORY: 1g + SPARK_MASTER_URL: spark://spark-master:7077 + + postgres: + image: postgres:14.0 + environment: + - POSTGRES_USER=airflow + - POSTGRES_PASSWORD=airflow + - POSTGRES_DB=airflow + networks: + - airflow-xops + + webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + depends_on: + - scheduler + + scheduler: + <<: *airflow-common + command: bash -c "airflow db migrate && airflow users create --username admin --firstname admin --lastname admin --role Admin --email admin@gmail.com --password admin && airflow scheduler" + +networks: + airflow-xops: \ No newline at end of file diff --git a/jobs/data/data.csv b/jobs/data/data.csv new file mode 100644 index 0000000..c0e7442 --- /dev/null +++ b/jobs/data/data.csv @@ -0,0 +1,11 @@ +date,product,category,quantity,unit_price,total_sales,customer_id,region +2024-01-01,Laptop,Electronics,2,999.99,1999.98,C001,North +2024-01-01,Mouse,Electronics,5,29.99,149.95,C002,South +2024-01-02,Keyboard,Electronics,3,89.99,269.97,C003,East +2024-01-02,Monitor,Electronics,1,299.99,299.99,C001,North +2024-01-03,Headphones,Electronics,4,79.99,319.96,C004,West +2024-01-03,SSD Drive,Electronics,2,159.99,319.98,C005,South +2024-01-04,Webcam,Electronics,3,69.99,209.97,C002,South +2024-01-04,USB Cable,Accessories,10,9.99,99.90,C003,East +2024-01-05,Power Bank,Accessories,5,49.99,249.95,C004,West +2024-01-05,Phone Case,Accessories,8,19.99,159.92,C005,South \ No newline at end of file diff --git a/jobs/spark/test.py b/jobs/spark/test.py new file mode 100644 index 0000000..82679e2 --- /dev/null +++ b/jobs/spark/test.py @@ -0,0 +1,7 @@ +from pyspark.sql import SparkSession + +def main(): + print("Hellloooooooooooooooooooooooooooooooooooooooooooooooooooo\ndesu") + +if __name__ == "__main__": + main() \ No newline at end of file -- GitLab