From 0848ea88b27e4b23ceb6f7977590989bf62aff25 Mon Sep 17 00:00:00 2001 From: ailepet Date: Tue, 8 Jul 2025 15:30:40 +0200 Subject: [PATCH 1/6] pull a list of all clients from Grist --- dags/data_utils/grist/grist_pull.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dags/data_utils/grist/grist_pull.py b/dags/data_utils/grist/grist_pull.py index c335874..3f8004e 100644 --- a/dags/data_utils/grist/grist_pull.py +++ b/dags/data_utils/grist/grist_pull.py @@ -8,17 +8,17 @@ connection = BaseHook.get_connection("grist_osp") grist_api_key = connection.password grist_server = connection.host -grist_ca_doc_id = Variable.get("grist_ca_doc_id") +grist_ca_doc_id = Variable.get("grist_all_clients_doc_id") # Get api key from your Profile Settings, and run with GRIST_API_KEY= api = GristDocAPI(grist_ca_doc_id, server=grist_server, api_key=grist_api_key) def fetch_and_dump_data(connection_name): - data = api.fetch_table('Suivi_CA_par_clients') + data = api.fetch_table('Liste_de_tous_les_clients') df = pd.DataFrame(data) - df['Prestations_2024'] = df['Prestations_2024'].astype(str) + df['Prestations_2025'] = df['Prestations_2025'].astype(str) # Add boolean columns for each field fields = ['Abo Decidim', 'Abo Grist', 'Abo Metabase', 'Bénévolat', @@ -27,11 +27,11 @@ def fetch_and_dump_data(connection_name): # Ajouter des colonnes booléennes pour chaque champ for field in fields: - df[field] = df['Prestations_2024'].apply(lambda x: field in x) + df[field] = df['Prestations_2025'].apply(lambda x: field in x) engine = get_postgres_connection(connection_name, "aggregated_client_data") connection = engine.connect() - table_name = "grist_test_ca" + table_name = "all_clients" drop_table_in_postgres(connection, table_name) dump_data_to_postgres(connection, df, table_name) From 5fcb8ef118071ef1e010895a06c767c68087dd13 Mon Sep 17 00:00:00 2001 From: ailepet Date: Tue, 8 Jul 2025 16:47:27 +0200 Subject: [PATCH 2/6] restore former grist task --- 
dags/data_utils/grist/grist_pull.py | 12 +++--- .../grist/grist_pull_all_clients.py | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 dags/data_utils/grist/grist_pull_all_clients.py diff --git a/dags/data_utils/grist/grist_pull.py b/dags/data_utils/grist/grist_pull.py index 3f8004e..3fabf42 100644 --- a/dags/data_utils/grist/grist_pull.py +++ b/dags/data_utils/grist/grist_pull.py @@ -8,17 +8,17 @@ connection = BaseHook.get_connection("grist_osp") grist_api_key = connection.password grist_server = connection.host -grist_ca_doc_id = Variable.get("grist_all_clients_doc_id") +grist_ca_doc_id = Variable.get("grist_ca_doc_id") # Get api key from your Profile Settings, and run with GRIST_API_KEY= api = GristDocAPI(grist_ca_doc_id, server=grist_server, api_key=grist_api_key) def fetch_and_dump_data(connection_name): - data = api.fetch_table('Liste_de_tous_les_clients') + data = api.fetch_table('Suivi_CA_par_clients') df = pd.DataFrame(data) - df['Prestations_2025'] = df['Prestations_2025'].astype(str) + df['Prestations_2024'] = df['Prestations_2024'].astype(str) # Add boolean columns for each field fields = ['Abo Decidim', 'Abo Grist', 'Abo Metabase', 'Bénévolat', @@ -27,13 +27,13 @@ def fetch_and_dump_data(connection_name): # Ajouter des colonnes booléennes pour chaque champ for field in fields: - df[field] = df['Prestations_2025'].apply(lambda x: field in x) + df[field] = df['Prestations_2024'].apply(lambda x: field in x) engine = get_postgres_connection(connection_name, "aggregated_client_data") connection = engine.connect() - table_name = "all_clients" + table_name = "grist_test_ca" drop_table_in_postgres(connection, table_name) dump_data_to_postgres(connection, df, table_name) - connection.close() + connection.close() \ No newline at end of file diff --git a/dags/data_utils/grist/grist_pull_all_clients.py b/dags/data_utils/grist/grist_pull_all_clients.py new file mode 100644 index 0000000..3f8004e --- /dev/null +++ 
b/dags/data_utils/grist/grist_pull_all_clients.py @@ -0,0 +1,39 @@ +from airflow.hooks.base import BaseHook +from grist_api import GristDocAPI +import pandas as pd +from ..postgres_helper.postgres_helper import dump_data_to_postgres, get_postgres_connection, drop_table_in_postgres +from airflow.models import Variable + +# Retrieve the connection object using Airflow's BaseHook +connection = BaseHook.get_connection("grist_osp") +grist_api_key = connection.password +grist_server = connection.host +grist_ca_doc_id = Variable.get("grist_all_clients_doc_id") + +# Get api key from your Profile Settings, and run with GRIST_API_KEY= +api = GristDocAPI(grist_ca_doc_id, server=grist_server, api_key=grist_api_key) + + +def fetch_and_dump_data(connection_name): +    data = api.fetch_table('Liste_de_tous_les_clients') +    df = pd.DataFrame(data) + +    df['Prestations_2025'] = df['Prestations_2025'].astype(str) + +    # Add boolean columns for each field +    fields = ['Abo Decidim', 'Abo Grist', 'Abo Metabase', 'Bénévolat', +              'Conseil Decidim', 'Conseil Grist', 'Conseil Metabase', +              'Synthèses', 'Technique Decidim', 'Technique Metabase', 'Terminé'] + +    # Ajouter des colonnes booléennes pour chaque champ +    for field in fields: +        df[field] = df['Prestations_2025'].apply(lambda x: field in x) + +    engine = get_postgres_connection(connection_name, "aggregated_client_data") +    connection = engine.connect() +    table_name = "all_clients" + +    drop_table_in_postgres(connection, table_name) +    dump_data_to_postgres(connection, df, table_name) + +    connection.close() From a41cde32a938baa243f4bbf0df7e94cac42c1376 Mon Sep 17 00:00:00 2001 From: ailepet Date: Thu, 10 Jul 2025 10:44:45 +0200 Subject: [PATCH 3/6] add new DAG for aggregation --- .../grist/grist_pull_all_clients.py | 4 +-- dags/grist_dump_client_data.py | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 dags/grist_dump_client_data.py diff --git a/dags/data_utils/grist/grist_pull_all_clients.py
b/dags/data_utils/grist/grist_pull_all_clients.py index 3f8004e..e0b252c 100644 --- a/dags/data_utils/grist/grist_pull_all_clients.py +++ b/dags/data_utils/grist/grist_pull_all_clients.py @@ -8,14 +8,14 @@ connection = BaseHook.get_connection("grist_osp") grist_api_key = connection.password grist_server = connection.host -grist_ca_doc_id = Variable.get("grist_all_clients_doc_id") +grist_ca_doc_id = Variable.get("grist_ca_doc_id") # Get api key from your Profile Settings, and run with GRIST_API_KEY= api = GristDocAPI(grist_ca_doc_id, server=grist_server, api_key=grist_api_key) def fetch_and_dump_data(connection_name): - data = api.fetch_table('Liste_de_tous_les_clients') + data = api.fetch_table('Liste_clients_memoire') df = pd.DataFrame(data) df['Prestations_2025'] = df['Prestations_2025'].astype(str) diff --git a/dags/grist_dump_client_data.py b/dags/grist_dump_client_data.py new file mode 100644 index 0000000..36f12bf --- /dev/null +++ b/dags/grist_dump_client_data.py @@ -0,0 +1,25 @@ +import pendulum +from airflow import DAG +from airflow.operators.python import PythonOperator +from data_utils.alerting.alerting import task_failed +from data_utils.grist.grist_pull import fetch_and_dump_data + +connection_name="main_db_cluster_name" + +with DAG( + dag_id='grist_pull_all_clients', + default_args={'owner': 'airflow'}, + schedule=None, + start_date=pendulum.datetime(2024, 11, 15, tz="UTC"), + catchup=True +) as dag: + + fetch_grist_data = PythonOperator( + task_id='fetch_and_dump_grist_data', + python_callable=fetch_and_dump_data, + op_args=[f"{connection_name}"], + dag=dag, + on_failure_callback=task_failed, + ) + + fetch_grist_data \ No newline at end of file From 2f6c458a0160bd0224c0e15218b8f70fb1064b0b Mon Sep 17 00:00:00 2001 From: ailepet Date: Thu, 10 Jul 2025 16:36:24 +0200 Subject: [PATCH 4/6] add task in existing DAG --- dags/crossclient_aggregation.py | 12 +++++++++++- dags/grist_dump_client_data.py | 25 ------------------------- 2 files changed, 11 
insertions(+), 26 deletions(-) delete mode 100644 dags/grist_dump_client_data.py diff --git a/dags/crossclient_aggregation.py b/dags/crossclient_aggregation.py index fdb9fac..082fd4e 100644 --- a/dags/crossclient_aggregation.py +++ b/dags/crossclient_aggregation.py @@ -4,8 +4,10 @@ from data_utils.alerting.alerting import task_failed from clients import clients from data_utils.crossclient_aggregation.crossclient_pull import create_aggregated_tables +from data_utils.grist.grist_pull_all_clients import fetch_and_dump_data import logging +connection_name="main_db_cluster_name" queries = { "all_users": """SELECT id AS decidim_user_id, email, date_of_birth, gender, created_at, sign_in_count, current_sign_in_at, confirmed, managed, admin, deleted_at, blocked, spam, spam_reported_at, spam_probability FROM prod.all_users""", @@ -67,6 +69,13 @@ op_args=[queries, clients], dag=dag, on_failure_callback=task_failed, + + fetch_grist_data = PythonOperator( + task_id='fetch_and_dump_grist_data', + python_callable=fetch_and_dump_data, + op_args=[f"{connection_name}"], + dag=dag, + on_failure_callback=task_failed, ) logger = logging.getLogger(__name__) @@ -74,4 +83,5 @@ logger.warn(f":DEBUG: crossclient_aggregation> Queries : {queries}") logger.warn(f":DEBUG: crossclient_aggregation> Clients : {clients}") aggregate_crossclient_data - logger.warn(f":DEBUG: crossclient_aggregation> DAG terminated.") + fetch_grist_data + logger.warn(f":DEBUG: crossclient_aggregation> DAG terminated.") \ No newline at end of file diff --git a/dags/grist_dump_client_data.py b/dags/grist_dump_client_data.py deleted file mode 100644 index 36f12bf..0000000 --- a/dags/grist_dump_client_data.py +++ /dev/null @@ -1,25 +0,0 @@ -import pendulum -from airflow import DAG -from airflow.operators.python import PythonOperator -from data_utils.alerting.alerting import task_failed -from data_utils.grist.grist_pull import fetch_and_dump_data - -connection_name="main_db_cluster_name" - -with DAG( - 
dag_id='grist_pull_all_clients', - default_args={'owner': 'airflow'}, - schedule=None, - start_date=pendulum.datetime(2024, 11, 15, tz="UTC"), - catchup=True -) as dag: - - fetch_grist_data = PythonOperator( - task_id='fetch_and_dump_grist_data', - python_callable=fetch_and_dump_data, - op_args=[f"{connection_name}"], - dag=dag, - on_failure_callback=task_failed, - ) - - fetch_grist_data \ No newline at end of file From b29baada361ac3862090625618caf5a242d6b880 Mon Sep 17 00:00:00 2001 From: ailepet Date: Thu, 10 Jul 2025 16:47:49 +0200 Subject: [PATCH 5/6] fix typo --- dags/crossclient_aggregation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/crossclient_aggregation.py b/dags/crossclient_aggregation.py index 082fd4e..02b4bf7 100644 --- a/dags/crossclient_aggregation.py +++ b/dags/crossclient_aggregation.py @@ -69,8 +69,9 @@ op_args=[queries, clients], dag=dag, on_failure_callback=task_failed, + ) - fetch_grist_data = PythonOperator( + fetch_grist_data = PythonOperator( task_id='fetch_and_dump_grist_data', python_callable=fetch_and_dump_data, op_args=[f"{connection_name}"], From 52700bbb5d10fd68628a333b57ad2e5c03348bc7 Mon Sep 17 00:00:00 2001 From: ailepet Date: Thu, 10 Jul 2025 17:12:03 +0200 Subject: [PATCH 6/6] set catchup to false --- dags/crossclient_aggregation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/crossclient_aggregation.py b/dags/crossclient_aggregation.py index 02b4bf7..6ccb08f 100644 --- a/dags/crossclient_aggregation.py +++ b/dags/crossclient_aggregation.py @@ -60,7 +60,7 @@ default_args={'owner': 'airflow'}, schedule='45 21 * * *', start_date=pendulum.datetime(2025, 6, 17, tz="UTC"), - catchup=True + catchup=False ) as dag: aggregate_crossclient_data = PythonOperator(