diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py
index 75e918cb..df52c53c 100644
--- a/src/brad/daemon/daemon.py
+++ b/src/brad/daemon/daemon.py
@@ -465,9 +465,6 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
if isinstance(message, MetricsReport):
self._monitor.handle_metric_report(message)
- elif isinstance(message, VdbeMetricsReport):
- self._monitor.handle_vdbe_metric_report(message)
-
elif isinstance(message, InternalCommandRequest):
task = asyncio.create_task(
self._run_internal_command_request_response(message)
@@ -575,6 +572,9 @@ async def _read_vdbe_messages(self, vdbe_process: "_VdbeFrontEndProcess") -> Non
)
vdbe_process.mailbox.on_new_message((None,))
+ elif isinstance(message, VdbeMetricsReport):
+ self._monitor.handle_vdbe_metric_report(message)
+
else:
logger.debug(
"Received unexpected message from front end %d: %s",
diff --git a/src/brad/daemon/vdbe_metrics.py b/src/brad/daemon/vdbe_metrics.py
index 9535af88..88dede72 100644
--- a/src/brad/daemon/vdbe_metrics.py
+++ b/src/brad/daemon/vdbe_metrics.py
@@ -116,6 +116,10 @@ def _metrics_logger(self) -> Optional[MetricsLogger]:
def handle_metric_report(self, report: VdbeMetricsReport) -> None:
now = universal_now()
+ logger.debug("Handling VDBE metrics report: (ts: %s)", now)
+ for vdbe_id, sketch in report.query_latency_sketches():
+ p90 = sketch.get_quantile_value(0.9)
+ logger.debug("Has sketch for VDBE %d. p90: %f", vdbe_id, p90)
for vdbe_id, sketch in report.query_latency_sketches():
if vdbe_id not in self._sketch_front_end_metrics:
diff --git a/src/brad/front_end/vdbe/vdbe_front_end.py b/src/brad/front_end/vdbe/vdbe_front_end.py
index c6fd66b6..4aac2e98 100644
--- a/src/brad/front_end/vdbe/vdbe_front_end.py
+++ b/src/brad/front_end/vdbe/vdbe_front_end.py
@@ -29,15 +29,16 @@
from brad.front_end.errors import QueryError
from brad.front_end.session import SessionManager, SessionId
from brad.front_end.watchdog import Watchdog
+from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager
from brad.provisioning.directory import Directory
from brad.row_list import RowList
from brad.utils import log_verbose, create_custom_logger
from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff
from brad.utils.run_time_reservoir import RunTimeReservoir
from brad.utils.time_periods import universal_now
+from brad.query_rep import QueryRep
from brad.vdbe.manager import VdbeFrontEndManager
from brad.vdbe.models import VirtualInfrastructure
-from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager
logger = logging.getLogger(__name__)
@@ -94,7 +95,8 @@ def __init__(
)
self._daemon_messages_task: Optional[asyncio.Task[None]] = None
- self._reset_latency_sketches()
+ # Stored per VDBE.
+ self._query_latency_sketches: Dict[int, DDSketch] = {}
self._brad_metrics_reporting_task: Optional[asyncio.Task[None]] = None
# Used to re-establish engine connections.
@@ -247,9 +249,17 @@ async def _run_query_impl(
# semicolon if it exists.
# NOTE: BRAD does not yet support having multiple
# semicolon-separated queries in one request.
- query = self._clean_query_str(query)
+ query_rep = QueryRep(self._clean_query_str(query))
+
+ # Verify that the query is not accessing tables that are not part of
+ # the VDBE.
+ for table_name in query_rep.tables():
+ if table_name not in vdbe.table_names_set:
+ raise QueryError(
+ f"Table '{table_name}' not found in VDBE '{vdbe.name}'",
+ is_transient=False,
+ )
- # TODO: Validate table accesses.
engine_to_use = vdbe.mapped_to
log_verbose(
@@ -268,10 +278,10 @@ async def _run_query_impl(
# HACK: To work around dialect differences between
# Athena/Aurora/Redshift for now. This should be replaced by
# a more robust translation layer.
- if engine_to_use == Engine.Athena and "ascii" in query:
- translated_query = query.replace("ascii", "codepoint")
+ if engine_to_use == Engine.Athena and "ascii" in query_rep.raw_query:
+ translated_query = query_rep.raw_query.replace("ascii", "codepoint")
else:
- translated_query = query
+ translated_query = query_rep.raw_query
start = universal_now()
await cursor.execute(translated_query)
end = universal_now()
@@ -299,11 +309,14 @@ async def _run_query_impl(
# Error when executing the query.
raise QueryError.from_exception(ex, is_transient_error)
- # Decide whether to log the query.
+ # Record the run time for later reporting.
run_time_s = end - start
run_time_s_float = run_time_s.total_seconds()
- # TODO: Should be per VDBE.
- self._query_latency_sketch.add(run_time_s_float)
+ try:
+ self._query_latency_sketches[vdbe_id].add(run_time_s_float)
+ except KeyError:
+ self._query_latency_sketches[vdbe_id] = self._get_empty_sketch()
+ self._query_latency_sketches[vdbe_id].add(run_time_s_float)
# Extract and return the results, if any.
try:
@@ -431,17 +444,23 @@ async def _report_metrics_to_daemon(self) -> None:
self._config.front_end_metrics_reporting_period_seconds
)
+ report_data = []
+ for vdbe_id, sketch in self._query_latency_sketches.items():
+ report_data.append((vdbe_id, sketch))
+ query_p90 = sketch.get_quantile_value(0.9)
+ if query_p90 is not None:
+ logger.debug(
+ "VDBE %d Query latency p90 (s): %.4f", vdbe_id, query_p90
+ )
+ logger.info(
+ "Sending VDBE metrics report for %d VDBEs", len(report_data)
+ )
+
# If the input queue is full, we just drop this message.
metrics_report = VdbeMetricsReport.from_data(
- self.NUMERIC_IDENTIFIER,
- [(0, self._query_latency_sketch)],
+ self.NUMERIC_IDENTIFIER, report_data
)
self._output_queue.put_nowait(metrics_report)
-
- query_p90 = self._query_latency_sketch.get_quantile_value(0.9)
- if query_p90 is not None:
- logger.debug("Query latency p90 (s): %.4f", query_p90)
-
self._reset_latency_sketches()
except Exception as ex:
@@ -551,9 +570,11 @@ async def _do_reestablish_connections(self) -> None:
self._reestablish_connections_task = None
def _reset_latency_sketches(self) -> None:
- # TODO: Store per VDBE.
+ self._query_latency_sketches.clear()
+
+ def _get_empty_sketch(self) -> DDSketch:
sketch_rel_accuracy = 0.01
- self._query_latency_sketch = DDSketch(relative_accuracy=sketch_rel_accuracy)
+ return DDSketch(relative_accuracy=sketch_rel_accuracy)
async def _orchestrate_shutdown(fe: BradVdbeFrontEnd) -> None:
diff --git a/src/brad/vdbe/models.py b/src/brad/vdbe/models.py
index a74c7c0b..4ed0bb79 100644
--- a/src/brad/vdbe/models.py
+++ b/src/brad/vdbe/models.py
@@ -1,5 +1,6 @@
import enum
-from typing import List, Optional
+from functools import cached_property
+from typing import List, Optional, Set
from pydantic import BaseModel
from brad.config.engine import Engine
@@ -33,6 +34,10 @@ class VirtualEngine(BaseModel):
mapped_to: Engine
endpoint: Optional[str] = None
+ @cached_property
+ def table_names_set(self) -> Set[str]:
+ return {table.name for table in self.tables}
+
class VirtualInfrastructure(BaseModel):
schema_name: str
diff --git a/ui/src/components/OverallInfraView.jsx b/ui/src/components/OverallInfraView.jsx
index 8deead28..96cd96a1 100644
--- a/ui/src/components/OverallInfraView.jsx
+++ b/ui/src/components/OverallInfraView.jsx
@@ -123,7 +123,7 @@ function OverallInfraView({
diff --git a/ui/src/components/PerfView.jsx b/ui/src/components/PerfView.jsx
index fc6b9b0c..d5025349 100644
--- a/ui/src/components/PerfView.jsx
+++ b/ui/src/components/PerfView.jsx
@@ -62,10 +62,19 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
windowSizeMinutes: 10,
metrics: {},
});
-
- if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
- changeDisplayMetricsWindow(windowSizeMinutes);
- }
+ const changeDisplayMetricsWindow = useCallback(
+ (windowSizeMinutes) => {
+ const metricsManager = getMetricsManager();
+ setDisplayMetricsData({
+ windowSizeMinutes,
+ metrics: metricsManager.getMetricsInWindow(
+ windowSizeMinutes,
+ /*extendForward=*/ true,
+ ),
+ });
+ },
+ [getMetricsManager, setDisplayMetricsData],
+ );
const refreshMetrics = useCallback(async () => {
const rawMetrics = await fetchMetrics(60, /*useGenerated=*/ false);
@@ -101,19 +110,9 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
};
}, [refreshMetrics]);
- const changeDisplayMetricsWindow = useCallback(
- (windowSizeMinutes) => {
- const metricsManager = getMetricsManager();
- setDisplayMetricsData({
- windowSizeMinutes,
- metrics: metricsManager.getMetricsInWindow(
- windowSizeMinutes,
- /*extendForward=*/ true,
- ),
- });
- },
- [getMetricsManager, setDisplayMetricsData],
- );
+ if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
+ changeDisplayMetricsWindow(windowSizeMinutes);
+ }
const columnStyle = {
flexGrow: 2,
diff --git a/ui/src/components/VdbeView.jsx b/ui/src/components/VdbeView.jsx
index 91b8c77c..351f117b 100644
--- a/ui/src/components/VdbeView.jsx
+++ b/ui/src/components/VdbeView.jsx
@@ -162,7 +162,7 @@ function VdbeView({