diff --git a/dags/data_utils/doc_helpscout.py b/dags/data_utils/doc_helpscout.py index 43c1d9f..9a96a42 100644 --- a/dags/data_utils/doc_helpscout.py +++ b/dags/data_utils/doc_helpscout.py @@ -4,7 +4,7 @@ from airflow.models import Variable from requests.auth import HTTPBasicAuth -from data_utils.grist.grist_helper import osp_grist_api +from data_utils.grist.grist_helper import get_grist_api connection_helpscout = BaseHook.get_connection("helpscout") assert connection_helpscout.login is not None @@ -44,7 +44,7 @@ def dump_helpscout_collection_to_grist(collection_id): ) final_table["published"] = final_table["status"] == "published" - api = osp_grist_api(grist_doc_id) + api = get_grist_api("grist_osp", grist_doc_id) api.sync_table( "Articles", diff --git a/dags/data_utils/grist/_old_pull_ca.py b/dags/data_utils/grist/_old_pull_ca.py index bba1c8d..d75fcce 100644 --- a/dags/data_utils/grist/_old_pull_ca.py +++ b/dags/data_utils/grist/_old_pull_ca.py @@ -1,16 +1,18 @@ +from airflow.models import Variable + from ..postgres_helper import ( dump_data_to_postgres, get_postgres_connection, ) -from airflow.models import Variable -from .grist_helper import fetch_grist_table_data, _get_grist_api +from .grist_helper import fetch_grist_table_data, get_grist_api + grist_ca_doc_id = Variable.get("grist_ca_doc_id") def fetch_and_dump_data(connection_name): # Fetch data from Grist using the new utility function - api = _get_grist_api("grist_osp", grist_ca_doc_id) + api = get_grist_api("grist_osp", grist_ca_doc_id) df = fetch_grist_table_data(api, "Suivi_CA_par_clients") df["Prestations_2024"] = df["Prestations_2024"].astype(str) diff --git a/dags/data_utils/grist/demo_suite_keycloak.py b/dags/data_utils/grist/demo_suite_keycloak.py index fa0abf1..a774077 100644 --- a/dags/data_utils/grist/demo_suite_keycloak.py +++ b/dags/data_utils/grist/demo_suite_keycloak.py @@ -1,12 +1,14 @@ -from keycloak import KeycloakOpenIDConnection, KeycloakAdmin +from types import SimpleNamespace + +import numpy as np +import pandas as pd +import requests from airflow.hooks.base import BaseHook from airflow.models import Variable -import pandas as pd -from types import SimpleNamespace -from .grist_helper import _get_grist_api from grist_api import GristDocAPI -import requests -import numpy as np +from keycloak import KeycloakAdmin, KeycloakOpenIDConnection + +from .grist_helper import get_grist_api # Retrieve the connection object using Airflow's BaseHook grist_commercial_doc_id = Variable.get("grist_commercial_doc_id") @@ -205,9 +207,9 @@ def fetch_existing_grist_prospects(api: GristDocAPI, table_name: str) -> pd.Data def fetch_data_from_keycloak_and_dump_to_grist(): - api = _get_grist_api("grist_osp", grist_commercial_doc_id) + api = get_grist_api("grist_osp", grist_commercial_doc_id) existing_df = fetch_existing_grist_prospects(api, grist_table_name) - + if keycloak_server_url is None: raise ValueError("`keycloak_demo_suite` missing server URL") keycloak_username = keycloak_connection.login diff --git a/dags/data_utils/grist/grist_helper.py b/dags/data_utils/grist/grist_helper.py index 1a0d711..675bc8d 100644 --- a/dags/data_utils/grist/grist_helper.py +++ b/dags/data_utils/grist/grist_helper.py @@ -1,14 +1,14 @@ from __future__ import annotations -from typing import Literal, List +import re +from typing import List, Literal import pandas as pd from airflow.hooks.base import BaseHook from grist_api import GristDocAPI -import re -def _get_grist_api(connection_name, doc_id): +def get_grist_api(connection_name, doc_id): connection = BaseHook.get_connection(connection_name) grist_api_key = connection.password grist_server = connection.host @@ -37,7 +37,7 @@ def sanitize_identifier(name: str) -> str: def fetch_grist_table_data( - doc_api, table_name, errors: Literal["raise", "coerce"] = "coerce" + doc_api: GristDocAPI, table_name, errors: Literal["raise", "coerce"] = "coerce" ): """ Fetch data from a Grist table and return it as a pandas DataFrame with type validation. diff --git a/dags/data_utils/grist/suivi_ca.py b/dags/data_utils/grist/suivi_ca.py index be7041a..b4ed757 100644 --- a/dags/data_utils/grist/suivi_ca.py +++ b/dags/data_utils/grist/suivi_ca.py @@ -1,19 +1,20 @@ -from .grist_helper import _get_grist_api -import pandas as pd +from airflow.models import Variable + from ..postgres_helper import ( dump_data_to_postgres, get_postgres_connection, ) -from airflow.models import Variable +from .grist_helper import fetch_grist_table_data, get_grist_api # Retrieve the connection object using Airflow's BaseHook grist_ca_doc_id = Variable.get("grist_suivi_ca_doc_id") +if not isinstance(grist_ca_doc_id, str): + raise ValueError("grist_suivi_ca_doc_id variable not set") +api = get_grist_api("grist_osp", grist_ca_doc_id) def fetch_and_dump_data(connection_name): - api = _get_grist_api("grist_osp", grist_ca_doc_id) - data = api.fetch_table("SUIVI_CLIENTS") - df = pd.DataFrame(data) + df = fetch_grist_table_data(api, "SUIVI_CLIENTS") engine = get_postgres_connection(connection_name, "aggregated_client_data") connection = engine.connect()