From c3945640567910e3e3ff286e14d02694ce218878 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 27 Jan 2025 13:10:34 -0500 Subject: [PATCH 1/3] Check table set being accessed --- src/brad/front_end/vdbe/vdbe_front_end.py | 21 +++++++++++++++------ src/brad/vdbe/models.py | 7 ++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/brad/front_end/vdbe/vdbe_front_end.py b/src/brad/front_end/vdbe/vdbe_front_end.py index c6fd66b6..6ccdb3e9 100644 --- a/src/brad/front_end/vdbe/vdbe_front_end.py +++ b/src/brad/front_end/vdbe/vdbe_front_end.py @@ -29,15 +29,16 @@ from brad.front_end.errors import QueryError from brad.front_end.session import SessionManager, SessionId from brad.front_end.watchdog import Watchdog +from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager from brad.provisioning.directory import Directory from brad.row_list import RowList from brad.utils import log_verbose, create_custom_logger from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff from brad.utils.run_time_reservoir import RunTimeReservoir from brad.utils.time_periods import universal_now +from brad.query_rep import QueryRep from brad.vdbe.manager import VdbeFrontEndManager from brad.vdbe.models import VirtualInfrastructure -from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager logger = logging.getLogger(__name__) @@ -247,9 +248,17 @@ async def _run_query_impl( # semicolon if it exists. # NOTE: BRAD does not yet support having multiple # semicolon-separated queries in one request. - query = self._clean_query_str(query) + query_rep = QueryRep(self._clean_query_str(query)) + + # Verify that the query is not accessing tables that are not part of + # the VDBE. + for table_name in query_rep.tables(): + if table_name not in vdbe.table_names_set: + raise QueryError( + f"Table '{table_name}' not found in VDBE '{vdbe.name}'", + is_transient=False, + ) - # TODO: Validate table accesses. engine_to_use = vdbe.mapped_to log_verbose( @@ -268,10 +277,10 @@ async def _run_query_impl( # HACK: To work around dialect differences between # Athena/Aurora/Redshift for now. This should be replaced by # a more robust translation layer. - if engine_to_use == Engine.Athena and "ascii" in query: - translated_query = query.replace("ascii", "codepoint") + if engine_to_use == Engine.Athena and "ascii" in query_rep.raw_query: + translated_query = query_rep.raw_query.replace("ascii", "codepoint") else: - translated_query = query + translated_query = query_rep.raw_query start = universal_now() await cursor.execute(translated_query) end = universal_now() diff --git a/src/brad/vdbe/models.py b/src/brad/vdbe/models.py index a74c7c0b..4ed0bb79 100644 --- a/src/brad/vdbe/models.py +++ b/src/brad/vdbe/models.py @@ -1,5 +1,6 @@ import enum -from typing import List, Optional +from functools import cached_property +from typing import List, Optional, Set from pydantic import BaseModel from brad.config.engine import Engine @@ -33,6 +34,10 @@ class VirtualEngine(BaseModel): mapped_to: Engine endpoint: Optional[str] = None + @cached_property + def table_names_set(self) -> Set[str]: + return {table.name for table in self.tables} + class VirtualInfrastructure(BaseModel): schema_name: str From aed8a11f761cd3728ef9e533460bc1b75817d54a Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 27 Jan 2025 13:22:21 -0500 Subject: [PATCH 2/3] Bug fix --- ui/src/components/PerfView.jsx | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/ui/src/components/PerfView.jsx b/ui/src/components/PerfView.jsx index fc6b9b0c..d5025349 100644 --- a/ui/src/components/PerfView.jsx +++ b/ui/src/components/PerfView.jsx @@ -62,10 +62,19 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) { windowSizeMinutes: 10, metrics: {}, }); - - if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) { - changeDisplayMetricsWindow(windowSizeMinutes); - } + const changeDisplayMetricsWindow = useCallback( + (windowSizeMinutes) => { + const metricsManager = getMetricsManager(); + setDisplayMetricsData({ + windowSizeMinutes, + metrics: metricsManager.getMetricsInWindow( + windowSizeMinutes, + /*extendForward=*/ true, + ), + }); + }, + [getMetricsManager, setDisplayMetricsData], + ); const refreshMetrics = useCallback(async () => { const rawMetrics = await fetchMetrics(60, /*useGenerated=*/ false); @@ -101,19 +110,9 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) { }; }, [refreshMetrics]); - const changeDisplayMetricsWindow = useCallback( - (windowSizeMinutes) => { - const metricsManager = getMetricsManager(); - setDisplayMetricsData({ - windowSizeMinutes, - metrics: metricsManager.getMetricsInWindow( - windowSizeMinutes, - /*extendForward=*/ true, - ), - }); - }, - [getMetricsManager, setDisplayMetricsData], - ); + if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) { + changeDisplayMetricsWindow(windowSizeMinutes); + } const columnStyle = { flexGrow: 2, From 7518ab56af2bf52ab8d1aea2e5ab535aa34828d9 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 27 Jan 2025 13:45:47 -0500 Subject: [PATCH 3/3] Record metrics per VDBE --- src/brad/daemon/daemon.py | 6 ++-- src/brad/daemon/vdbe_metrics.py | 4 +++ src/brad/front_end/vdbe/vdbe_front_end.py | 38 +++++++++++++++-------- ui/src/components/OverallInfraView.jsx | 2 +- ui/src/components/VdbeView.jsx | 2 +- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 75e918cb..df52c53c 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -465,9 +465,6 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None: if isinstance(message, MetricsReport): self._monitor.handle_metric_report(message) - elif isinstance(message, VdbeMetricsReport): - self._monitor.handle_vdbe_metric_report(message) - elif isinstance(message, InternalCommandRequest): task = asyncio.create_task( self._run_internal_command_request_response(message) @@ -575,6 +572,9 @@ async def _read_vdbe_messages(self, vdbe_process: "_VdbeFrontEndProcess") -> Non ) vdbe_process.mailbox.on_new_message((None,)) + elif isinstance(message, VdbeMetricsReport): + self._monitor.handle_vdbe_metric_report(message) + else: logger.debug( "Received unexpected message from front end %d: %s", diff --git a/src/brad/daemon/vdbe_metrics.py b/src/brad/daemon/vdbe_metrics.py index 9535af88..88dede72 100644 --- a/src/brad/daemon/vdbe_metrics.py +++ b/src/brad/daemon/vdbe_metrics.py @@ -116,6 +116,10 @@ def _metrics_logger(self) -> Optional[MetricsLogger]: def handle_metric_report(self, report: VdbeMetricsReport) -> None: now = universal_now() + logger.debug("Handling VDBE metrics report: (ts: %s)", now) + for vdbe_id, sketch in report.query_latency_sketches(): + p90 = sketch.get_quantile_value(0.9) + logger.debug("Has sketch for VDBE %d. p90: %f", vdbe_id, p90) for vdbe_id, sketch in report.query_latency_sketches(): if vdbe_id not in self._sketch_front_end_metrics: diff --git a/src/brad/front_end/vdbe/vdbe_front_end.py b/src/brad/front_end/vdbe/vdbe_front_end.py index 6ccdb3e9..4aac2e98 100644 --- a/src/brad/front_end/vdbe/vdbe_front_end.py +++ b/src/brad/front_end/vdbe/vdbe_front_end.py @@ -95,7 +95,8 @@ def __init__( ) self._daemon_messages_task: Optional[asyncio.Task[None]] = None - self._reset_latency_sketches() + # Stored per VDBE. + self._query_latency_sketches: Dict[int, DDSketch] = {} self._brad_metrics_reporting_task: Optional[asyncio.Task[None]] = None # Used to re-establish engine connections. @@ -308,11 +309,14 @@ async def _run_query_impl( # Error when executing the query. raise QueryError.from_exception(ex, is_transient_error) - # Decide whether to log the query. + # Record the run time for later reporting. run_time_s = end - start run_time_s_float = run_time_s.total_seconds() - # TODO: Should be per VDBE. - self._query_latency_sketch.add(run_time_s_float) + try: + self._query_latency_sketches[vdbe_id].add(run_time_s_float) + except KeyError: + self._query_latency_sketches[vdbe_id] = self._get_empty_sketch() + self._query_latency_sketches[vdbe_id].add(run_time_s_float) # Extract and return the results, if any. try: @@ -440,17 +444,23 @@ async def _report_metrics_to_daemon(self) -> None: self._config.front_end_metrics_reporting_period_seconds ) + report_data = [] + for vdbe_id, sketch in self._query_latency_sketches.items(): + report_data.append((vdbe_id, sketch)) + query_p90 = sketch.get_quantile_value(0.9) + if query_p90 is not None: + logger.debug( + "VDBE %d Query latency p90 (s): %.4f", vdbe_id, query_p90 + ) + logger.info( + "Sending VDBE metrics report for %d VDBEs", len(report_data) + ) + # If the input queue is full, we just drop this message. metrics_report = VdbeMetricsReport.from_data( - self.NUMERIC_IDENTIFIER, - [(0, self._query_latency_sketch)], + self.NUMERIC_IDENTIFIER, report_data ) self._output_queue.put_nowait(metrics_report) - - query_p90 = self._query_latency_sketch.get_quantile_value(0.9) - if query_p90 is not None: - logger.debug("Query latency p90 (s): %.4f", query_p90) - self._reset_latency_sketches() except Exception as ex: @@ -560,9 +570,11 @@ async def _do_reestablish_connections(self) -> None: self._reestablish_connections_task = None def _reset_latency_sketches(self) -> None: - # TODO: Store per VDBE. + self._query_latency_sketches.clear() + + def _get_empty_sketch(self) -> DDSketch: sketch_rel_accuracy = 0.01 - self._query_latency_sketch = DDSketch(relative_accuracy=sketch_rel_accuracy) + return DDSketch(relative_accuracy=sketch_rel_accuracy) async def _orchestrate_shutdown(fe: BradVdbeFrontEnd) -> None: diff --git a/ui/src/components/OverallInfraView.jsx b/ui/src/components/OverallInfraView.jsx index 8deead28..96cd96a1 100644 --- a/ui/src/components/OverallInfraView.jsx +++ b/ui/src/components/OverallInfraView.jsx @@ -123,7 +123,7 @@ function OverallInfraView({ diff --git a/ui/src/components/VdbeView.jsx b/ui/src/components/VdbeView.jsx index 91b8c77c..351f117b 100644 --- a/ui/src/components/VdbeView.jsx +++ b/ui/src/components/VdbeView.jsx @@ -162,7 +162,7 @@ function VdbeView({