diff --git a/Dockerfile b/Dockerfile
index eea8cbbede893f388c8f8b0496e9d23c4a0bf7ed..43a318b86a2ceef2212244c37046f49d37744cd7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,13 +3,13 @@ FROM apache/airflow:2.7.1-python3.11
 USER root
 
 RUN apt-get update && \
-    apt-get install -y gcc python3-dev openjdk-11-jdk && \
+    apt-get install -y gcc python3-dev openjdk-17-jdk && \
     apt-get clean
 
-ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-arm64
+ENV JAVA_HOME /usr/lib/jvm/java-17-openjdk-arm64
 
 USER airflow
 
 COPY requirements.txt /app/requirements.txt
 
-RUN pip install --no-cache-dir -r /app/requirements.txt
\ No newline at end of file
+RUN pip install --no-cache-dir -r /app/requirements.txt
diff --git a/dags/spark_airflow.py b/dags/spark_airflow.py
new file mode 100644
index 0000000000000000000000000000000000000000..292cffc89a99fdf5ca89398b7297b0476057d132
--- /dev/null
+++ b/dags/spark_airflow.py
@@ -0,0 +1,36 @@
+import airflow
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+dag = DAG(
+    dag_id = "sparking_flow",
+    default_args = {
+        "owner": "admin",
+        "start_date": airflow.utils.dates.days_ago(1)
+    },
+    schedule_interval = "@daily"
+)
+
+start = PythonOperator(
+    task_id="start",
+    python_callable = lambda: print("Jobs started"),
+    dag=dag
+)
+
+python_job = SparkSubmitOperator(
+    task_id="python_job",
+    conn_id="spark_conn",
+    application="jobs/python/wordcount.py",
+    dag=dag,
+)
+
+
+
+end = PythonOperator(
+    task_id="end",
+    python_callable = lambda: print("Jobs completed successfully"),
+    dag=dag
+)
+
+start >> python_job >> end
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index af4f61c26161b77796b8c484931b1f30bc80d291..a622131e928023dc3b3fc45ccbca8f909363fa00 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,7 +1,7 @@
 version: '3'
 
 x-spark-common: &spark-common
-  image: bitnami/spark:latest
+  image: bitnami/spark:3.5.1
   volumes:
     - ./jobs:/opt/bitnami/spark/jobs
   networks:
@@ -26,6 +26,8 @@ services:
   spark-master:
     <<: *spark-common
     command: bin/spark-class org.apache.spark.deploy.master.Master
+    environment:
+      - SPARK_MASTER_HOST=spark-master
     ports:
       - "9090:8080"
       - "7077:7077"
@@ -40,6 +42,7 @@
       SPARK_WORKER_CORES: 2
       SPARK_WORKER_MEMORY: 1g
       SPARK_MASTER_URL: spark://spark-master:7077
+      SPARK_MASTER: spark://spark-master:7077
 
   postgres:
     image: postgres:14.0
diff --git a/jobs/python/wordcount.py b/jobs/python/wordcount.py
new file mode 100644
index 0000000000000000000000000000000000000000..2e3866b0add0f3b35e97ed798186eab015d5ccfc
--- /dev/null
+++ b/jobs/python/wordcount.py
@@ -0,0 +1,14 @@
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
+
+text = "Hello Spark Hello Python Hello Airflow Hello Docker and Hello Yusuf"
+
+words = spark.sparkContext.parallelize(text.split(" "))
+
+wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
+
+for wc in wordCounts.collect():
+    print(wc[0], wc[1])
+
+spark.stop()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 5d8e452e9ba0ee25c3df85b48a2360a8ca6427e9..a8a180ab10d8535f21746004a7feefb5c45a81d9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-pyspark
+pyspark==3.5.1
 apache-airflow
-apache-airflow-providers-apache-spark
-apache-airflow-providers-openlineage>=1.8.0
\ No newline at end of file
+apache-airflow-providers-apache-spark
+apache-airflow-providers-openlineage>=1.8.0
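
Note: the DAG above submits through conn_id="spark_conn", which this patch does not create. Below is a minimal sketch of registering that connection programmatically; it is a hypothetical helper (not part of the patch) and assumes the Airflow metadata DB is already initialized and that the master is reachable at spark://spark-master:7077, the service defined in docker-compose.yml.

# register_spark_conn.py -- hypothetical helper, not part of this patch.
# Assumes the Airflow metadata DB is initialized (e.g. via `airflow db init`).
from airflow.models import Connection
from airflow.utils.session import create_session


def register_spark_conn() -> None:
    """Create the 'spark_conn' connection the SparkSubmitOperator expects."""
    conn = Connection(
        conn_id="spark_conn",
        conn_type="spark",
        host="spark://spark-master",  # master service name from docker-compose.yml
        port=7077,
    )
    with create_session() as session:
        # Only insert if no connection with this id exists yet.
        exists = (
            session.query(Connection)
            .filter(Connection.conn_id == conn.conn_id)
            .first()
        )
        if not exists:
            session.add(conn)


if __name__ == "__main__":
    register_spark_conn()

The same connection can instead be created by hand in the Airflow UI (Admin > Connections) with connection type "Spark", host spark://spark-master, and port 7077.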