diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..19cc988c12858d5f2a0f090379dff173f742fe12 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,8 @@ +FROM apache/airflow:latest +USER root + +RUN apt-get update && \ + apt-get -y install git && \ + apt-get clean + +USER airflow \ No newline at end of file diff --git a/airflow/airflow-webserver.pid b/airflow/airflow-webserver.pid new file mode 100644 index 0000000000000000000000000000000000000000..f5c89552bd3e62bfce023a230e90d141f7a46b2f --- /dev/null +++ b/airflow/airflow-webserver.pid @@ -0,0 +1 @@ +32 diff --git a/airflow.cfg b/airflow/airflow.cfg similarity index 78% rename from airflow.cfg rename to airflow/airflow.cfg index 6efbf377b02e1e89ebd632bd85932b4218875669..2771560ff9771f39f79eb5e0101d25916385d356 100644 --- a/airflow.cfg +++ b/airflow/airflow.cfg @@ -4,7 +4,7 @@ # # Variable: AIRFLOW__CORE__DAGS_FOLDER # -dags_folder = /Users/apple/Desktop/itb/semester 7/if4054-xops/dags +dags_folder = /opt/airflow/dags # Hostname by providing a path to a callable, which will resolve the hostname. # The format is "package.function". @@ -127,7 +127,7 @@ load_examples = True # # Variable: AIRFLOW__CORE__PLUGINS_FOLDER # -plugins_folder = /Users/apple/Desktop/itb/semester 7/if4054-xops/plugins +plugins_folder = /opt/airflow/plugins # Should tasks be executed via forking of the parent process # @@ -453,7 +453,7 @@ database_access_isolation = False # # Variable: AIRFLOW__CORE__INTERNAL_API_SECRET_KEY # -internal_api_secret_key = UqNjFyZBYpdJUMvELCKXcw== +internal_api_secret_key = qbkITk4FtDBCkdn5dS8mNQ== # The ability to allow testing connections across Airflow UI, API and CLI. # Supported options: ``Disabled``, ``Enabled``, ``Hidden``. Default: Disabled @@ -494,7 +494,7 @@ alembic_ini_file_path = alembic.ini # # Variable: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN # -sql_alchemy_conn = sqlite:////Users/apple/Desktop/itb/semester 7/if4054-xops/airflow.db +sql_alchemy_conn = sqlite:////opt/airflow/airflow.db # Extra engine specific keyword args passed to SQLAlchemy's create_engine, as a JSON-encoded value # @@ -635,7 +635,7 @@ check_migrations = True # # Variable: AIRFLOW__LOGGING__BASE_LOG_FOLDER # -base_log_folder = /Users/apple/Desktop/itb/semester 7/if4054-xops/logs +base_log_folder = /opt/airflow/logs # Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search. # Set this to ``True`` if you want to enable remote logging. @@ -817,7 +817,7 @@ log_processor_filename_template = {{ filename }}.log # # Variable: AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION # -dag_processor_manager_log_location = /Users/apple/Desktop/itb/semester 7/if4054-xops/logs/dag_processor_manager/dag_processor_manager.log +dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log # Whether DAG processor manager will write logs to stdout # @@ -1387,7 +1387,7 @@ access_denied_message = Access is Denied # # Variable: AIRFLOW__WEBSERVER__CONFIG_FILE # -config_file = /Users/apple/Desktop/itb/semester 7/if4054-xops/webserver_config.py +config_file = /opt/airflow/webserver_config.py # The base url of your website: Airflow cannot guess what domain or CNAME you are using. # This is used to create links in the Log Url column in the Browse - Task Instances menu, @@ -1497,7 +1497,7 @@ reload_on_plugin_change = False # # Variable: AIRFLOW__WEBSERVER__SECRET_KEY # -secret_key = UqNjFyZBYpdJUMvELCKXcw== +secret_key = qbkITk4FtDBCkdn5dS8mNQ== # Number of workers to run the Gunicorn web server # @@ -2162,7 +2162,7 @@ orphaned_tasks_check_interval = 300.0 # # Variable: AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY # -child_process_log_directory = /Users/apple/Desktop/itb/semester 7/if4054-xops/logs/scheduler +child_process_log_directory = /opt/airflow/logs/scheduler # Local task jobs periodically heartbeat to the DB. If the job has # not heartbeat in this many seconds, the scheduler will mark the @@ -2437,6 +2437,489 @@ default_timeout = 604800 # enabled = True +[aws] +# This section contains settings for Amazon Web Services (AWS) integration. + +# session_factory = +cloudwatch_task_handler_json_serializer = airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy + +[aws_batch_executor] +# This section only applies if you are using the AwsBatchExecutor in +# Airflow's ``[core]`` configuration. +# For more information on any of these execution parameters, see the link below: +# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.submit_job +# For boto3 credential management, see +# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html + +conn_id = aws_default +# region_name = +max_submit_job_attempts = 3 +check_health_on_startup = True +# job_name = +# job_queue = +# job_definition = +# submit_job_kwargs = + +[aws_ecs_executor] +# This section only applies if you are using the AwsEcsExecutor in +# Airflow's ``[core]`` configuration. +# For more information on any of these execution parameters, see the link below: +# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/run_task.html +# For boto3 credential management, see +# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html + +conn_id = aws_default +# region_name = +assign_public_ip = False +# cluster = +# capacity_provider_strategy = +# container_name = +# launch_type = +platform_version = LATEST +# security_groups = +# subnets = +# task_definition = +max_run_task_attempts = 3 +# run_task_kwargs = +check_health_on_startup = True + +[aws_auth_manager] +# This section only applies if you are using the AwsAuthManager. In other words, if you set +# ``[core] auth_manager = airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager`` in +# Airflow's configuration. + +enable = False +conn_id = aws_default +# region_name = +# saml_metadata_url = +# avp_policy_store_id = + +[celery_kubernetes_executor] +# This section only applies if you are using the ``CeleryKubernetesExecutor`` in +# ``[core]`` section above + +# Define when to send a task to ``KubernetesExecutor`` when using ``CeleryKubernetesExecutor``. +# When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``), +# the task is executed via ``KubernetesExecutor``, +# otherwise via ``CeleryExecutor`` +# +# Variable: AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE +# +kubernetes_queue = kubernetes + +[celery] +# This section only applies if you are using the CeleryExecutor in +# ``[core]`` section above + +# The app name that will be used by celery +# +# Variable: AIRFLOW__CELERY__CELERY_APP_NAME +# +celery_app_name = airflow.providers.celery.executors.celery_executor + +# The concurrency that will be used when starting workers with the +# ``airflow celery worker`` command. This defines the number of task instances that +# a worker will take, so size up your workers based on the resources on +# your worker box and the nature of your tasks +# +# Variable: AIRFLOW__CELERY__WORKER_CONCURRENCY +# +worker_concurrency = 16 + +# The maximum and minimum number of pool processes that will be used to dynamically resize +# the pool based on load.Enable autoscaling by providing max_concurrency,min_concurrency +# with the ``airflow celery worker`` command (always keep minimum processes, +# but grow to maximum if necessary). +# Pick these numbers based on resources on worker box and the nature of the task. +# If autoscale option is available, worker_concurrency will be ignored. +# https://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale +# +# Example: worker_autoscale = 16,12 +# +# Variable: AIRFLOW__CELERY__WORKER_AUTOSCALE +# +# worker_autoscale = + +# Used to increase the number of tasks that a worker prefetches which can improve performance. +# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks +# that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily +# blocked if there are multiple workers and one worker prefetches tasks that sit behind long +# running tasks while another worker has unutilized processes that are unable to process the already +# claimed blocked tasks. +# https://docs.celeryq.dev/en/stable/userguide/optimizing.html#prefetch-limits +# +# Variable: AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER +# +worker_prefetch_multiplier = 1 + +# Specify if remote control of the workers is enabled. +# In some cases when the broker does not support remote control, Celery creates lots of +# ``.*reply-celery-pidbox`` queues. You can prevent this by setting this to false. +# However, with this disabled Flower won't work. +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview +# +# Variable: AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL +# +worker_enable_remote_control = true + +# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally +# a sqlalchemy database. Refer to the Celery documentation for more information. +# +# Variable: AIRFLOW__CELERY__BROKER_URL +# +broker_url = redis://redis:6379/0 + +# The Celery result_backend. When a job finishes, it needs to update the +# metadata of the job. Therefore it will post a message on a message bus, +# or insert it into a database (depending of the backend) +# This status is used by the scheduler to update the state of the task +# The use of a database is highly recommended +# When not specified, sql_alchemy_conn with a db+ scheme prefix will be used +# https://docs.celeryq.dev/en/latest/userguide/configuration.html#task-result-backend-settings +# +# Example: result_backend = db+postgresql://postgres:airflow@postgres/airflow +# +# Variable: AIRFLOW__CELERY__RESULT_BACKEND +# +# result_backend = + +# Optional configuration dictionary to pass to the Celery result backend SQLAlchemy engine. +# +# Example: result_backend_sqlalchemy_engine_options = {"pool_recycle": 1800} +# +# Variable: AIRFLOW__CELERY__RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS +# +result_backend_sqlalchemy_engine_options = + +# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start +# it ``airflow celery flower``. This defines the IP that Celery Flower runs on +# +# Variable: AIRFLOW__CELERY__FLOWER_HOST +# +flower_host = 0.0.0.0 + +# The root URL for Flower +# +# Example: flower_url_prefix = /flower +# +# Variable: AIRFLOW__CELERY__FLOWER_URL_PREFIX +# +flower_url_prefix = + +# This defines the port that Celery Flower runs on +# +# Variable: AIRFLOW__CELERY__FLOWER_PORT +# +flower_port = 5555 + +# Securing Flower with Basic Authentication +# Accepts user:password pairs separated by a comma +# +# Example: flower_basic_auth = user1:password1,user2:password2 +# +# Variable: AIRFLOW__CELERY__FLOWER_BASIC_AUTH +# +flower_basic_auth = + +# How many processes CeleryExecutor uses to sync task state. +# 0 means to use max(1, number of cores - 1) processes. +# +# Variable: AIRFLOW__CELERY__SYNC_PARALLELISM +# +sync_parallelism = 0 + +# Import path for celery configuration options +# +# Variable: AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS +# +celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG + +# +# Variable: AIRFLOW__CELERY__SSL_ACTIVE +# +ssl_active = False + +# Path to the client key. +# +# Variable: AIRFLOW__CELERY__SSL_KEY +# +ssl_key = + +# Path to the client certificate. +# +# Variable: AIRFLOW__CELERY__SSL_CERT +# +ssl_cert = + +# Path to the CA certificate. +# +# Variable: AIRFLOW__CELERY__SSL_CACERT +# +ssl_cacert = + +# Celery Pool implementation. +# Choices include: ``prefork`` (default), ``eventlet``, ``gevent`` or ``solo``. +# See: +# https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency +# https://docs.celeryq.dev/en/latest/userguide/concurrency/eventlet.html +# +# Variable: AIRFLOW__CELERY__POOL +# +pool = prefork + +# The number of seconds to wait before timing out ``send_task_to_executor`` or +# ``fetch_celery_task_state`` operations. +# +# Variable: AIRFLOW__CELERY__OPERATION_TIMEOUT +# +operation_timeout = 1.0 + +task_acks_late = True +# Celery task will report its status as 'started' when the task is executed by a worker. +# This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted +# or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob. +# +# Variable: AIRFLOW__CELERY__TASK_TRACK_STARTED +# +task_track_started = True + +# The Maximum number of retries for publishing task messages to the broker when failing +# due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed. +# +# Variable: AIRFLOW__CELERY__TASK_PUBLISH_MAX_RETRIES +# +task_publish_max_retries = 3 + +# Worker initialisation check to validate Metadata Database connection +# +# Variable: AIRFLOW__CELERY__WORKER_PRECHECK +# +worker_precheck = False + +[celery_broker_transport_options] +# This section is for specifying options which can be passed to the +# underlying celery broker transport. See: +# https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options + +# The visibility timeout defines the number of seconds to wait for the worker +# to acknowledge the task before the message is redelivered to another worker. +# Make sure to increase the visibility timeout to match the time of the longest +# ETA you're planning to use. +# visibility_timeout is only supported for Redis and SQS celery brokers. +# See: +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout +# +# Example: visibility_timeout = 21600 +# +# Variable: AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT +# +# visibility_timeout = + +# The sentinel_kwargs parameter allows passing additional options to the Sentinel client. +# In a typical scenario where Redis Sentinel is used as the broker and Redis servers are +# password-protected, the password needs to be passed through this parameter. Although its +# type is string, it is required to pass a string that conforms to the dictionary format. +# See: +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration +# +# Example: sentinel_kwargs = {"password": "password_for_redis_server"} +# +# Variable: AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS +# +# sentinel_kwargs = + +[local_kubernetes_executor] +# This section only applies if you are using the ``LocalKubernetesExecutor`` in +# ``[core]`` section above + +# Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``. +# When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``), +# the task is executed via ``KubernetesExecutor``, +# otherwise via ``LocalExecutor`` +# +# Variable: AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE +# +kubernetes_queue = kubernetes + +[kubernetes_executor] +# Kwargs to override the default urllib3 Retry used in the kubernetes API client +# +# Example: api_client_retry_configuration = { "total": 3, "backoff_factor": 0.5 } +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__API_CLIENT_RETRY_CONFIGURATION +# +api_client_retry_configuration = + +# Flag to control the information added to kubernetes executor logs for better traceability +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__LOGS_TASK_METADATA +# +logs_task_metadata = False + +# Path to the YAML pod file that forms the basis for KubernetesExecutor workers. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE +# +pod_template_file = + +# The repository of the Kubernetes Image for the Worker to Run +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY +# +worker_container_repository = + +# The tag of the Kubernetes Image for the Worker to Run +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG +# +worker_container_tag = + +# The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE +# +namespace = default + +# If True, all worker pods will be deleted upon termination +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS +# +delete_worker_pods = True + +# If False (and delete_worker_pods is True), +# failed worker pods will not be deleted so users can investigate them. +# This only prevents removal of worker pods where the worker itself failed, +# not when the task it ran failed. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE +# +delete_worker_pods_on_failure = False + +worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError, InvalidImageName +# Number of Kubernetes Worker Pod creation calls per scheduler loop. +# Note that the current default of "1" will only launch a single pod +# per-heartbeat. It is HIGHLY recommended that users increase this +# number to match the tolerance of their kubernetes cluster for +# better performance. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE +# +worker_pods_creation_batch_size = 1 + +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler, +# or use multi_namespace_mode_namespace_list configuration. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__MULTI_NAMESPACE_MODE +# +multi_namespace_mode = False + +# If multi_namespace_mode is True while scheduler does not have a cluster-role, +# give the list of namespaces where the scheduler will schedule jobs +# Scheduler needs to have the necessary permissions in these namespaces. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__MULTI_NAMESPACE_MODE_NAMESPACE_LIST +# +multi_namespace_mode_namespace_list = + +# Use the service account kubernetes gives to pods to connect to kubernetes cluster. +# It's intended for clients that expect to be running inside a pod running on kubernetes. +# It will raise an exception if called from a process not running in a kubernetes environment. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__IN_CLUSTER +# +in_cluster = True + +# When running with in_cluster=False change the default cluster_context or config_file +# options to Kubernetes client. Leave blank these to use default behaviour like ``kubectl`` has. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__CLUSTER_CONTEXT +# +# cluster_context = + +# Path to the kubernetes configfile to be used when ``in_cluster`` is set to False +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__CONFIG_FILE +# +# config_file = + +# Keyword parameters to pass while calling a kubernetes client core_v1_api methods +# from Kubernetes Executor provided as a single line formatted JSON dictionary string. +# List of supported params are similar for all core_v1_apis, hence a single config +# variable for all apis. See: +# https://raw.githubusercontent.com/kubernetes-client/python/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/api/core_v1_api.py +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__KUBE_CLIENT_REQUEST_ARGS +# +kube_client_request_args = + +# Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +# ``core_v1_api`` method when using the Kubernetes Executor. +# This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +# class defined here: +# https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 +# +# Example: delete_option_kwargs = {"grace_period_seconds": 10} +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_OPTION_KWARGS +# +delete_option_kwargs = + +# Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely +# when idle connection is time-outed on services like cloud load balancers or firewalls. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__ENABLE_TCP_KEEPALIVE +# +enable_tcp_keepalive = True + +# When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has +# been idle for `tcp_keep_idle` seconds. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__TCP_KEEP_IDLE +# +tcp_keep_idle = 120 + +# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond +# to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__TCP_KEEP_INTVL +# +tcp_keep_intvl = 30 + +# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond +# to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before +# a connection is considered to be broken. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__TCP_KEEP_CNT +# +tcp_keep_cnt = 6 + +# Set this to false to skip verifying SSL certificate of Kubernetes python client. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__VERIFY_SSL +# +verify_ssl = True + +# How often in seconds to check for task instances stuck in "queued" status without a pod +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_QUEUED_CHECK_INTERVAL +# +worker_pods_queued_check_interval = 60 + +# Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__SSL_CA_CERT +# +ssl_ca_cert = + +# The Maximum number of retries for queuing the task to the kubernetes scheduler when +# failing due to Kube API exceeded quota errors before giving up and marking task as failed. +# -1 for unlimited times. +# +# Variable: AIRFLOW__KUBERNETES_EXECUTOR__TASK_PUBLISH_MAX_RETRIES +# +task_publish_max_retries = 0 + [common.io] # Common IO configuration section @@ -2469,6 +2952,87 @@ xcom_objectstorage_threshold = -1 # xcom_objectstorage_compression = +[elasticsearch] +# Elasticsearch host +# +# Variable: AIRFLOW__ELASTICSEARCH__HOST +# +host = + +# Format of the log_id, which is used to query for a given tasks logs +# +# Variable: AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE +# +log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number} + +# Used to mark the end of a log stream for a task +# +# Variable: AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK +# +end_of_log_mark = end_of_log + +# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id +# Code will construct log_id using the log_id template from the argument above. +# NOTE: scheme will default to https if one is not provided +# +# Example: frontend = http://localhost:5601/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id: "{log_id}"'),sort:!(log.offset,asc)) +# +# Variable: AIRFLOW__ELASTICSEARCH__FRONTEND +# +frontend = + +# Write the task logs to the stdout of the worker, rather than the default files +# +# Variable: AIRFLOW__ELASTICSEARCH__WRITE_STDOUT +# +write_stdout = False + +# Instead of the default log formatter, write the log lines as JSON +# +# Variable: AIRFLOW__ELASTICSEARCH__JSON_FORMAT +# +json_format = False + +# Log fields to also attach to the json output, if enabled +# +# Variable: AIRFLOW__ELASTICSEARCH__JSON_FIELDS +# +json_fields = asctime, filename, lineno, levelname, message + +# The field where host name is stored (normally either `host` or `host.name`) +# +# Variable: AIRFLOW__ELASTICSEARCH__HOST_FIELD +# +host_field = host + +# The field where offset is stored (normally either `offset` or `log.offset`) +# +# Variable: AIRFLOW__ELASTICSEARCH__OFFSET_FIELD +# +offset_field = offset + +# Comma separated list of index patterns to use when searching for logs (default: `_all`). +# The index_patterns_callable takes precedence over this. +# +# Example: index_patterns = something-* +# +# Variable: AIRFLOW__ELASTICSEARCH__INDEX_PATTERNS +# +index_patterns = _all + +index_patterns_callable = + +[elasticsearch_configs] +# +# Variable: AIRFLOW__ELASTICSEARCH_CONFIGS__HTTP_COMPRESS +# +http_compress = False + +# +# Variable: AIRFLOW__ELASTICSEARCH_CONFIGS__VERIFY_CERTS +# +verify_certs = True + [fab] # This section contains configs specific to FAB provider. @@ -2496,6 +3060,123 @@ update_fab_perms = True # ssl_context = +[azure_remote_logging] +# Configuration that needs to be set for enable remote logging in Azure Blob Storage + +remote_wasb_log_container = airflow-logs + +[openlineage] +# This section applies settings for OpenLineage integration. +# More about configuration and it's precedence can be found at +# https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#transport-setup + +# Disable sending events without uninstalling the OpenLineage Provider by setting this to true. +# +# Variable: AIRFLOW__OPENLINEAGE__DISABLED +# +disabled = False + +# Exclude some Operators from emitting OpenLineage events by passing a string of semicolon separated +# full import paths of Operators to disable. +# +# Example: disabled_for_operators = airflow.providers.standard.operators.bash.BashOperator; airflow.providers.standard.operators.python.PythonOperator +# +# Variable: AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS +# +disabled_for_operators = + +# If this setting is enabled, OpenLineage integration won't collect and emit metadata, +# unless you explicitly enable it per `DAG` or `Task` using `enable_lineage` method. +# +# Variable: AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE +# +selective_enable = False + +# Set namespace that the lineage data belongs to, so that if you use multiple OpenLineage producers, +# events coming from them will be logically separated. +# +# Example: namespace = my_airflow_instance_1 +# +# Variable: AIRFLOW__OPENLINEAGE__NAMESPACE +# +# namespace = + +# Register custom OpenLineage Extractors by passing a string of semicolon separated full import paths. +# +# Example: extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass +# +# Variable: AIRFLOW__OPENLINEAGE__EXTRACTORS +# +# extractors = + +# Register custom run facet functions by passing a string of semicolon separated full import paths. +# +# Example: custom_run_facets = full.path.to.custom_facet_function;full.path.to.another_custom_facet_function +# +# Variable: AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS +# +custom_run_facets = + +# Specify the path to the YAML configuration file. +# This ensures backwards compatibility with passing config through the `openlineage.yml` file. +# +# Example: config_path = full/path/to/openlineage.yml +# +# Variable: AIRFLOW__OPENLINEAGE__CONFIG_PATH +# +config_path = + +# Pass OpenLineage Client transport configuration as JSON string. It should contain type of the +# transport and additional options (different for each transport type). For more details see: +# https://openlineage.io/docs/client/python/#built-in-transport-types +# +# Currently supported types are: +# +# * HTTP +# * Kafka +# * Console +# * File +# +# Example: transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"} +# +# Variable: AIRFLOW__OPENLINEAGE__TRANSPORT +# +transport = + +# Disable the inclusion of source code in OpenLineage events by setting this to `true`. +# By default, several Operators (e.g. Python, Bash) will include their source code in the events +# unless disabled. +# +# Variable: AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE +# +disable_source_code = False + +# Number of processes to utilize for processing DAG state changes +# in an asynchronous manner within the scheduler process. +# +# Variable: AIRFLOW__OPENLINEAGE__DAG_STATE_CHANGE_PROCESS_POOL_SIZE +# +dag_state_change_process_pool_size = 1 + +# Maximum amount of time (in seconds) that OpenLineage can spend executing metadata extraction. +# +# Variable: AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT +# +execution_timeout = 10 + +# If true, OpenLineage event will include full task info - potentially containing large fields. +# +# Variable: AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO +# +include_full_task_info = False + +# If true, OpenLineage events will include information useful for debugging - potentially +# containing large fields e.g. all installed packages and their versions. +# +# Variable: AIRFLOW__OPENLINEAGE__DEBUG_MODE +# +debug_mode = False + [smtp_provider] # Options for SMTP provider. diff --git a/airflow.db b/airflow/airflow.db similarity index 93% rename from airflow.db rename to airflow/airflow.db index 3c32d4229139f85be87e2ceee33a497f96fa02d0..8eeb1c7af263e7ddde55342aa0bc49fc43968beb 100644 Binary files a/airflow.db and b/airflow/airflow.db differ diff --git a/airflow/dags/__pycache__/welcome_dag.cpython-312.pyc b/airflow/dags/__pycache__/welcome_dag.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6f135560f187375c54b9a6628250c0c5dacc4cdd Binary files /dev/null and b/airflow/dags/__pycache__/welcome_dag.cpython-312.pyc differ diff --git a/airflow/dags/welcome_dag.py b/airflow/dags/welcome_dag.py new file mode 100644 index 0000000000000000000000000000000000000000..7628de6b60e98fe593b037fa6f068a989681c63e --- /dev/null +++ b/airflow/dags/welcome_dag.py @@ -0,0 +1,87 @@ +from airflow import DAG + +from airflow.operators.python_operator import PythonOperator + +from airflow.utils.dates import days_ago + +from datetime import datetime + +import requests + + + +def print_welcome(): + + print('Welcome to Airflow!') + + + +def print_date(): + + print('Today is {}'.format(datetime.today().date())) + + + +def print_random_quote(): + + response = requests.get('https://api.quotable.io/random') + + quote = response.json()['content'] + + print('Quote of the day: "{}"'.format(quote)) + + + +dag = DAG( + + 'welcome_dag', + + default_args={'start_date': days_ago(1)}, + + schedule_interval='0 23 * * *', + + catchup=False + +) + + + +print_welcome_task = PythonOperator( + + task_id='print_welcome', + + python_callable=print_welcome, + + dag=dag + +) + + + +print_date_task = PythonOperator( + + task_id='print_date', + + python_callable=print_date, + + dag=dag + +) + + + +print_random_quote = PythonOperator( + + task_id='print_random_quote', + + python_callable=print_random_quote, + + dag=dag + +) + + + +# Set the dependencies between the tasks + +print_welcome_task >> print_date_task >> print_random_quote \ No newline at end of file diff --git a/airflow/standalone_admin_password.txt b/airflow/standalone_admin_password.txt new file mode 100644 index 0000000000000000000000000000000000000000..11c68bd43b69f4c7f263a472564d8f85933e0c58 --- /dev/null +++ b/airflow/standalone_admin_password.txt @@ -0,0 +1 @@ +SbyPAHK96d2ZCYR3 \ No newline at end of file diff --git a/airflow/webserver_config.py b/airflow/webserver_config.py new file mode 100644 index 0000000000000000000000000000000000000000..3048bb21f4d5960a518a461c8bad5cf1f1a3f020 --- /dev/null +++ b/airflow/webserver_config.py @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Default configuration for the Airflow webserver.""" + +from __future__ import annotations + +import os + +from flask_appbuilder.const import AUTH_DB + +# from airflow.www.fab_security.manager import AUTH_LDAP +# from airflow.www.fab_security.manager import AUTH_OAUTH +# from airflow.www.fab_security.manager import AUTH_OID +# from airflow.www.fab_security.manager import AUTH_REMOTE_USER + + +basedir = os.path.abspath(os.path.dirname(__file__)) + +# Flask-WTF flag for CSRF +WTF_CSRF_ENABLED = True +WTF_CSRF_TIME_LIMIT = None + +# ---------------------------------------------------- +# AUTHENTICATION CONFIG +# ---------------------------------------------------- +# For details on how to set up each of the following authentication, see +# http://flask-appbuilder.readthedocs.io/en/latest/security.html# authentication-methods +# for details. + +# The authentication type +# AUTH_OID : Is for OpenID +# AUTH_DB : Is for database +# AUTH_LDAP : Is for LDAP +# AUTH_REMOTE_USER : Is for using REMOTE_USER from web server +# AUTH_OAUTH : Is for OAuth +AUTH_TYPE = AUTH_DB + +# Uncomment to setup Full admin role name +# AUTH_ROLE_ADMIN = 'Admin' + +# Uncomment and set to desired role to enable access without authentication +# AUTH_ROLE_PUBLIC = 'Viewer' + +# Will allow user self registration +# AUTH_USER_REGISTRATION = True + +# The recaptcha it's automatically enabled for user self registration is active and the keys are necessary +# RECAPTCHA_PRIVATE_KEY = PRIVATE_KEY +# RECAPTCHA_PUBLIC_KEY = PUBLIC_KEY + +# Config for Flask-Mail necessary for user self registration +# MAIL_SERVER = 'smtp.gmail.com' +# MAIL_USE_TLS = True +# MAIL_USERNAME = 'yourappemail@gmail.com' +# MAIL_PASSWORD = 'passwordformail' +# MAIL_DEFAULT_SENDER = 'sender@gmail.com' + +# The default user self registration role +# AUTH_USER_REGISTRATION_ROLE = "Public" + +# When using OAuth Auth, uncomment to setup provider(s) info +# Google OAuth example: +# OAUTH_PROVIDERS = [{ +# 'name':'google', +# 'token_key':'access_token', +# 'icon':'fa-google', +# 'remote_app': { +# 'api_base_url':'https://www.googleapis.com/oauth2/v2/', +# 'client_kwargs':{ +# 'scope': 'email profile' +# }, +# 'access_token_url':'https://accounts.google.com/o/oauth2/token', +# 'authorize_url':'https://accounts.google.com/o/oauth2/auth', +# 'request_token_url': None, +# 'client_id': GOOGLE_KEY, +# 'client_secret': GOOGLE_SECRET_KEY, +# } +# }] + +# When using LDAP Auth, setup the ldap server +# AUTH_LDAP_SERVER = "ldap://ldapserver.new" + +# When using OpenID Auth, uncomment to setup OpenID providers. +# example for OpenID authentication +# OPENID_PROVIDERS = [ +# { 'name': 'Yahoo', 'url': 'https://me.yahoo.com' }, +# { 'name': 'AOL', 'url': 'http://openid.aol.com/<username>' }, +# { 'name': 'Flickr', 'url': 'http://www.flickr.com/<username>' }, +# { 'name': 'MyOpenID', 'url': 'https://www.myopenid.com' }] + +# ---------------------------------------------------- +# Theme CONFIG +# ---------------------------------------------------- +# Flask App Builder comes up with a number of predefined themes +# that you can use for Apache Airflow. +# http://flask-appbuilder.readthedocs.io/en/latest/customizing.html#changing-themes +# Please make sure to remove "navbar_color" configuration from airflow.cfg +# in order to fully utilize the theme. (or use that property in conjunction with theme) +# APP_THEME = "bootstrap-theme.css" # default bootstrap +# APP_THEME = "amelia.css" +# APP_THEME = "cerulean.css" +# APP_THEME = "cosmo.css" +# APP_THEME = "cyborg.css" +# APP_THEME = "darkly.css" +# APP_THEME = "flatly.css" +# APP_THEME = "journal.css" +# APP_THEME = "lumen.css" +# APP_THEME = "paper.css" +# APP_THEME = "readable.css" +# APP_THEME = "sandstone.css" +# APP_THEME = "simplex.css" +# APP_THEME = "slate.css" +# APP_THEME = "solar.css" +# APP_THEME = "spacelab.css" +# APP_THEME = "superhero.css" +# APP_THEME = "united.css" +# APP_THEME = "yeti.css" diff --git a/dags/example.py b/dags/example.py deleted file mode 100644 index 20597038c82982b22c0eb4b5dde9ecf2a846da22..0000000000000000000000000000000000000000 --- a/dags/example.py +++ /dev/null @@ -1,18 +0,0 @@ -from airflow.decorators import dag, task -from pendulum import datetime - -@dag( - start_date=datetime(2024, 1, 1), - schedule="@daily", - catchup=False, -) -def minimal_test_dag(): - - @task() - def say_hello(): - print("Hello") - return "Hello" - - say_hello() - -minimal_test_dag() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..2b790a339a0f6d0e90af2326d1952e1c209ed409 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,12 @@ +version: '3' +services: + sleek-airflow: + build: + context: . + dockerfile: Dockerfile + volumes: + - ./airflow:/opt/airflow + ports: + - "8080:8080" + command: airflow standalone +