diff --git a/.github/workflows/30_deploy_changes_to_production.yml b/.github/workflows/30_deploy_changes_to_production.yml index 49bd5585..7faab2c7 100644 --- a/.github/workflows/30_deploy_changes_to_production.yml +++ b/.github/workflows/30_deploy_changes_to_production.yml @@ -48,6 +48,13 @@ jobs: DBT_ARTIFACTS_BUCKET: "convexa-local" + # Needed for dbt-api + DATACOVES__API_ENDPOINT: ${{ vars.DATACOVES__API_ENDPOINT }} + DATACOVES__API_TOKEN: ${{ secrets.DATACOVES__API_TOKEN }} + DATACOVES__ACCOUNT_ID: ${{ vars.DATACOVES__ACCOUNT_ID }} + DATACOVES__PROJECT_SLUG: ${{ vars.DATACOVES__PROJECT_SLUG }} + DATACOVES__ENVIRONMENT_SLUG: ${{ vars.DATACOVES__ENVIRONMENT_SLUG }} + # This is used by datacoves to drop the staging database for blue/green # deployments, most likely you don't want to set this, we use it for demos DATACOVES__DROP_DB_ON_FAIL: ${{ vars.DATACOVES__DROP_DB_ON_FAIL }} @@ -112,6 +119,9 @@ jobs: - name: Upload dbt artifacts run: "dbt run-operation upload_artifacts" + - name: Push dbt artifacts to dbt-api on Datacoves + run: "../automate/dbt/push_dbt_artifacts.py" + - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 with: diff --git a/automate/dbt/push_dbt_artifacts.py b/automate/dbt/push_dbt_artifacts.py new file mode 100755 index 00000000..1fc02fcc --- /dev/null +++ b/automate/dbt/push_dbt_artifacts.py @@ -0,0 +1,175 @@ +#!/usr/bin/env -S uv run +# /// script +# dependencies = [ +# "requests", +# "python-dotenv", +# "rich", +# ] +# /// + +import requests +import os +from dotenv import load_dotenv +import json +from rich import print_json +from rich.console import Console +from rich.table import Table + + +load_dotenv() +base_url = os.getenv("DATACOVES__API_ENDPOINT") +token = os.getenv("DATACOVES__API_TOKEN") +account_id = os.getenv("DATACOVES__ACCOUNT_ID") +project_slug = os.getenv("DATACOVES__PROJECT_SLUG") +environment_slug = os.getenv("DATACOVES__ENVIRONMENT_SLUG") +dbt_home = os.getenv("DATACOVES__DBT_HOME") + + 
+####################################### +# Utility for api interactions +####################################### +def print_responce(r): + print("STATUS:", r.status_code) + + response_text = r.text + + try: + parsed_json = json.loads(response_text) + print_json(data=parsed_json) + except json.JSONDecodeError: + print("RESPONSE:", response_text) + + print("-----------------------") + +def print_table(items, keys_to_show, title="Items"): + """Print a table showing only specified keys from a list of dictionaries""" + console = Console() + table = Table(title=title) + + # Define different colors for each column + colors = ["blue", "bright_green", "yellow", "green", "cyan", "magenta", "red", "bright_cyan", "bright_magenta", "bright_yellow"] + + # Add columns for each key we want to show with different colors + for index, key in enumerate(keys_to_show): + color = colors[index % len(colors)] # Cycle through colors if more columns than colors + table.add_column(key.replace('_', ' ').title(), style=color) + + # Add rows for each item in the list + for item in items: + row_values = [] + for key in keys_to_show: + value = item.get(key, "N/A") + row_values.append(str(value)) + table.add_row(*row_values) + + console.print(table) + +def get_endpoint(endpoint: str) -> str: + return f"{base_url}/{endpoint}" + +def get_headers() -> dict: + return { + "Accept": "application/json", + "Authorization": f"Bearer {token}" + } + +####################################### +# Get information +####################################### + +def health_check(): + print("Checking Health of api") + + r = requests.get( + url=get_endpoint(endpoint="/api/v3/healthcheck"), + headers=get_headers(), + ) + + print_responce(r) + +####################################### +# Working with files +####################################### + +def list_project_files(account_id: int, project_slug: str): + print(f"Listing files for project: {project_slug}") + + r = requests.get( + # 
url=get_endpoint(endpoint=f"/api/v3/datacoves/account/{account_id}/projects/{project_slug}/files"), + + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files"), + + headers=get_headers(), + ) + + return r.json().get("data", {}) + +def upload_env_file(account_id: int, project_slug: str, env_slug: str, + filename: str, is_manifest: bool = False, + dag_id: str = None, run_id: str = None, use_multipart: bool = False): + + print(f"Uploading file (unknown) to project: {project_slug} in environment: {env_slug}") + + file = {"file": (filename, open(f"{dbt_home}/target/(unknown)", "rb"))} + + data = { + 'filename': filename, + 'is_manifest': str(is_manifest).lower() + } + + r = requests.post( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files"), + headers=get_headers(), + files=file, + data=data + ) + + print_responce(r) + +def promote_env_file(account_id: int, project_slug: str, env_slug: str, + filename: str): + + print(f"Promoting file (unknown) in environment: {env_slug} to project level ({project_slug})") + + r = requests.post( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files/(unknown)/promote"), + headers=get_headers() + ) + + print_responce(r) + +def delete_project_file(account_id: int, project_slug: str, filename: str): + + print(f"Deleting file (unknown) from project: {project_slug}") + + r = requests.delete( + url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files/(unknown)"), + headers=get_headers() + ) + + print_responce(r) + +if __name__ == "__main__": + # Get information + + health_check() + + cols = ["environment_slug",'filename', 'metadata', 'inserted_at'] + + # UPLOAD FILES + + filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"] + for filename in filenames: + upload_env_file(account_id, project_slug, environment_slug, filename) + + for
filename in filenames: + promote_env_file(account_id, project_slug, environment_slug, filename) + + upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True ) + promote_env_file(account_id, project_slug, environment_slug, "manifest.json" ) + + # delete_project_file(account_id, project_slug, "manifest.json") + + # SHOW FILE DETAILS + files = list_project_files(account_id, project_slug) + print_table(files, cols) diff --git a/orchestrate/dags/daily_loan_run.py b/orchestrate/dags/daily_loan_run.py index f4d0cfe5..9362f9b9 100644 --- a/orchestrate/dags/daily_loan_run.py +++ b/orchestrate/dags/daily_loan_run.py @@ -63,14 +63,13 @@ def extract_and_load_fivetran(): tooltip="dlt Extract and Load" ) def extract_and_load_dlt(): - @task.datacoves_bash + @task.datacoves_bash( + env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}), + append_env=True + ) def load_loans_data(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}) - env_exports = datacoves_utils.generate_env_exports(env_vars) - return f"{env_exports}; cd load/dlt && ./loans_data.py" + return "cd load/dlt && ./loans_data.py" load_loans_data() diff --git a/orchestrate/dags/other_examples/load_dlt.py b/orchestrate/dags/other_examples/load_dlt.py index 67a02f2f..360ce414 100644 --- a/orchestrate/dags/other_examples/load_dlt.py +++ b/orchestrate/dags/other_examples/load_dlt.py @@ -22,14 +22,12 @@ ) def load_with_dlt(): - @task.datacoves_bash + @task.datacoves_bash( + env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}), + append_env=True + ) def load_us_population(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}) - env_exports = datacoves_utils.generate_env_exports(env_vars) - - return f"{env_exports}; cd load/dlt && ./us_population.py" + return "cd load/dlt 
&& ./us_population.py" load_us_population() diff --git a/orchestrate/dags/other_examples/load_earthquake_data.py b/orchestrate/dags/other_examples/load_earthquake_data.py index eb0ed819..03b61e26 100644 --- a/orchestrate/dags/other_examples/load_earthquake_data.py +++ b/orchestrate/dags/other_examples/load_earthquake_data.py @@ -36,32 +36,25 @@ def get_last_success_date(**context): return str(success_date - datetime.timedelta(days=3)) # Load earthquake data from USGS - @task.datacoves_bash + @task.datacoves_bash( + env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}), + append_env=True + ) def load_usgs_data(**context): - from orchestrate.utils import datacoves_utils - - # Get the start date directly from the upstream task + # Get the start date from the upstream task task_instance = context['task_instance'] - start_date = task_instance.xcom_pull(task_ids = 'get_last_success_date') - - # Set up environment variables - env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}) - env_vars['DATACOVES__START_DATE'] = start_date - - env_exports = datacoves_utils.generate_env_exports(env_vars) + start_date = task_instance.xcom_pull(task_ids='get_last_success_date') - return f"{env_exports}; cd load/dlt && ./usgs_earthquake.py --start-date $DATACOVES__START_DATE" + # Pass the start date directly to the command + return f"cd load/dlt && ./usgs_earthquake.py --start-date {start_date}" # Load Country Polygon Data - @task.datacoves_bash + @task.datacoves_bash( + env=datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}), + append_env=True + ) def load_country_geography(): - from orchestrate.utils import datacoves_utils - - env_vars = datacoves_utils.set_dlt_env_vars({'destinations': ['main_load_keypair']}) - - env_exports = datacoves_utils.generate_env_exports(env_vars) - - return f"{env_exports}; cd load/dlt && ./country_geo.py" + return "cd load/dlt && ./country_geo.py" # Run the dbt transformations 
@task.datacoves_dbt( diff --git a/training_and_demos/dbt-api/dbt_api_files.py b/training_and_demos/dbt-api/dbt_api_files.py index c799faab..c6755bc0 100755 --- a/training_and_demos/dbt-api/dbt_api_files.py +++ b/training_and_demos/dbt-api/dbt_api_files.py @@ -13,7 +13,6 @@ from rich import print_json from rich.console import Console from rich.table import Table -from pathlib import Path load_dotenv() base_url = os.getenv("DATACOVES__API_ENDPOINT")