From 759652a5c43ed286c4903214b9b3850a7b298bb8 Mon Sep 17 00:00:00 2001
From: Vladimir Smirnov
Date: Fri, 29 Aug 2014 15:15:41 +0200
Subject: [PATCH] Add ability to fetch all data with a single query.

- Backport fetch_multi from graphite-api (implemented there by brutasse)
- Fix its behaviour so that a request with multiple targets goes through
  fetch_multi whenever the finder supports it.
---
 webapp/graphite/render/datalib.py   | 102 +++++++++++++++++++++++++++-
 webapp/graphite/render/evaluator.py |  87 +++++++++++++++++++++++-
 webapp/graphite/render/functions.py |  52 ++++++++------
 webapp/graphite/render/views.py     |  23 ++++---
 4 files changed, 233 insertions(+), 31 deletions(-)
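
Notes (kept out of the commit message):

fetchDataMulti relies on the finder contract that graphite-api introduced:
a finder that can serve many paths in one backend round-trip tags itself
and its nodes with a __fetch_multi__ string and exposes
fetch_multi(nodes, start_time, end_time) returning (time_info, series),
where time_info is (start, end, step) and series maps each metric path to
its list of values. Below is a minimal sketch of such a finder; the
ExampleFinder/ExampleNode names and the fixed 60-second step are
illustrative assumptions, not part of this patch:

    # Hypothetical finder; only __fetch_multi__ and the fetch_multi()
    # signature/return shape are what datalib.py actually relies on.
    class ExampleNode(object):
        is_leaf = True
        __fetch_multi__ = 'example'  # groups this node with its finder

        def __init__(self, path):
            self.path = path

    class ExampleFinder(object):
        __fetch_multi__ = 'example'

        def find_nodes(self, query):
            yield ExampleNode(query.pattern)

        def fetch_multi(self, nodes, start_time, end_time):
            # One backend query answers every requested path at once.
            step = 60
            points = (end_time - start_time) // step
            series = dict((node.path, [None] * points) for node in nodes)
            return (start_time, end_time, step), series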
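On the webapp side, evaluateTargets() batches all targets of a render
request: findPaths() collects every path expression from the parsed
targets, fetchDataMulti() fetches them in one pass, and each token tree
is then evaluated against the prefetched series. A usage sketch follows;
the metric names and the stripped-down requestContext are made up for
illustration (renderView() builds a richer context from the request):

    from datetime import datetime, timedelta
    from graphite.render.evaluator import evaluateTargets

    requestContext = {
        'startTime': datetime.now() - timedelta(hours=1),
        'endTime': datetime.now(),
        'localOnly': False,
        'data': [],
    }

    # Both path expressions reach the storage layer in a single
    # fetch_multi call when the matched finder supports it.
    seriesList = evaluateTargets(
        requestContext, ['web.*.cpu.user', 'sumSeries(web.*.cpu.system)'])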
diff --git a/webapp/graphite/render/datalib.py b/webapp/graphite/render/datalib.py
index 590bdde23..ec60aaff7 100644
--- a/webapp/graphite/render/datalib.py
+++ b/webapp/graphite/render/datalib.py
@@ -18,6 +18,7 @@
 from graphite.storage import STORE
 from graphite.readers import FetchInProgress
 from django.conf import settings
+from collections import defaultdict
 
 class TimeSeries(list):
   def __init__(self, name, start, end, step, values, consolidate='average'):
@@ -89,6 +90,80 @@ def getInfo(self):
 
 # Data retrieval API
 
+def fetchDataMulti(requestContext, paths):
+
+  seriesList = []
+  startTime = int( time.mktime( requestContext['startTime'].timetuple() ) )
+  endTime = int( time.mktime( requestContext['endTime'].timetuple() ) )
+
+  def _fetchDataMulti(pathExpr, startTime, endTime, requestContext, seriesList):
+    multi_nodes = defaultdict(list)
+    single_nodes = []
+    for path in pathExpr:
+      matching_nodes = STORE.find(path, startTime, endTime)
+      for node in matching_nodes:
+        if not node.is_leaf:
+          continue
+        if hasattr(node, '__fetch_multi__'):
+          multi_nodes[node.__fetch_multi__].append(node)
+        else:
+          single_nodes.append(node)
+
+    fetches = [
+      (node, node.fetch(startTime, endTime)) for node in single_nodes]
+
+    for finder in STORE.finders:
+      if not hasattr(finder, '__fetch_multi__'):
+        continue
+      nodes = multi_nodes[finder.__fetch_multi__]
+      if not nodes:
+        continue
+      time_info, series_by_path = finder.fetch_multi(nodes, startTime, endTime)
+      start, end, step = time_info
+      for path, values in series_by_path.items():
+        series = TimeSeries(path, start, end, step, values)
+        series.pathExpression = pathExpr
+        seriesList.append(series)
+
+    for node, results in fetches:
+      if not results:
+        log.info("render: no results for %s (%s, %s)" % (node.path, startTime, endTime))
+        continue
+
+      try:
+        timeInfo, values = results
+      except ValueError as e:
+        raise Exception("could not parse timeInfo/values from metric "
+                        "'%s': %s" % (node.path, e))
+      start, end, step = timeInfo
+
+      series = TimeSeries(node.path, start, end, step, values)
+      # hack to pass expressions through to render functions
+      series.pathExpression = pathExpr
+      seriesList.append(series)
+
+    # Prune empty series with duplicate metric paths to avoid showing
+    # empty graph elements for old whisper data
+    names = set([s.name for s in seriesList])
+    for name in names:
+      series_with_duplicate_names = [
+        s for s in seriesList if s.name == name]
+      empty_duplicates = [
+        s for s in series_with_duplicate_names if not nonempty(s)]
+
+      if (
+        series_with_duplicate_names == empty_duplicates and
+        len(empty_duplicates) > 0
+      ): # if they're all empty
+        empty_duplicates.pop() # make sure we leave one in seriesList
+
+      for series in empty_duplicates:
+        seriesList.remove(series)
+
+    return seriesList
+
+  return _fetchDataMulti(paths, startTime, endTime, requestContext, seriesList)
+
 def fetchData(requestContext, pathExpr):
 
   seriesList = []
@@ -96,7 +171,32 @@ def fetchData(requestContext, pathExpr):
   endTime = int( time.mktime( requestContext['endTime'].timetuple() ) )
 
   def _fetchData(pathExpr,startTime, endTime, requestContext, seriesList):
     matching_nodes = STORE.find(pathExpr, startTime, endTime, local=requestContext['localOnly'])
-    fetches = [(node, node.fetch(startTime, endTime)) for node in matching_nodes if node.is_leaf]
+
+    # Group nodes that support multiple fetches
+    multi_nodes = defaultdict(list)
+    single_nodes = []
+    for node in matching_nodes:
+      if not node.is_leaf:
+        continue
+      if hasattr(node, '__fetch_multi__'):
+        multi_nodes[node.__fetch_multi__].append(node)
+      else:
+        single_nodes.append(node)
+
+    fetches = [(node, node.fetch(startTime, endTime)) for node in single_nodes]
+
+    for finder in STORE.finders:
+      if not hasattr(finder, '__fetch_multi__'):
+        continue
+      nodes = multi_nodes[finder.__fetch_multi__]
+      if not nodes:
+        continue
+      time_info, series_by_path = finder.fetch_multi(nodes, startTime, endTime)
+      start, end, step = time_info
+      for path, values in series_by_path.items():
+        series = TimeSeries(path, start, end, step, values)
+        series.pathExpression = pathExpr
+        seriesList.append(series)
 
     for node, results in fetches:
       if isinstance(results, FetchInProgress):
diff --git a/webapp/graphite/render/evaluator.py b/webapp/graphite/render/evaluator.py
index eabea5cff..981d91714 100644
--- a/webapp/graphite/render/evaluator.py
+++ b/webapp/graphite/render/evaluator.py
@@ -3,7 +3,92 @@
 from django.conf import settings
 from graphite.render.grammar import grammar
 from graphite.logger import log
-from graphite.render.datalib import fetchData, TimeSeries
+from graphite.render.datalib import fetchData, fetchDataMulti, TimeSeries
+
+
+def evaluateTargets(requestContext, targets):
+  tokens = []
+  for target in targets:
+    tokens.append(grammar.parseString(target))
+
+  result = evaluateTokensMulti(requestContext, tokens)
+
+  if isinstance(result, TimeSeries):
+    return [result] # we have to return a list of TimeSeries objects
+
+  return result
+
+
+def findPaths(requestContext, tokens, paths):
+  if tokens.expression:
+    findPaths(requestContext, tokens.expression, paths)
+  elif tokens.call:
+    for arg in tokens.call.args:
+      findPaths(requestContext, arg, paths)
+    for kwarg in tokens.call.kwargs:
+      findPaths(requestContext, kwarg.args[0], paths)
+  elif tokens.pathExpression:
+    paths.add(tokens.pathExpression)
+
+
+def evaluateTokensMulti(requestContext, tokensList):
+  fetch = set()
+
+  for tokens in tokensList:
+    findPaths(requestContext, tokens, fetch)
+
+  timeSeriesList = fetchDataMulti(requestContext, list(fetch))
+
+  series = []
+  for tokens in tokensList:
+    evaluated = evaluateTokensWithTimeSeries(requestContext, tokens, timeSeriesList)
+    if isinstance(evaluated, TimeSeries):
+      series.append(evaluated)
+    else:
+      series.extend(evaluated)
+
+  return series
+
+
+def evaluateTokensWithTimeSeries(requestContext, tokens, timeSeriesList):
+  if tokens.expression:
+    return evaluateTokensWithTimeSeries(requestContext, tokens.expression, timeSeriesList)
+
+  elif tokens.pathExpression:
+    for ts in timeSeriesList:
+      if ts.name == tokens.pathExpression:
+        return [ts]
+    # not prefetched (e.g. a wildcard expression), fall back to a fetch
+    fetch = fetchDataMulti(requestContext, [tokens.pathExpression])
+    if isinstance(fetch, list):
+      return fetch
+    return [fetch]
+
+  elif tokens.call:
+    func = SeriesFunctions[tokens.call.func]
+    args = [evaluateTokensWithTimeSeries(requestContext, arg, timeSeriesList)
+            for arg in tokens.call.args]
+    kwargs = dict([(kwarg.argname,
+                    evaluateTokensWithTimeSeries(requestContext, kwarg.args[0], timeSeriesList))
+                   for kwarg in tokens.call.kwargs])
+    return func(requestContext, *args, **kwargs)
+
+  elif tokens.number:
+    if tokens.number.integer:
+      return int(tokens.number.integer)
+    elif tokens.number.float:
+      return float(tokens.number.float)
+    elif tokens.number.scientific:
+      return float(tokens.number.scientific[0])
+
+  elif tokens.string:
+    return tokens.string[1:-1]
+
+  elif tokens.boolean:
+    return tokens.boolean[0] == 'true'
 
 def evaluateTarget(requestContext, target):
   tokens = grammar.parseString(target)
diff --git a/webapp/graphite/render/functions.py b/webapp/graphite/render/functions.py
index 2b57e4f41..6a733e490 100644
--- a/webapp/graphite/render/functions.py
+++ b/webapp/graphite/render/functions.py
@@ -137,14 +137,22 @@ def normalize(seriesLists):
 
 def formatPathExpressions(seriesList):
   # remove duplicates
-  pathExpressions = []
-  [pathExpressions.append(s.pathExpression) for s in seriesList if not pathExpressions.count(s.pathExpression)]
-  return ','.join(pathExpressions)
+  pathExpressions = set()
+  for s in seriesList:
+    if isinstance(s.pathExpression, list):
+      pathExpressions.add(s.pathExpression[0])
+    else:
+      pathExpressions.add(s.pathExpression)
+  return ','.join(sorted(pathExpressions))
 
 def formatPathExpressions_diff_special(seriesList):
-  # remove duplicates
   pathExpressions = []
-  [pathExpressions.append(s.name) for s in seriesList]
+  for s in seriesList:
+    if isinstance(s.pathExpression, list):
+      pathExpressions.append(s.pathExpression[0])
+    else:
+      pathExpressions.append(s.pathExpression)
   return ','.join(pathExpressions)
 
@@ -1888,7 +1896,7 @@ def useSeriesAbove(requestContext, seriesList, value, search, replace):
   for series in seriesList:
     newname = re.sub(search, replace, series.name)
     if max(series) > value:
-      n = evaluateTarget(requestContext, newname)
+      n = evaluateTargets(requestContext, [newname])
 
       if n is not None and len(n) > 0:
         newSeries.append(n[0])
@@ -2005,12 +2013,14 @@ def _fetchWithBootstrap(requestContext, seriesList, **delta_kwargs):
   bootstrapContext['endTime'] = requestContext['startTime']
 
   bootstrapList = []
+  bootstrapPathExpressions = []
 
   for series in seriesList:
-    if series.pathExpression in [ b.pathExpression for b in bootstrapList ]:
+    if series.pathExpression in bootstrapPathExpressions:
       # This pathExpression returns multiple series and we already fetched it
       continue
-    bootstraps = evaluateTarget(bootstrapContext, series.pathExpression)
-    bootstrapList.extend(bootstraps)
+    bootstrapPathExpressions.append(series.pathExpression)
+
+  bootstrapList = evaluateTargets(bootstrapContext, bootstrapPathExpressions)
 
   newSeriesList = []
   for bootstrap, original in zip(bootstrapList, seriesList):
@@ -2336,7 +2346,7 @@ def timeStack(requestContext, seriesList, timeShiftUnit, timeShiftStart, timeShi
     innerDelta = delta * shft
     myContext['startTime'] = requestContext['startTime'] + innerDelta
     myContext['endTime'] = requestContext['endTime'] + innerDelta
-    for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
+    for shiftedSeries in evaluateTargets(myContext, [series.pathExpression]):
       shiftedSeries.name = 'timeShift(%s, %s, %s)' % (shiftedSeries.name, timeShiftUnit,shft)
       shiftedSeries.pathExpression = shiftedSeries.name
      shiftedSeries.start = series.start
@@ -2383,7 +2393,7 @@ def timeShift(requestContext, seriesList, timeShift, resetEnd=True):
   if len(seriesList) > 0:
     series = seriesList[0] # if len(seriesList) > 1, they will all have the same pathExpression, which is all we care about.
 
-    for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
+    for shiftedSeries in evaluateTargets(myContext, [series.pathExpression]):
       shiftedSeries.name = 'timeShift(%s, "%s")' % (shiftedSeries.name, timeShift)
       if resetEnd:
         shiftedSeries.end = series.end
@@ -2727,14 +2737,9 @@ def smartSummarize(requestContext, seriesList, intervalString, func='sum', align
     elif interval >= MINUTE:
       requestContext['startTime'] = datetime(s.year, s.month, s.day, s.hour, s.minute)
 
-  for i,series in enumerate(seriesList):
-    # XXX: breaks with summarize(metric.{a,b})
-    #      each series.pathExpression == metric.{a,b}
-    newSeries = evaluateTarget(requestContext, series.pathExpression)[0]
-    series[0:len(series)] = newSeries
-    series.start = newSeries.start
-    series.end = newSeries.end
-    series.step = newSeries.step
+  # XXX: breaks with summarize(metric.{a,b})
+  pathExpressions = [series.pathExpression for series in seriesList]
+  seriesList = evaluateTargets(requestContext, pathExpressions)
 
   for series in seriesList:
     buckets = {} # { timestamp: [values] }
@@ -2900,8 +2905,15 @@ def hitcount(requestContext, seriesList, intervalString, alignToInterval = False
     elif interval >= MINUTE:
       requestContext['startTime'] = datetime(s.year, s.month, s.day, s.hour, s.minute)
 
-  for i,series in enumerate(seriesList):
-    newSeries = evaluateTarget(requestContext, series.pathExpression)[0]
+  pathExpressionList = [series.pathExpression for series in seriesList]
+  newSeriesList = evaluateTargets(requestContext, pathExpressionList)
+  newSeries = None
+  for series in seriesList:
+    # match each original series with its freshly fetched counterpart
+    for j in range(len(newSeriesList)):
+      if newSeriesList[j].pathExpression[0] == series.pathExpression:
+        newSeries = newSeriesList[j]
+        del newSeriesList[j]
+        break
     intervalCount = int((series.end - series.start) / interval)
     series[0:len(series)] = newSeries
     series.start = newSeries.start
@@ -3298,4 +3310,4 @@ def pieMinimum(requestContext, series):
 
 #Avoid import circularity
 if not environ.get('READTHEDOCS'):
-  from graphite.render.evaluator import evaluateTarget
+  from graphite.render.evaluator import evaluateTarget, evaluateTargets
diff --git a/webapp/graphite/render/views.py b/webapp/graphite/render/views.py
index c25a3c3de..fe08f5c3d 100644
--- a/webapp/graphite/render/views.py
+++ b/webapp/graphite/render/views.py
@@ -37,7 +37,7 @@
 from graphite.compat import HttpResponse
 from graphite.remote_storage import HTTPConnectionWithTimeout
 from graphite.logger import log
-from graphite.render.evaluator import evaluateTarget
+from graphite.render.evaluator import evaluateTarget, evaluateTargets
 from graphite.render.attime import parseATTime
 from graphite.render.functions import PieFunctions
 from graphite.render.hashing import hashRequest, hashData
@@ -83,6 +83,7 @@ def renderView(request):
       log.cache('Request-Cache miss [%s]' % requestKey)
 
   # Now we prepare the requested data
+  targetsList = []
   if requestOptions['graphType'] == 'pie':
     for target in requestOptions['targets']:
       if target.find(':') >= 0:
@@ -93,11 +94,13 @@ def renderView(request):
           raise ValueError("Invalid target '%s'" % target)
         data.append( (name,value) )
       else:
-        seriesList = evaluateTarget(requestContext, target)
+        targetsList.append(target)
 
-        for series in seriesList:
-          func = PieFunctions[requestOptions['pieMode']]
-          data.append( (series.name, func(requestContext, series) or 0 ))
+    seriesList = evaluateTargets(requestContext, targetsList)
+
+    for series in seriesList:
+      func = PieFunctions[requestOptions['pieMode']]
+      data.append( (series.name, func(requestContext, series) or 0 ))
 
   elif requestOptions['graphType'] == 'line':
     # Let's see if at least our data is cached
@@ -117,13 +120,15 @@ def renderView(request):
     if cachedData is not None:
       requestContext['data'] = data = cachedData
     else: # Have to actually retrieve the data now
+      t = time()
       for target in requestOptions['targets']:
         if not target.strip():
           continue
-        t = time()
-        seriesList = evaluateTarget(requestContext, target)
-        log.rendering("Retrieval of %s took %.6f" % (target, time() - t))
-        data.extend(seriesList)
+        targetsList.append(target)
+
+      seriesList = evaluateTargets(requestContext, targetsList)
+      log.rendering("Retrieval of '%s' took %.6f" % (targetsList, time() - t))
+      data.extend(seriesList)
 
       if useCache:
         cache.add(dataKey, data, cacheTimeout)