diff --git a/airflow/Dockerfile b/airflow/Dockerfile new file mode 100644 index 0000000..0b52f94 --- /dev/null +++ b/airflow/Dockerfile @@ -0,0 +1,20 @@ +FROM apache/airflow:2.10.5 + +USER root +RUN apt update && apt install git -y +USER airflow + +RUN pip install --upgrade cffi \ + && pip install cryptography~=3.4 \ + && pip install dbt-core dbt-postgres + +WORKDIR /dbt/ +COPY ./dbt/.dbt .dbt +COPY ./dbt/dbt_project.yml dbt_project.yml + +RUN mkdir old_manifest + +USER root +RUN chown -R airflow:50000 /dbt/ +RUN chmod -R o+w /dbt/ +USER airflow diff --git a/airflow/dags/dbt_run.py b/airflow/dags/dbt_run.py new file mode 100644 index 0000000..d8c0719 --- /dev/null +++ b/airflow/dags/dbt_run.py @@ -0,0 +1,22 @@ +from airflow import DAG +from airflow.operators.bash import BashOperator +from datetime import datetime + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2024, 1, 1), + 'retries': 1, +} + +dag = DAG( + 'update_incremental_models', + default_args=default_args, + catchup=False +) + +dbt_run = BashOperator( + task_id='dbt_run', + bash_command='cd /dbt && dbt run --profiles-dir .dbt --exclude config.materialized:view', + dag=dag, +) diff --git a/airflow/dags/install.py b/airflow/dags/install.py new file mode 100644 index 0000000..ec143cd --- /dev/null +++ b/airflow/dags/install.py @@ -0,0 +1,138 @@ +from airflow import DAG +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.operators.python import PythonOperator, BranchPythonOperator +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.utils.trigger_rule import TriggerRule +from datetime import datetime, timedelta +import os +import json +from urllib.parse import urlparse + +def get_package(): + """Fetch package information and save it to a file.""" + package_json = '{}' + + if os.getenv("CHT_PIPELINE_BRANCH_URL"): + init_package = urlparse(os.getenv("CHT_PIPELINE_BRANCH_URL")) + if init_package.scheme in ["http", "https"]: + package_json = json.dumps({ + "packages": [{ + "git": init_package._replace(fragment='').geturl(), + "revision": init_package.fragment + }] + }) + + with open("/dbt/packages.yml", "w") as f: + f.write(package_json) + + return package_json + +def branch_on_manifest(): + """Branch based on whether an old manifest exists.""" + hook = PostgresHook(postgres_conn_id="dbt_conn") + conn = hook.get_conn() + schema = os.getenv('POSTGRES_SCHEMA', 'public') + + with conn.cursor() as cur: + cur.execute(f"SELECT manifest FROM {schema}._dataemon ORDER BY inserted_on DESC LIMIT 1") + manifest = cur.fetchone() + + if manifest and manifest[0]: + with open("/dbt/old_manifest/manifest.json", "w") as f: + f.write(json.dumps(manifest[0])) + return "auto_update_models" + + return "dbt_ls" + +def get_new_manifest(): + with open("/dbt/target/manifest.json", "r") as f: + new_manifest = f.read() + return new_manifest + +def save_package_manifest(ti): + """Save the package JSON and new manifest into the database.""" + package_json = ti.xcom_pull(task_ids="get_package") + manifest_json = ti.xcom_pull(task_ids="get_new_manifest") + + hook = PostgresHook(postgres_conn_id="dbt_conn") + conn = hook.get_conn() + schema = os.getenv('POSTGRES_SCHEMA', 'public') + + with conn.cursor() as cur: + # Clear old data + cur.execute(f"DELETE FROM {schema}._dataemon") + + # Insert new package & manifest + cur.execute( + f"INSERT INTO {schema}._dataemon (packages, manifest) VALUES (%s, %s)", + (package_json, manifest_json) + ) + conn.commit() + +# Define DAG +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2024, 2, 12), + "retries": 2, + "retry_delay": timedelta(minutes=5), +} + +with DAG( + "dbt_setup_and_manifest", + default_args=default_args, + schedule_interval=None, # Run manually or trigger via Airflow UI/API + catchup=False, +) as dag: + + get_package_task = PythonOperator( + task_id="get_package", + python_callable=get_package + ) + + dbt_deps = BashOperator( + task_id='dbt_deps', + bash_command='cd /dbt && dbt deps', + ) + + check_manifest_task = BranchPythonOperator( + task_id="check_manifest", + python_callable=branch_on_manifest + ) + + auto_update_models = BashOperator( + task_id='auto_update_models', + bash_command='cd /dbt && dbt run --profiles-dir .dbt --select state:modified --full-refresh --state ./old_manifest' + ) + + dbt_ls = BashOperator( + task_id='dbt_ls', + bash_command='cd /dbt && dbt ls --profiles-dir .dbt', + trigger_rule=TriggerRule.ONE_SUCCESS # Ensures it runs if one branch succeeds + ) + + get_new_manifest_task = PythonOperator( + task_id="get_new_manifest", + python_callable=get_new_manifest + ) + + save_task = PythonOperator( + task_id="save_package_manifest", + python_callable=save_package_manifest + ) + + # DAG Dependencies + get_package_task >> dbt_deps + dbt_deps >> check_manifest_task + + # **Both branches must continue** + check_manifest_task >> auto_update_models + check_manifest_task >> dbt_ls + + # **Merge branches before continuing** + auto_update_models >> dbt_ls + + # **Ensure all tasks after merge run** + dbt_ls >> get_new_manifest_task >> save_task + diff --git a/airflow/dags/setup.py b/airflow/dags/setup.py new file mode 100644 index 0000000..a106518 --- /dev/null +++ b/airflow/dags/setup.py @@ -0,0 +1,48 @@ +from airflow import DAG +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.operators.python import PythonOperator +from datetime import datetime, timedelta +import os + +def setup(): + # Use Airflow's PostgresHook to get the connection + hook = PostgresHook(postgres_conn_id="dbt_conn") + conn = hook.get_conn() + + schema = os.getenv('POSTGRES_SCHEMA', 'public') # Default to 'public' if not set + + with conn: + with conn.cursor() as cur: + cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};") + cur.execute(f""" + CREATE TABLE IF NOT EXISTS {schema}._dataemon ( + inserted_on TIMESTAMP DEFAULT NOW(), + packages jsonb, manifest jsonb + ) + """) + conn.commit() + +# Define the Airflow DAG +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2024, 2, 12), + "retries": 2, + "retry_delay": timedelta(minutes=5), +} + +with DAG( + "setup_postgres_schema", + default_args=default_args, + schedule_interval=None, # Run manually or trigger via Airflow UI/API + catchup=False, +) as dag: + + setup_task = PythonOperator( + task_id="setup_database", + python_callable=setup + ) + + setup_task # Task execution order (only one task in this case) + + diff --git a/airflow/dbt/.dbt/profiles.yml b/airflow/dbt/.dbt/profiles.yml new file mode 100644 index 0000000..cd05c18 --- /dev/null +++ b/airflow/dbt/.dbt/profiles.yml @@ -0,0 +1,13 @@ +default: + target: default + outputs: + default: + threads: "{{ env_var('DBT_THREAD_COUNT') | as_number }}" + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT', '5432') | as_number }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + diff --git a/airflow/dbt/dbt_project.yml b/airflow/dbt/dbt_project.yml new file mode 100644 index 0000000..2d7493d --- /dev/null +++ b/airflow/dbt/dbt_project.yml @@ -0,0 +1,11 @@ +name: 'dataemon' +version: '0.0.1' +config-version: 2 +profile: 'default' + +require-dbt-version: [">=1.0.0"] + +target-path: "target" +clean-targets: ["target", "dbt_modules"] +macro-paths: ["macros"] +log-path: "logs" diff --git a/airflow/docker-compose.yml b/airflow/docker-compose.yml new file mode 100644 index 0000000..52d9ccf --- /dev/null +++ b/airflow/docker-compose.yml @@ -0,0 +1,321 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. +# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.10.5 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.5} + build: . + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'false' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + # The following line can be used to set a custom config file, stored in the local config folder + # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file + # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' + POSTGRES_HOST: ${POSTGRES_HOST} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_TABLE: ${POSTGRES_TABLE} + POSTGRES_SCHEMA: ${POSTGRES_SCHEMA} + DBT_THREAD_COUNT: ${DBT_THREAD_COUNT} + CHT_PIPELINE_BRANCH_URL: ${CHT_PIPELINE_BRANCH_URL} + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + # Redis is limited to 7.2-bookworm due to licencing change + # https://redis.io/blog/redis-adopts-dual-source-available-licensing/ + image: redis:7.2-bookworm + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + # yamllint disable rule:line-length + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + mkdir -p /sources/logs /sources/dags /sources/plugins + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_MIGRATE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. + # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + couch2pg: + build: ../couch2pg/ + extra_hosts: + - "host.docker.internal:host-gateway" + logging: + driver: "json-file" + options: + max-size: "512m" + environment: + - COUCHDB_USER=${COUCHDB_USER} + - COUCHDB_PASSWORD=${COUCHDB_PASSWORD} + - COUCHDB_HOST=${COUCHDB_HOST} + - COUCHDB_DBS=${COUCHDB_DBS} + - COUCHDB_PORT=${COUCHDB_PORT} + - COUCHDB_SECURE=${COUCHDB_SECURE:-true} + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_HOST=${POSTGRES_HOST} + - POSTGRES_DB=${POSTGRES_DB} + - POSTGRES_PORT=${POSTGRES_PORT:-5432} + - POSTGRES_SCHEMA=${POSTGRES_SCHEMA} + - POSTGRES_TABLE=${POSTGRES_TABLE} + restart: always + +volumes: + postgres-db-volume: