Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ShutdownFrontEnd,
Sentinel,
MetricsReport,
VdbeMetricsReport,
InternalCommandRequest,
InternalCommandResponse,
NewBlueprint,
Expand Down Expand Up @@ -117,7 +118,6 @@ def __init__(
self._blueprint_mgr = BlueprintManager(
self._config, self._assets, self._schema_name
)
self._monitor = Monitor(self._config, self._blueprint_mgr)
self._estimator_provider = _EstimatorProvider()
self._providers: Optional[BlueprintProviders] = None
self._planner: Optional[BlueprintPlanner] = None
Expand Down Expand Up @@ -145,6 +145,12 @@ def __init__(
self._vdbe_manager = None
self._vdbe_process: Optional[_VdbeFrontEndProcess] = None

self._monitor = Monitor(
self._config,
self._blueprint_mgr,
create_vdbe_metrics=self._vdbe_manager is not None,
)

# This is used to hold references to internal command tasks we create.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
self._internal_command_tasks: Set[asyncio.Task] = set()
Expand Down Expand Up @@ -459,6 +465,9 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
if isinstance(message, MetricsReport):
self._monitor.handle_metric_report(message)

elif isinstance(message, VdbeMetricsReport):
self._monitor.handle_vdbe_metric_report(message)

elif isinstance(message, InternalCommandRequest):
task = asyncio.create_task(
self._run_internal_command_request_response(message)
Expand Down
27 changes: 24 additions & 3 deletions src/brad/daemon/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
from brad.daemon.messages import MetricsReport
from brad.daemon.messages import MetricsReport, VdbeMetricsReport
from brad.daemon.metrics_source import MetricsSourceWithForecasting
from brad.daemon.aurora_metrics import AuroraMetrics
from brad.daemon.front_end_metrics import FrontEndMetrics
from brad.daemon.redshift_metrics import RedshiftMetrics
from brad.daemon.vdbe_metrics import VdbeMetrics

logger = logging.getLogger(__name__)

Expand All @@ -21,6 +22,7 @@ def __init__(
blueprint_mgr: BlueprintManager,
forecasting_method: str = "constant", # {constant, moving_average, linear}
forecasting_window_size: int = 5, # (Up to) how many past samples to base the forecast on
create_vdbe_metrics: bool = False,
) -> None:
self._config = config
self._blueprint_mgr = blueprint_mgr
Expand All @@ -35,6 +37,12 @@ def __init__(
self._front_end_metrics = FrontEndMetrics(
config, forecasting_method, forecasting_window_size
)
if create_vdbe_metrics:
self._vdbe_metrics: Optional[VdbeMetrics] = VdbeMetrics(
config, forecasting_method, forecasting_window_size
)
else:
self._vdbe_metrics = None

def set_up_metrics_sources(self) -> None:
"""
Expand Down Expand Up @@ -120,12 +128,15 @@ async def fetch_latest(self) -> None:
"""
logger.debug("Fetching latest metrics...")
futures = []
for source in chain(
chain_parts = [
[self._aurora_writer_metrics],
self._aurora_reader_metrics,
[self._redshift_metrics],
[self._front_end_metrics],
):
]
if self._vdbe_metrics is not None:
chain_parts.append([self._vdbe_metrics])
for source in chain(*chain_parts): # type: ignore
if source is None:
continue
futures.append(source.fetch_latest())
Expand All @@ -146,6 +157,13 @@ def handle_metric_report(self, report: MetricsReport) -> None:
"""
self._front_end_metrics.handle_metric_report(report)

def handle_vdbe_metric_report(self, report: VdbeMetricsReport) -> None:
    """
    Forwards a VDBE metrics report to the VDBE metrics source.

    The report is dropped when this monitor has no VDBE metrics source
    (i.e., `self._vdbe_metrics` is `None`).
    """
    vdbe_source = self._vdbe_metrics
    if vdbe_source is None:
        return
    vdbe_source.handle_metric_report(report)

def _print_key_metrics(self) -> None:
# Used for debug purposes.
if logger.level > logging.DEBUG:
Expand Down Expand Up @@ -190,3 +208,6 @@ def redshift_metrics(self) -> MetricsSourceWithForecasting:
def front_end_metrics(self) -> MetricsSourceWithForecasting:
assert self._front_end_metrics is not None
return self._front_end_metrics

def vdbe_metrics(self) -> Optional[MetricsSourceWithForecasting]:
    """
    Returns the VDBE metrics source held by this monitor, or `None` if the
    monitor does not have one.
    """
    vdbe_source = self._vdbe_metrics
    return vdbe_source
132 changes: 132 additions & 0 deletions src/brad/daemon/vdbe_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import math
import logging
import pandas as pd
import pytz
import copy
from typing import Dict, List, Optional
from datetime import datetime
from ddsketch import DDSketch

from .metrics_source import MetricsSourceWithForecasting
from brad.config.file import ConfigFile
from brad.daemon.messages import VdbeMetricsReport
from brad.daemon.metrics_logger import MetricsLogger
from brad.utils.streaming_metric import StreamingMetric
from brad.utils import log_verbose
from brad.utils.time_periods import universal_now

logger = logging.getLogger(__name__)


class VdbeMetrics(MetricsSourceWithForecasting):
    """
    Metrics source that tracks query latency on a per-VDBE basis.

    Latency sketches arrive through `handle_metric_report()` and are stored
    per VDBE ID. Each `fetch_latest()` call merges the stored sketches that
    fall into each recent epoch and reduces them to a p90 latency value,
    producing a data frame with one column per VDBE ID (as a string) and one
    row per epoch (indexed by the epoch's end time).
    """

    def __init__(
        self,
        config: ConfigFile,
        forecasting_method: str,
        forecasting_window_size: int,
    ) -> None:
        """
        Args:
            config: Supplies the epoch length, the front end metrics
                reporting period, and the metrics logger configuration.
            forecasting_method: Passed through to the base metrics source.
            forecasting_window_size: Passed through to the base metrics
                source.
        """
        self._config = config
        self._epoch_length = self._config.epoch_length
        samples_per_epoch = (
            self._epoch_length.total_seconds()
            / self._config.front_end_metrics_reporting_period_seconds
        )
        # Capacity of each per-VDBE sample window: 200 epochs' worth of
        # samples at one sample per reporting period.
        self._sm_window_size = math.ceil(200 * samples_per_epoch)

        # (vdbe_id -> metric)
        self._sketch_front_end_metrics: Dict[int, StreamingMetric[DDSketch]] = {}
        # All known VDBE IDs, in the order they were first reported.
        self._ordered_metrics: List[int] = []
        self._values_df = pd.DataFrame([])
        self._logger = MetricsLogger.create_from_config(
            self._config, "brad_vdbe_metrics_front_end.log"
        )

        super().__init__(
            self._epoch_length, forecasting_method, forecasting_window_size
        )

    async def fetch_latest(self) -> None:
        """
        Recomputes the per-VDBE p90 latency for the 5 most recent complete
        epochs and folds the values into the stored metrics data frame.

        A VDBE with no sketches in an epoch gets a value of 0.0 for that
        epoch (and a warning is logged).
        """
        now = universal_now()
        num_epochs = 5
        # Round `now` down to an epoch boundary (boundaries are multiples of
        # the epoch length counted from `datetime.min` in UTC).
        end_time = (
            now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % self._epoch_length
        )
        start_time = end_time - num_epochs * self._epoch_length

        timestamps = []
        data_cols: Dict[str, List[float]] = {
            str(metric_name): [] for metric_name in self._ordered_metrics
        }
        debug_enabled = logger.isEnabledFor(logging.DEBUG)

        for offset in range(num_epochs):
            window_start = start_time + offset * self._epoch_length
            window_end = window_start + self._epoch_length

            logger.debug(
                "Loading VDBE metrics for %s -- %s", window_start, window_end
            )

            for vdbe_id, sketches in self._sketch_front_end_metrics.items():
                merged = None
                # These stats are for debug logging.
                num_matching = 0
                min_ts = None
                max_ts = None

                for sketch, ts in sketches.window_iterator(window_start, window_end):
                    num_matching += 1
                    min_ts = ts if min_ts is None else min(min_ts, ts)
                    max_ts = ts if max_ts is None else max(max_ts, ts)

                    if merged is not None:
                        merged.merge(sketch)
                    else:
                        # DDSketch.merge() is an inplace method. We want
                        # to avoid modifying the stored sketches so we
                        # make a copy.
                        merged = copy.deepcopy(sketch)

                if debug_enabled:
                    logger.debug(
                        "VDBE %d: merged %d latency sketches (min_ts: %s, max_ts: %s)",
                        vdbe_id,
                        num_matching,
                        min_ts,
                        max_ts,
                    )

                if merged is None:
                    logger.warning("Missing latency sketch values for VDBE %d", vdbe_id)
                    p90_val = 0.0
                else:
                    # The quantile lookup may return `None`; fall back to
                    # 0.0 in that case.
                    p90_val_cand = merged.get_quantile_value(0.9)
                    p90_val = p90_val_cand if p90_val_cand is not None else 0.0

                data_cols[str(vdbe_id)].append(p90_val)

            timestamps.append(window_end)

        new_metrics = pd.DataFrame(data_cols, index=timestamps)
        self._values_df = self._get_updated_metrics(new_metrics)
        await super().fetch_latest()

    def _metrics_values(self) -> pd.DataFrame:
        """Returns the data frame most recently stored by `fetch_latest()`."""
        return self._values_df

    def _metrics_logger(self) -> Optional[MetricsLogger]:
        """Returns the metrics logger created in the constructor."""
        return self._logger

    def handle_metric_report(self, report: VdbeMetricsReport) -> None:
        """
        Stores the latency sketches carried by `report`, timestamped with the
        time of receipt. VDBE IDs seen for the first time are registered so
        that they get their own column in subsequent `fetch_latest()` calls.
        """
        now = universal_now()

        for vdbe_id, sketch in report.query_latency_sketches():
            if vdbe_id not in self._sketch_front_end_metrics:
                self._sketch_front_end_metrics[vdbe_id] = StreamingMetric(
                    self._sm_window_size
                )
                self._ordered_metrics.append(vdbe_id)
            self._sketch_front_end_metrics[vdbe_id].add_sample(sketch, now)

        log_verbose(
            logger,
            "Received VDBE metrics report: (ts: %s)",
            now,
        )
30 changes: 30 additions & 0 deletions src/brad/ui/manager_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,46 @@ def get_metrics(num_values: int = 3, use_generated: bool = False) -> MetricsData
tlat = metrics[FrontEndMetric.TxnLatencySecondP90.value]
tlat_tm = TimestampedMetrics(timestamps=list(tlat.index), values=list(tlat))

vdbe_metrics = manager.monitor.vdbe_metrics()
assert vdbe_metrics is not None
vdbe_metrics_values = vdbe_metrics.read_k_most_recent(k=num_values)
vdbes = list(vdbe_metrics_values.columns)
vdbe_latency_dict = {}
for vdbe_id in vdbes:
vdbe_tm = TimestampedMetrics(
timestamps=list(vdbe_metrics_values.index),
values=list(vdbe_metrics_values[vdbe_id]),
)
vdbe_latency_dict[f"vdbe:{vdbe_id}"] = vdbe_tm

if use_generated:
qlat_gen = np.random.normal(loc=15.0, scale=5.0, size=len(qlat))
tlat_gen = np.random.normal(loc=0.015, scale=0.005, size=len(tlat))
qlat_tm.values = list(qlat_gen)
tlat_tm.values = list(tlat_gen)

# When VDBEs are newly created, we have no historical metric data. We fill
# in zeros so that the dashboard can display something other than an empty
# graph. We use the qlat/tlat metrics to determine the timestamps as our
# monitor fills them in with zeros when missing data.
known_vdbes = manager.vdbe_mgr.engines()
if len(vdbe_latency_dict) != len(known_vdbes):
timestamps = list(qlat.index)
zeros = [0.0] * len(timestamps)
for vdbe in known_vdbes:
metric_key = f"vdbe:{vdbe.internal_id}"
if metric_key in vdbe_latency_dict:
continue
vdbe_latency_dict[metric_key] = TimestampedMetrics(
timestamps=timestamps,
values=zeros,
)

return MetricsData(
named_metrics={
FrontEndMetric.QueryLatencySecondP90.value: qlat_tm,
FrontEndMetric.TxnLatencySecondP90.value: tlat_tm,
**vdbe_latency_dict,
}
)

Expand Down
51 changes: 38 additions & 13 deletions ui/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { useCallback, useState, useEffect } from "react";
import Header from "./components/Header";
import PerfView from "./components/PerfView";
import OverallInfraView from "./components/OverallInfraView";
import SystemConfig from "./components/SystemConfig";
import { fetchSystemState } from "./api";

import "./App.css";
Expand All @@ -26,35 +27,45 @@ function App() {
shownVdbe: null,
},
});
const [config, setConfig] = useState({
showVdbeSpecificMetrics: true,
});
const [showConfigModal, setShowConfigModal] = useState(false);

const refreshData = useCallback(async () => {
// Used for system state refresh.
const refreshSystemState = useCallback(async () => {
const newSystemState = await fetchSystemState();
// Not the best way to check for equality.
if (JSON.stringify(systemState) !== JSON.stringify(newSystemState)) {
setSystemState(newSystemState);
}
}, [systemState, setSystemState]);

// Fetch updated system state periodically.
// Periodically refresh system state.
useEffect(() => {
refreshData();
const intervalId = setInterval(refreshData, REFRESH_INTERVAL_MS);
refreshSystemState();
const intervalId = setInterval(refreshSystemState, REFRESH_INTERVAL_MS);
return () => {
if (intervalId === null) {
return;
}
clearInterval(intervalId);
};
}, [refreshData]);
}, [refreshSystemState]);

// Bind keyboard shortcut for internal config menu.
const handleKeyPress = useCallback((event) => {
if (document.activeElement !== document.body) {
// We only want to handle key presses when no input is focused.
return;
}
// Currently a no-op.
}, []);
const handleKeyPress = useCallback(
(event) => {
if (document.activeElement !== document.body) {
// We only want to handle key presses when no input is focused.
return;
}
if (event.key === "c") {
setShowConfigModal(true);
}
},
[setShowConfigModal],
);

useEffect(() => {
document.addEventListener("keyup", handleKeyPress);
Expand Down Expand Up @@ -104,6 +115,13 @@ function App() {
setAppState({ ...appState, vdbeForm: { open: false, shownVdbe: null } });
};

const changeConfig = useCallback(
(changes) => {
setConfig({ ...config, ...changes });
},
[config, setConfig],
);

return (
<>
<Header
Expand All @@ -119,13 +137,20 @@ function App() {
openVdbeForm={openVdbeForm}
closeVdbeForm={closeVdbeForm}
setPreviewBlueprint={setPreviewBlueprint}
refreshData={refreshData}
refreshData={refreshSystemState}
/>
<PerfView
virtualInfra={systemState.virtual_infra}
showingPreview={previewForm.shownPreviewBlueprint != null}
showVdbeSpecificMetrics={config.showVdbeSpecificMetrics}
/>
</div>
<SystemConfig
open={showConfigModal}
onCloseClick={() => setShowConfigModal(false)}
config={config}
onConfigChange={changeConfig}
/>
</>
);
}
Expand Down
Loading