From 186c9d229f62b9e1e3ed42fb152a27063b79979d Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 05:58:00 -0400 Subject: [PATCH 1/6] Add script to set up scenario 3 blueprint --- experiments/19-demo2/README.md | 2 + experiments/19-demo2/set_up_etl_blueprint.py | 146 +++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 experiments/19-demo2/set_up_etl_blueprint.py diff --git a/experiments/19-demo2/README.md b/experiments/19-demo2/README.md index 3980128d..cc5ea19c 100644 --- a/experiments/19-demo2/README.md +++ b/experiments/19-demo2/README.md @@ -25,6 +25,8 @@ external ETL scenario. Make sure to start Redshift and Aurora. 2. The web interface should be accessible on port 7783. +Use `set_up_etl_blueprint.py` to transition to the blueprint for this scenario. + ## Important files - `config/system_config_demo_s{1,2,3}.yml`: diff --git a/experiments/19-demo2/set_up_etl_blueprint.py b/experiments/19-demo2/set_up_etl_blueprint.py new file mode 100644 index 00000000..4e78a8bf --- /dev/null +++ b/experiments/19-demo2/set_up_etl_blueprint.py @@ -0,0 +1,146 @@ +import asyncio +import argparse +import logging + +from brad.asset_manager import AssetManager +from brad.blueprint import Blueprint +from brad.blueprint.manager import BlueprintManager +from brad.blueprint.provisioning import Provisioning +from brad.config.engine import Engine +from brad.config.file import ConfigFile +from brad.daemon.transition_orchestrator import TransitionOrchestrator +from brad.planner.enumeration.blueprint import EnumeratedBlueprint +from brad.query_rep import QueryRep +from brad.routing.abstract_policy import FullRoutingPolicy +from brad.routing.cached import CachedLocationPolicy +from brad.routing.policy import RoutingPolicy +from brad.routing.tree_based.forest_policy import ForestPolicy +from brad.utils import set_up_logging + +logger = logging.getLogger(__name__) + + +async def run_transition( + config: ConfigFile, + blueprint_mgr: BlueprintManager, + next_blueprint: Blueprint, +) -> None: + logger.info("Starting the transition...") + assert next_blueprint is not None + await blueprint_mgr.start_transition(next_blueprint, new_score=None) + orchestrator = TransitionOrchestrator(config, blueprint_mgr) + logger.info("Running the transition...") + await orchestrator.run_prepare_then_transition() + logger.info("Running the post-transition clean up...") + await orchestrator.run_clean_up_after_transition() + logger.info("Done!") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--system-config-file", + type=str, + required=True, + help="Path to BRAD's system configuration file.", + ) + parser.add_argument( + "--physical-config-file", + type=str, + required=True, + help="Path to BRAD's physical configuration file.", + ) + parser.add_argument( + "--schema-name", + type=str, + required=True, + help="The name of the schema to drop.", + ) + parser.add_argument("--query-bank-file", type=str) + parser.add_argument( + "--athena-queries", type=str, help="Comma separated list of indices." + ) + parser.add_argument( + "--aurora-queries", + type=str, + help="Comma separated list of indices.", + default="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46", + ) + parser.add_argument( + "--redshift-queries", type=str, help="Comma separated list of indices." + ) + args = parser.parse_args() + set_up_logging(debug_mode=True) + + # 1. Load the config. + config = ConfigFile.load_from_new_configs( + phys_config=args.physical_config_file, system_config=args.system_config_file + ) + + # 2. Load the existing blueprint. + assets = AssetManager(config) + blueprint_mgr = BlueprintManager(config, assets, args.schema_name) + blueprint_mgr.load_sync() + blueprint = blueprint_mgr.get_blueprint() + + # 3. Load the query bank. + queries = [] + with open(args.query_bank_file, "r", encoding="UTF-8") as file: + for line in file: + clean = line.strip() + if clean.endswith(";"): + clean = clean[:-1] + queries.append(QueryRep(clean)) + + # 4. Create the fixed routing policy. + query_map = {} + if args.athena_queries is not None: + for qidx_str in args.athena_queries.split(","): + qidx = int(qidx_str.strip()) + query_map[queries[qidx]] = Engine.Athena + if args.redshift_queries is not None: + for qidx_str in args.redshift_queries.split(","): + qidx = int(qidx_str.strip()) + query_map[queries[qidx]] = Engine.Redshift + if args.aurora_queries is not None: + for qidx_str in args.aurora_queries.split(","): + qidx = int(qidx_str.strip()) + query_map[queries[qidx]] = Engine.Aurora + clp = CachedLocationPolicy(query_map) + + # 5. Replace the policy. + enum_blueprint = EnumeratedBlueprint(blueprint) + definite_policy = asyncio.run( + ForestPolicy.from_assets( + args.schema_name, RoutingPolicy.ForestTableCardinality, assets + ) + ) + replaced_policy = FullRoutingPolicy( + indefinite_policies=[clp], definite_policy=definite_policy + ) + enum_blueprint.set_routing_policy(replaced_policy) + + # Ensure the provisioning is as expected. + enum_blueprint.set_aurora_provisioning(Provisioning("db.t4g.medium", 2)) + enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 2)) + + # 6. Adjust the placement. + new_placement = {} + aurora_only = ["theatres", "showings", "ticket_orders"] + joint = ["aka_title", "movie_info"] + for table in blueprint.tables(): + if table.name in aurora_only: + new_placement[table.name] = [Engine.Aurora] + elif table.name in joint: + new_placement[table.name] = [Engine.Aurora, Engine.Redshift] + else: + new_placement[table.name] = [Engine.Redshift] + enum_blueprint.set_table_locations(new_placement) + + # 6. Transition to the new blueprint. + modified_blueprint = enum_blueprint.to_blueprint() + asyncio.run(run_transition(config, blueprint_mgr, modified_blueprint)) + + +if __name__ == "__main__": + main() From 8c80f4887b24812c58373a2f769bc7684865b868 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 07:30:22 -0400 Subject: [PATCH 2/6] Add JDBC notes, more logging --- cpp/server/brad_server_simple.cc | 2 ++ experiments/19-demo2/README.md | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/cpp/server/brad_server_simple.cc b/cpp/server/brad_server_simple.cc index 2821b11a..4fac9371 100644 --- a/cpp/server/brad_server_simple.cc +++ b/cpp/server/brad_server_simple.cc @@ -334,6 +334,7 @@ BradFlightSqlServer::GetFlightInfoImpl(const std::string& query, { py::gil_scoped_acquire guard; auto result = handle_query_(query); + std::cerr << "Got result for query from Python: " << query << std::endl; result_schema = ArrowSchemaFromBradSchema(result.second); result_record_batch = ResultToRecordBatch(result.first, result_schema).ValueOrDie(); @@ -343,6 +344,7 @@ BradFlightSqlServer::GetFlightInfoImpl(const std::string& query, auto statement, BradStatement::Create(std::move(result_record_batch), result_schema)); query_data_.insert(query_ticket, statement); + std::cerr << "Stored Arrow result for query: " << query << std::endl; std::vector endpoints{ FlightEndpoint{std::move(ticket), {}, std::nullopt, ""}}; diff --git a/experiments/19-demo2/README.md b/experiments/19-demo2/README.md index cc5ea19c..61bf6636 100644 --- a/experiments/19-demo2/README.md +++ b/experiments/19-demo2/README.md @@ -27,6 +27,11 @@ Use `set_up_etl_blueprint.py` to transition to the blueprint for this scenario. +#### VDBE JDBC connections + +- Connection URL: `jdbc:arrow-flight-sql://{VDBE host and port}/?&useEncryption=false` +- JDBC driver class name: `org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver` + ## Important files - `config/system_config_demo_s{1,2,3}.yml`: From 4bacbc14689a3c7548037651cbd471fececd086a Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 07:46:27 -0400 Subject: [PATCH 3/6] Add extra tables --- experiments/19-demo2/set_up_etl_blueprint.py | 38 ++++++++++++++++++++ src/brad/planner/enumeration/blueprint.py | 4 +++ 2 files changed, 42 insertions(+) diff --git a/experiments/19-demo2/set_up_etl_blueprint.py b/experiments/19-demo2/set_up_etl_blueprint.py index 4e78a8bf..384ab3f8 100644 --- a/experiments/19-demo2/set_up_etl_blueprint.py +++ b/experiments/19-demo2/set_up_etl_blueprint.py @@ -6,6 +6,7 @@ from brad.blueprint import Blueprint from brad.blueprint.manager import BlueprintManager from brad.blueprint.provisioning import Provisioning +from brad.blueprint.table import Table, Column from brad.config.engine import Engine from brad.config.file import ConfigFile from brad.daemon.transition_orchestrator import TransitionOrchestrator @@ -124,6 +125,40 @@ def main(): enum_blueprint.set_aurora_provisioning(Provisioning("db.t4g.medium", 2)) enum_blueprint.set_redshift_provisioning(Provisioning("dc2.large", 2)) + etl_orders_table = Table( + "ticket_orders_subset", + columns=[ + Column("id", "INT"), + Column("showing_id", "BIGINT"), + Column("quantity", "INT"), + ], + table_dependencies=[], + transform_text=None, + secondary_indexed_columns=[], + ) + etl_agg_table = Table( + "ticket_orders_agg", + columns=[ + Column("showing_id", "BIGINT"), + Column("total_quantity", "INT"), + ], + table_dependencies=[], + transform_text=None, + secondary_indexed_columns=[], + ) + + # 5. Add the ETL tables if they do not already exist. + try: + enum_blueprint.get_table(etl_orders_table.name) + except ValueError: + print("Adding", etl_orders_table.name, "to the blueprint.") + enum_blueprint.add_table(etl_orders_table) + try: + enum_blueprint.get_table(etl_agg_table.name) + except ValueError: + print("Adding", etl_agg_table.name, "to the blueprint.") + enum_blueprint.add_table(etl_agg_table) + # 6. Adjust the placement. new_placement = {} aurora_only = ["theatres", "showings", "ticket_orders"] @@ -135,6 +170,9 @@ def main(): new_placement[table.name] = [Engine.Aurora, Engine.Redshift] else: new_placement[table.name] = [Engine.Redshift] + + new_placement[etl_orders_table.name] = [Engine.Aurora] + new_placement[etl_agg_table.name] = [Engine.Redshift] enum_blueprint.set_table_locations(new_placement) # 6. Transition to the new blueprint. diff --git a/src/brad/planner/enumeration/blueprint.py b/src/brad/planner/enumeration/blueprint.py index f166f29d..90897a81 100644 --- a/src/brad/planner/enumeration/blueprint.py +++ b/src/brad/planner/enumeration/blueprint.py @@ -2,6 +2,7 @@ from brad.blueprint.blueprint import Blueprint from brad.blueprint.provisioning import Provisioning +from brad.blueprint.table import Table from brad.config.engine import Engine from brad.routing.abstract_policy import FullRoutingPolicy @@ -51,6 +52,9 @@ def set_routing_policy( self._current_routing_policy = routing_policy return self + def add_table(self, table: Table) -> None: + self._table_schemas.append(table) + def to_blueprint(self, forced_schema_name: Optional[str] = None) -> Blueprint: """ Makes a copy of this object as a `Blueprint`. From 015a2b1c1e179d5ddc96636d5d2cf976532211df Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 07:51:25 -0400 Subject: [PATCH 4/6] Fixes --- experiments/19-demo2/set_up_etl_blueprint.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/experiments/19-demo2/set_up_etl_blueprint.py b/experiments/19-demo2/set_up_etl_blueprint.py index 384ab3f8..4428879a 100644 --- a/experiments/19-demo2/set_up_etl_blueprint.py +++ b/experiments/19-demo2/set_up_etl_blueprint.py @@ -128,9 +128,9 @@ def main(): etl_orders_table = Table( "ticket_orders_subset", columns=[ - Column("id", "INT"), - Column("showing_id", "BIGINT"), - Column("quantity", "INT"), + Column("id", "INT", is_primary=True), + Column("showing_id", "BIGINT", is_primary=False), + Column("quantity", "INT", is_primary=False), ], table_dependencies=[], transform_text=None, @@ -139,8 +139,8 @@ def main(): etl_agg_table = Table( "ticket_orders_agg", columns=[ - Column("showing_id", "BIGINT"), - Column("total_quantity", "INT"), + Column("showing_id", "BIGINT", is_primary=True), + Column("total_quantity", "INT", is_primary=False), ], table_dependencies=[], transform_text=None, From a8acdcb0c0aa48e4d3ab139589d876ed482f9643 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 07:54:24 -0400 Subject: [PATCH 5/6] Additional --- config/vdbe_demo/imdb_etl_vdbes.json | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/config/vdbe_demo/imdb_etl_vdbes.json b/config/vdbe_demo/imdb_etl_vdbes.json index 3ebeb864..6da5d3dd 100644 --- a/config/vdbe_demo/imdb_etl_vdbes.json +++ b/config/vdbe_demo/imdb_etl_vdbes.json @@ -27,6 +27,10 @@ { "name": "aka_title", "writable": true + }, + { + "name": "ticket_orders_subset", + "writable": true } ], "mapped_to": "aurora", @@ -138,6 +142,10 @@ { "name": "person_info", "writable": false + }, + { + "name": "ticket_orders_agg", + "writable": false } ], "mapped_to": "redshift", @@ -219,6 +227,12 @@ }, { "name": "person_info" + }, + { + "name": "ticket_orders_subset" + }, + { + "name": "ticket_orders_agg" } ] } From f980f3cb4cf277aa71a55fb8dea88eafb16fed1f Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 23 Jun 2025 18:23:02 -0400 Subject: [PATCH 6/6] Remove debug statements --- cpp/server/brad_server_simple.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/server/brad_server_simple.cc b/cpp/server/brad_server_simple.cc index 4fac9371..2821b11a 100644 --- a/cpp/server/brad_server_simple.cc +++ b/cpp/server/brad_server_simple.cc @@ -334,7 +334,6 @@ BradFlightSqlServer::GetFlightInfoImpl(const std::string& query, { py::gil_scoped_acquire guard; auto result = handle_query_(query); - std::cerr << "Got result for query from Python: " << query << std::endl; result_schema = ArrowSchemaFromBradSchema(result.second); result_record_batch = ResultToRecordBatch(result.first, result_schema).ValueOrDie(); @@ -344,7 +343,6 @@ BradFlightSqlServer::GetFlightInfoImpl(const std::string& query, auto statement, BradStatement::Create(std::move(result_record_batch), result_schema)); query_data_.insert(query_ticket, statement); - std::cerr << "Stored Arrow result for query: " << query << std::endl; std::vector endpoints{ FlightEndpoint{std::move(ticket), {}, std::nullopt, ""}};