From 57c0500d7c0df32ee0d57f4a0a47e8d912993a30 Mon Sep 17 00:00:00 2001 From: fujistone Date: Fri, 17 Jan 2025 17:52:34 +0100 Subject: [PATCH 1/3] feat(add) : adding dbt orchestration dag --- dags/dbt_orchestration.py | 43 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 dags/dbt_orchestration.py diff --git a/dags/dbt_orchestration.py b/dags/dbt_orchestration.py new file mode 100644 index 0000000..280cfb1 --- /dev/null +++ b/dags/dbt_orchestration.py @@ -0,0 +1,43 @@ +from client_list import clients +from airflow.decorators import dag, task +from airflow.models import Variable +from data_utils.github_helper import get_github_token, trigger_workflow +from data_utils.alerting.alerting import task_failed +import pendulum + +# Common variables +github_token = get_github_token() +env = Variable.get("environment") +default_args = {"owner": "airflow", "start_date": pendulum.datetime(2024, 11, 11, tz="UTC")} + + +def create_dbt_orchestration_dag(client_name): + @dag( + dag_id=f"dbt_orchestration_{client_name}", # Ensure unique dag_id + default_args=default_args, + schedule=None, + catchup=False, + on_failure_callback=task_failed + ) + def dbt_orchestration(): + + # Define GitHub action trigger + @task(task_id=f'trigger_github_action_{client_name}') + def trigger_github_action(): + """ + Trigger a GitHub action for the client. + """ + trigger_workflow(f'{client_name}_models_run_{env}.yml', token=github_token) + + + # Trigger GitHub action + github_action = trigger_github_action() + + github_action + + return dbt_orchestration() + + +# Dynamically generate DAGs for all clients +for client in clients: + globals()[f"dbt_orchestration_{client}"] = create_dbt_orchestration_dag(client_name=client) From 61d14c499e5f014e50d6b73e2cdc4f5fb0ad6a90 Mon Sep 17 00:00:00 2001 From: fujistone Date: Fri, 17 Jan 2025 17:56:46 +0100 Subject: [PATCH 2/3] feat(add): adding meta dbt ocherstration DAG --- dags/meta_dbt_orchestration.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 dags/meta_dbt_orchestration.py diff --git a/dags/meta_dbt_orchestration.py b/dags/meta_dbt_orchestration.py new file mode 100644 index 0000000..127481f --- /dev/null +++ b/dags/meta_dbt_orchestration.py @@ -0,0 +1,18 @@ +import pendulum + +from client_list import clients +from data_utils.dags_utils.orchestration_utils import create_orchestration_dag + +default_args = { + 'owner': 'airflow', + 'retries': 1, +} + +dag_id = 'meta_dbt_orchestration' +description = 'Orchestrator DAG for managing client GA DAGs sequentially' +schedule_interval = None # Trigger manually +start_date = pendulum.datetime(2024, 11, 11, tz="UTC") + +dbt_orchestration_dags = clients_prefixed = [f"dbt_orchestration_{client}" for client in clients] + +dag = create_orchestration_dag(dag_id, description, schedule_interval, start_date, dbt_orchestration_dags) From fa6b767a67cc06dbd1c2d2930db62a2def063233 Mon Sep 17 00:00:00 2001 From: Jean-Louis Lamezec Date: Tue, 21 Jan 2025 15:07:24 +0100 Subject: [PATCH 3/3] feat(dbt): add env variable to activate the dbt dags --- dags/dbt_orchestration.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dags/dbt_orchestration.py b/dags/dbt_orchestration.py index 280cfb1..5c217c6 100644 --- a/dags/dbt_orchestration.py +++ b/dags/dbt_orchestration.py @@ -38,6 +38,9 @@ def trigger_github_action(): return dbt_orchestration() -# Dynamically generate DAGs for all clients -for client in clients: - globals()[f"dbt_orchestration_{client}"] = create_dbt_orchestration_dag(client_name=client) +enabled = Variable.get("dbt_orchestration_enabled") + +if enabled == "True": + # Dynamically generate DAGs for all clients + for client in clients: + globals()[f"dbt_orchestration_{client}"] = create_dbt_orchestration_dag(client_name=client)