Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,6 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
if isinstance(message, MetricsReport):
self._monitor.handle_metric_report(message)

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

elif isinstance(message, InternalCommandRequest):
task = asyncio.create_task(
self._run_internal_command_request_response(message)
Expand Down Expand Up @@ -575,6 +572,9 @@ async def _read_vdbe_messages(self, vdbe_process: "_VdbeFrontEndProcess") -> Non
)
vdbe_process.mailbox.on_new_message((None,))

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

else:
logger.debug(
"Received unexpected message from front end %d: %s",
Expand Down
4 changes: 4 additions & 0 deletions src/brad/daemon/vdbe_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ def _metrics_logger(self) -> Optional[MetricsLogger]:

def handle_metric_report(self, report: VdbeMetricsReport) -> None:
now = universal_now()
logger.debug("Handling VDBE metrics report: (ts: %s)", now)
for vdbe_id, sketch in report.query_latency_sketches():
p90 = sketch.get_quantile_value(0.9)
logger.debug("Has sketch for VDBE %d. p90: %f", vdbe_id, p90)

for vdbe_id, sketch in report.query_latency_sketches():
if vdbe_id not in self._sketch_front_end_metrics:
Expand Down
59 changes: 40 additions & 19 deletions src/brad/front_end/vdbe/vdbe_front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
from brad.front_end.errors import QueryError
from brad.front_end.session import SessionManager, SessionId
from brad.front_end.watchdog import Watchdog
from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager
from brad.provisioning.directory import Directory
from brad.row_list import RowList
from brad.utils import log_verbose, create_custom_logger
from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff
from brad.utils.run_time_reservoir import RunTimeReservoir
from brad.utils.time_periods import universal_now
from brad.query_rep import QueryRep
from brad.vdbe.manager import VdbeFrontEndManager
from brad.vdbe.models import VirtualInfrastructure
from brad.front_end.vdbe.vdbe_endpoint_manager import VdbeEndpointManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -94,7 +95,8 @@ def __init__(
)
self._daemon_messages_task: Optional[asyncio.Task[None]] = None

self._reset_latency_sketches()
# Stored per VDBE.
self._query_latency_sketches: Dict[int, DDSketch] = {}
self._brad_metrics_reporting_task: Optional[asyncio.Task[None]] = None

# Used to re-establish engine connections.
Expand Down Expand Up @@ -247,9 +249,17 @@ async def _run_query_impl(
# semicolon if it exists.
# NOTE: BRAD does not yet support having multiple
# semicolon-separated queries in one request.
query = self._clean_query_str(query)
query_rep = QueryRep(self._clean_query_str(query))

# Verify that the query is not accessing tables that are not part of
# the VDBE.
for table_name in query_rep.tables():
if table_name not in vdbe.table_names_set:
raise QueryError(
f"Table '{table_name}' not found in VDBE '{vdbe.name}'",
is_transient=False,
)

# TODO: Validate table accesses.
engine_to_use = vdbe.mapped_to

log_verbose(
Expand All @@ -268,10 +278,10 @@ async def _run_query_impl(
# HACK: To work around dialect differences between
# Athena/Aurora/Redshift for now. This should be replaced by
# a more robust translation layer.
if engine_to_use == Engine.Athena and "ascii" in query:
translated_query = query.replace("ascii", "codepoint")
if engine_to_use == Engine.Athena and "ascii" in query_rep.raw_query:
translated_query = query_rep.raw_query.replace("ascii", "codepoint")
else:
translated_query = query
translated_query = query_rep.raw_query
start = universal_now()
await cursor.execute(translated_query)
end = universal_now()
Expand Down Expand Up @@ -299,11 +309,14 @@ async def _run_query_impl(
# Error when executing the query.
raise QueryError.from_exception(ex, is_transient_error)

# Decide whether to log the query.
# Record the run time for later reporting.
run_time_s = end - start
run_time_s_float = run_time_s.total_seconds()
# TODO: Should be per VDBE.
self._query_latency_sketch.add(run_time_s_float)
try:
self._query_latency_sketches[vdbe_id].add(run_time_s_float)
except KeyError:
self._query_latency_sketches[vdbe_id] = self._get_empty_sketch()
self._query_latency_sketches[vdbe_id].add(run_time_s_float)

# Extract and return the results, if any.
try:
Expand Down Expand Up @@ -431,17 +444,23 @@ async def _report_metrics_to_daemon(self) -> None:
self._config.front_end_metrics_reporting_period_seconds
)

report_data = []
for vdbe_id, sketch in self._query_latency_sketches.items():
report_data.append((vdbe_id, sketch))
query_p90 = sketch.get_quantile_value(0.9)
if query_p90 is not None:
logger.debug(
"VDBE %d Query latency p90 (s): %.4f", vdbe_id, query_p90
)
logger.info(
"Sending VDBE metrics report for %d VDBEs", len(report_data)
)

# If the input queue is full, we just drop this message.
metrics_report = VdbeMetricsReport.from_data(
self.NUMERIC_IDENTIFIER,
[(0, self._query_latency_sketch)],
self.NUMERIC_IDENTIFIER, report_data
)
self._output_queue.put_nowait(metrics_report)

query_p90 = self._query_latency_sketch.get_quantile_value(0.9)
if query_p90 is not None:
logger.debug("Query latency p90 (s): %.4f", query_p90)

self._reset_latency_sketches()

except Exception as ex:
Expand Down Expand Up @@ -551,9 +570,11 @@ async def _do_reestablish_connections(self) -> None:
self._reestablish_connections_task = None

def _reset_latency_sketches(self) -> None:
# TODO: Store per VDBE.
self._query_latency_sketches.clear()

def _get_empty_sketch(self) -> DDSketch:
sketch_rel_accuracy = 0.01
self._query_latency_sketch = DDSketch(relative_accuracy=sketch_rel_accuracy)
return DDSketch(relative_accuracy=sketch_rel_accuracy)


async def _orchestrate_shutdown(fe: BradVdbeFrontEnd) -> None:
Expand Down
7 changes: 6 additions & 1 deletion src/brad/vdbe/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
from typing import List, Optional
from functools import cached_property
from typing import List, Optional, Set
from pydantic import BaseModel

from brad.config.engine import Engine
Expand Down Expand Up @@ -33,6 +34,10 @@ class VirtualEngine(BaseModel):
mapped_to: Engine
endpoint: Optional[str] = None

@cached_property
def table_names_set(self) -> Set[str]:
return {table.name for table in self.tables}


class VirtualInfrastructure(BaseModel):
schema_name: str
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/OverallInfraView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ function OverallInfraView({
</ConfirmDialog>
<Snackbar
open={showVdbeChangeSuccess}
autoHideDuration={3000}
autoHideDuration={2000}
message="VDBE changes successfully saved."
onClose={handleSnackbarClose}
/>
Expand Down
33 changes: 16 additions & 17 deletions ui/src/components/PerfView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,19 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
windowSizeMinutes: 10,
metrics: {},
});

if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
changeDisplayMetricsWindow(windowSizeMinutes);
}
const changeDisplayMetricsWindow = useCallback(
(windowSizeMinutes) => {
const metricsManager = getMetricsManager();
setDisplayMetricsData({
windowSizeMinutes,
metrics: metricsManager.getMetricsInWindow(
windowSizeMinutes,
/*extendForward=*/ true,
),
});
},
[getMetricsManager, setDisplayMetricsData],
);

const refreshMetrics = useCallback(async () => {
const rawMetrics = await fetchMetrics(60, /*useGenerated=*/ false);
Expand Down Expand Up @@ -101,19 +110,9 @@ function PerfView({ virtualInfra, showingPreview, showVdbeSpecificMetrics }) {
};
}, [refreshMetrics]);

const changeDisplayMetricsWindow = useCallback(
(windowSizeMinutes) => {
const metricsManager = getMetricsManager();
setDisplayMetricsData({
windowSizeMinutes,
metrics: metricsManager.getMetricsInWindow(
windowSizeMinutes,
/*extendForward=*/ true,
),
});
},
[getMetricsManager, setDisplayMetricsData],
);
if (displayMetricsData.windowSizeMinutes !== windowSizeMinutes) {
changeDisplayMetricsWindow(windowSizeMinutes);
}

const columnStyle = {
flexGrow: 2,
Expand Down
2 changes: 1 addition & 1 deletion ui/src/components/VdbeView.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ function VdbeView({
</div>
<Snackbar
open={showSnackbar}
autoHideDuration={3000}
autoHideDuration={2000}
message="Endpoint copied to clipboard"
onClose={handleClose}
/>
Expand Down