Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM apache/airflow:2.10.5

# git is needed so dbt can fetch packages from git repositories (dbt deps).
# Single RUN layer + cache cleanup keeps the image small; --no-install-recommends
# avoids pulling in unneeded packages.
USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends git \
    && rm -rf /var/lib/apt/lists/*
USER airflow

RUN pip install --upgrade cffi \
    && pip install cryptography~=3.4 \
    && pip install dbt-core dbt-postgres

WORKDIR /dbt/
COPY ./dbt/.dbt .dbt
COPY ./dbt/dbt_project.yml dbt_project.yml

# Holds the previous dbt manifest used for `--state` / state:modified comparisons.
RUN mkdir old_manifest

# Airflow tasks (user airflow, gid 50000) write packages.yml, target/ and logs
# under /dbt at runtime. Merge chown+chmod into one layer.
USER root
RUN chown -R airflow:50000 /dbt/ \
    && chmod -R o+w /dbt/
USER airflow
22 changes: 22 additions & 0 deletions airflow/dags/dbt_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""DAG that refreshes all non-view dbt models (incremental/table materializations)."""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Defaults applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    'update_incremental_models',
    default_args=default_args,
    catchup=False,
) as dag:
    # Views are recomputed at query time, so only materialized models need a run.
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /dbt && dbt run --profiles-dir .dbt --exclude config.materialized:view',
    )
138 changes: 138 additions & 0 deletions airflow/dags/install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
import os
import json
from urllib.parse import urlparse

def get_package():
"""Fetch package information and save it to a file."""
package_json = '{}'

if os.getenv("CHT_PIPELINE_BRANCH_URL"):
init_package = urlparse(os.getenv("CHT_PIPELINE_BRANCH_URL"))
if init_package.scheme in ["http", "https"]:
package_json = json.dumps({
"packages": [{
"git": init_package._replace(fragment='').geturl(),
"revision": init_package.fragment
}]
})

with open("/dbt/packages.yml", "w") as f:
f.write(package_json)

return package_json

def branch_on_manifest():
    """Choose the next task id: incremental update when a stored manifest exists, else dbt_ls."""
    schema = os.getenv('POSTGRES_SCHEMA', 'public')
    hook = PostgresHook(postgres_conn_id="dbt_conn")
    conn = hook.get_conn()

    with conn.cursor() as cur:
        cur.execute(f"SELECT manifest FROM {schema}._dataemon ORDER BY inserted_on DESC LIMIT 1")
        row = cur.fetchone()

    # No previous manifest recorded: take the plain listing branch.
    if not (row and row[0]):
        return "dbt_ls"

    # Persist the previous manifest so dbt can diff state:modified against it.
    with open("/dbt/old_manifest/manifest.json", "w") as f:
        f.write(json.dumps(row[0]))
    return "auto_update_models"

def get_new_manifest(path="/dbt/target/manifest.json"):
    """Return the manifest produced by the last dbt compile/ls/run (pushed to XCom).

    The default path is where dbt writes its artifact; it is parameterized so
    the function can be reused/tested against other locations.
    """
    # Context manager guarantees the handle is closed even if the read raises.
    with open(path, "r") as f:
        return f.read()

def save_package_manifest(ti):
    """Persist the package JSON and new manifest (pulled from XCom) into {schema}._dataemon.

    Only the latest snapshot is kept: the table is cleared before inserting.
    """
    package_json = ti.xcom_pull(task_ids="get_package")
    manifest_json = ti.xcom_pull(task_ids="get_new_manifest")

    hook = PostgresHook(postgres_conn_id="dbt_conn")
    conn = hook.get_conn()
    schema = os.getenv('POSTGRES_SCHEMA', 'public')

    # Wrap delete + insert in one transaction (same pattern as setup.py) so a
    # failure between the two statements cannot leave the table empty.
    with conn:
        with conn.cursor() as cur:
            # Clear old data
            cur.execute(f"DELETE FROM {schema}._dataemon")

            # Insert new package & manifest
            cur.execute(
                f"INSERT INTO {schema}._dataemon (packages, manifest) VALUES (%s, %s)",
                (package_json, manifest_json)
            )
        conn.commit()

# Define DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 2, 12),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dbt_setup_and_manifest",
    default_args=default_args,
    schedule_interval=None,  # Run manually or trigger via Airflow UI/API
    catchup=False,
) as dag:

    # Writes /dbt/packages.yml from CHT_PIPELINE_BRANCH_URL; return value goes to XCom.
    get_package_task = PythonOperator(
        task_id="get_package",
        python_callable=get_package
    )

    # Installs the dbt packages declared in packages.yml.
    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /dbt && dbt deps',
    )

    # Branch: returns "auto_update_models" when a previous manifest is stored in
    # the DB (and writes it to /dbt/old_manifest/), otherwise "dbt_ls".
    check_manifest_task = BranchPythonOperator(
        task_id="check_manifest",
        python_callable=branch_on_manifest
    )

    # Full-refreshes only the models whose state changed vs. the old manifest.
    auto_update_models = BashOperator(
        task_id='auto_update_models',
        bash_command='cd /dbt && dbt run --profiles-dir .dbt --select state:modified --full-refresh --state ./old_manifest'
    )

    # NOTE(review): downstream tasks rely on this step refreshing
    # /dbt/target/manifest.json as a side effect of `dbt ls` — confirm.
    dbt_ls = BashOperator(
        task_id='dbt_ls',
        bash_command='cd /dbt && dbt ls --profiles-dir .dbt',
        trigger_rule=TriggerRule.ONE_SUCCESS  # Ensures it runs if one branch succeeds
    )

    # Reads the regenerated target/manifest.json; return value goes to XCom.
    get_new_manifest_task = PythonOperator(
        task_id="get_new_manifest",
        python_callable=get_new_manifest
    )

    # Stores package JSON + manifest in {schema}._dataemon for the next run's branch check.
    save_task = PythonOperator(
        task_id="save_package_manifest",
        python_callable=save_package_manifest
    )

    # DAG Dependencies
    get_package_task >> dbt_deps
    dbt_deps >> check_manifest_task

    # **Both branches must continue**
    # dbt_ls is a direct downstream of the branch task, so when the branch picks
    # auto_update_models the scheduler marks dbt_ls skipped; the ONE_SUCCESS
    # trigger rule above lets it still run once auto_update_models succeeds.
    check_manifest_task >> auto_update_models
    check_manifest_task >> dbt_ls

    # **Merge branches before continuing**
    auto_update_models >> dbt_ls

    # **Ensure all tasks after merge run**
    dbt_ls >> get_new_manifest_task >> save_task
48 changes: 48 additions & 0 deletions airflow/dags/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

def setup():
    """Create the target schema and the _dataemon bookkeeping table if missing."""
    schema = os.getenv('POSTGRES_SCHEMA', 'public')  # Default to 'public' if not set

    # Use Airflow's PostgresHook to get the connection
    hook = PostgresHook(postgres_conn_id="dbt_conn")
    conn = hook.get_conn()

    # `with conn` wraps both DDL statements in a single transaction.
    with conn:
        with conn.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {schema}._dataemon (
                    inserted_on TIMESTAMP DEFAULT NOW(),
                    packages jsonb, manifest jsonb
                )
            """)
        conn.commit()

# Define the Airflow DAG.
# Defaults applied to the single bootstrap task below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 2, 12),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "setup_postgres_schema",
    default_args=default_args,
    schedule_interval=None,  # Run manually or trigger via Airflow UI/API
    catchup=False,
) as dag:
    # One-shot bootstrap task; nothing upstream or downstream.
    setup_task = PythonOperator(
        task_id="setup_database",
        python_callable=setup,
    )


13 changes: 13 additions & 0 deletions airflow/dbt/.dbt/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# dbt connection profile; all settings are injected via environment variables.
default:
  target: default
  outputs:
    default:
      # NOTE: env vars without a default here (DBT_THREAD_COUNT, POSTGRES_HOST,
      # POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_SCHEMA) are
      # required — dbt fails at parse time if any is unset.
      threads: "{{ env_var('DBT_THREAD_COUNT') | as_number }}"
      type: postgres
      host: "{{ env_var('POSTGRES_HOST') }}"
      user: "{{ env_var('POSTGRES_USER') }}"
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      # Optional: falls back to the standard Postgres port.
      port: "{{ env_var('POSTGRES_PORT', '5432') | as_number }}"
      dbname: "{{ env_var('POSTGRES_DB') }}"
      schema: "{{ env_var('POSTGRES_SCHEMA') }}"

11 changes: 11 additions & 0 deletions airflow/dbt/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: 'dataemon'
version: '0.0.1'
config-version: 2
profile: 'default'

require-dbt-version: [">=1.0.0"]

target-path: "target"
# dbt >= 1.0 installs packages into dbt_packages ("dbt_modules" is the pre-1.0
# location); list both so `dbt clean` removes installed packages either way.
clean-targets: ["target", "dbt_modules", "dbt_packages"]
macro-paths: ["macros"]
log-path: "logs"
Loading
Loading