"""Airflow DAG ``sparking_flow``: run the word-count Spark job once per day.

Task graph: start (log marker) -> python_job (spark-submit) -> end (log marker).
The Spark connection must be configured in Airflow under the id ``spark_conn``.
"""
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG(
    dag_id="sparking_flow",
    default_args={
        "owner": "admin",
        # Fixed anchor date instead of the deprecated
        # airflow.utils.dates.days_ago(1): a moving start date makes the
        # schedule non-deterministic across deployments/restarts.
        "start_date": datetime(2025, 1, 1),
    },
    schedule_interval="@daily",
    # Without this, (re)enabling the DAG back-fills every missed interval.
    catchup=False,
)

# Lightweight boundary marker so the run start shows up in task logs.
start = PythonOperator(
    task_id="start",
    python_callable=lambda: print("Jobs started"),
    dag=dag,
)

# Submits the word-count script through the configured Spark connection.
python_job = SparkSubmitOperator(
    task_id="python_job",
    conn_id="spark_conn",
    application="jobs/python/wordcount.py",
    dag=dag,
)

# Boundary marker for successful completion of the Spark job.
end = PythonOperator(
    task_id="end",
    python_callable=lambda: print("Jobs completed successfully"),
    dag=dag,
)

start >> python_job >> end
"""Toy Spark job: count word occurrences in a hard-coded sentence."""
from pyspark.sql import SparkSession

session = (
    SparkSession.builder
    .appName("PythonWordCount")
    .getOrCreate()
)

text = "Hello Spark Hello Python Hello Airflow Hello Docker and Hello Yusuf"

# Emit one (word, 1) pair per token, then sum the pairs per distinct word.
pairs = session.sparkContext.parallelize(text.split(" ")).map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda left, right: left + right)

# Unpack each (word, count) tuple; output is identical to printing wc[0], wc[1].
for word, occurrences in counts.collect():
    print(word, occurrences)

session.stop()