102 changes: 101 additions & 1 deletion webapp/graphite/render/datalib.py
@@ -18,6 +18,7 @@
from graphite.storage import STORE
from graphite.readers import FetchInProgress
from django.conf import settings
from collections import defaultdict

class TimeSeries(list):
def __init__(self, name, start, end, step, values, consolidate='average'):
@@ -89,14 +90,113 @@ def getInfo(self):


# Data retrieval API
def fetchDataMulti(requestContext, paths):

seriesList = []
startTime = int( time.mktime( requestContext['startTime'].timetuple() ) )
endTime = int( time.mktime( requestContext['endTime'].timetuple() ) )

def _fetchDataMulti(pathExpr, startTime, endTime, requestContext, seriesList):
multi_nodes = defaultdict(list)
single_nodes = []
for path in pathExpr:
matching_nodes = STORE.find(path, startTime, endTime, local=requestContext['localOnly'])
for node in matching_nodes:
if hasattr(node, '__fetch_multi__'):
multi_nodes[node.__fetch_multi__].append(node)
else:
single_nodes.append(node)

fetches = [
(node, node.fetch(startTime, endTime)) for node in single_nodes]

for finder in STORE.finders:
if not hasattr(finder, '__fetch_multi__'):
continue
nodes = multi_nodes[finder.__fetch_multi__]
if not nodes:
continue
time_info, series = finder.fetch_multi(nodes, startTime, endTime)
start, end, step = time_info
for path, values in series.items():
  ts = TimeSeries(path, start, end, step, values)
  ts.pathExpression = pathExpr
  seriesList.append(ts)

for node, results in fetches:
if not results:
log.info("no results for node %s (start=%s, end=%s)" % (node, startTime, endTime))
continue

try:
timeInfo, values = results
except ValueError as e:
raise Exception("could not parse timeInfo/values from metric "
"'%s': %s" % (node.path, e))
start, end, step = timeInfo

series = TimeSeries(node.path, start, end, step, values)
# hack to pass expressions through to render functions
series.pathExpression = pathExpr
seriesList.append(series)

# Prune empty series with duplicate metric paths to avoid showing
# empty graph elements for old whisper data
names = set([s.name for s in seriesList])
for name in names:
series_with_duplicate_names = [
s for s in seriesList if s.name == name]
empty_duplicates = [
  s for s in series_with_duplicate_names
  if not nonempty(s)]

if (
series_with_duplicate_names == empty_duplicates and
len(empty_duplicates) > 0
): # if they're all empty
empty_duplicates.pop() # make sure we leave one in seriesList

for series in empty_duplicates:
seriesList.remove(series)

return seriesList

return _fetchDataMulti(paths, startTime, endTime, requestContext, seriesList)

def fetchData(requestContext, pathExpr):

seriesList = []
startTime = int( time.mktime( requestContext['startTime'].timetuple() ) )
endTime = int( time.mktime( requestContext['endTime'].timetuple() ) )
def _fetchData(pathExpr,startTime, endTime, requestContext, seriesList):
matching_nodes = STORE.find(pathExpr, startTime, endTime, local=requestContext['localOnly'])

# Group nodes that support multiple fetches
multi_nodes = defaultdict(list)
single_nodes = []
for node in matching_nodes:
if not node.is_leaf:
continue
if hasattr(node, '__fetch_multi__'):
multi_nodes[node.__fetch_multi__].append(node)
else:
single_nodes.append(node)

fetches = [(node, node.fetch(startTime, endTime)) for node in single_nodes]

for finder in STORE.finders:
if not hasattr(finder, '__fetch_multi__'):
continue
nodes = multi_nodes[finder.__fetch_multi__]
if not nodes:
continue
time_info, series = finder.fetch_multi(nodes, startTime, endTime)
start, end, step = time_info
for path, values in series.items():
  ts = TimeSeries(path, start, end, step, values)
  ts.pathExpression = pathExpr
  seriesList.append(ts)

for node, results in fetches:
if isinstance(results, FetchInProgress):
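For reference, fetchDataMulti assumes that a finder able to batch its reads advertises a __fetch_multi__ key on both itself and the nodes it yields, and implements fetch_multi(nodes, startTime, endTime) returning a (start, end, step) tuple plus a dict mapping each node's path to its values. A minimal sketch of such a finder; the class name and stub data are hypothetical:

class ExampleBatchFinder(object):
  # Grouping key: _fetchDataMulti buckets nodes by this attribute and hands
  # each bucket back to the finder that declared the same key. The nodes
  # this finder yields must expose the same __fetch_multi__ value.
  __fetch_multi__ = 'example'

  def fetch_multi(self, nodes, startTime, endTime):
    # One backend round-trip for the whole group of nodes (stubbed here).
    step = 60
    points = (endTime - startTime) // step
    values = dict((node.path, [None] * points) for node in nodes)
    return (startTime, endTime, step), values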
87 changes: 86 additions & 1 deletion webapp/graphite/render/evaluator.py
@@ -3,7 +3,92 @@
from django.conf import settings
from graphite.render.grammar import grammar
from graphite.logger import log
from graphite.render.datalib import fetchData, TimeSeries
from graphite.render.datalib import fetchData, fetchDataMulti, TimeSeries


def evaluateTargets(requestContext, targets):
tokens = []
for target in targets:
tokens.append(grammar.parseString(target))

result = evaluateTokensMulti(requestContext, tokens)

if isinstance(result, TimeSeries):
return [result] # we have to return a list of TimeSeries objects

return result


def findPaths(requestContext, tokens, paths):
if tokens.expression:
findPaths(requestContext, tokens.expression, paths)
elif tokens.call:
for arg in tokens.call.args:
findPaths(requestContext, arg, paths)
for kwarg in tokens.call.kwargs:
findPaths(requestContext, kwarg.args[0], paths)
elif tokens.pathExpression:
paths.add(tokens.pathExpression)


def evaluateTokensMulti(requestContext, tokensList):
fetch = set()

for tokens in tokensList:
findPaths(requestContext, tokens, fetch)

timeSeriesList = fetchDataMulti(requestContext, list(fetch))
series = []
for tokens in tokensList:
evaluated = evaluateTokensWithTimeSeries(requestContext, tokens, timeSeriesList)
if isinstance(evaluated, TimeSeries):
  series.append(evaluated)
else:
  series.extend(evaluated)

return series


def evaluateTokensWithTimeSeries(requestContext, tokens, timeSeriesList):
if tokens.expression:
return evaluateTokensWithTimeSeries(requestContext, tokens.expression, timeSeriesList)

elif tokens.pathExpression:
for ts in timeSeriesList:
if ts.name == tokens.pathExpression:
return [ts]
fetch = fetchDataMulti(requestContext, [tokens.pathExpression])
if isinstance(fetch, list):
return fetch
return [fetch]

elif tokens.call:
func = SeriesFunctions[tokens.call.func]
args = [evaluateTokensWithTimeSeries(requestContext,
arg,
timeSeriesList) for arg in tokens.call.args]
kwargs = dict([(kwarg.argname,
evaluateTokensWithTimeSeries(requestContext,
kwarg.args[0],
timeSeriesList))
for kwarg in tokens.call.kwargs])
return func(requestContext, *args, **kwargs)

elif tokens.number:
if tokens.number.integer:
return int(tokens.number.integer)
elif tokens.number.float:
return float(tokens.number.float)
elif tokens.number.scientific:
return float(tokens.number.scientific[0])

elif tokens.string:
return tokens.string[1:-1]

elif tokens.boolean:
return tokens.boolean[0] == 'true'

def evaluateTarget(requestContext, target):
tokens = grammar.parseString(target)
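The effect of evaluateTargets / evaluateTokensMulti is that path expressions from every target are collected up front by findPaths and fetched in a single fetchDataMulti call, instead of one fetch per target. A usage sketch; the target strings are made up:

# requestContext carries startTime / endTime / localOnly, as elsewhere in render
seriesList = evaluateTargets(requestContext, [
  'sumSeries(web.*.requests)',
  'web.*.errors',
])
# findPaths gathers {'web.*.requests', 'web.*.errors'} into one set,
# fetchDataMulti fetches both in one pass, and each parsed token tree is
# then evaluated against the shared, pre-fetched series list.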
52 changes: 32 additions & 20 deletions webapp/graphite/render/functions.py
@@ -137,14 +137,22 @@ def normalize(seriesLists):

def formatPathExpressions(seriesList):
# remove duplicates
pathExpressions = []
[pathExpressions.append(s.pathExpression) for s in seriesList if not pathExpressions.count(s.pathExpression)]
return ','.join(pathExpressions)
pathExpressions = set()
for s in seriesList:
if isinstance(s.pathExpression, list):
pathExpressions.add(s.pathExpression[0])
else:
pathExpressions.add(s.pathExpression)
return ','.join(sorted(pathExpressions))

def formatPathExpressions_diff_special(seriesList):
# join every series' expression in order (duplicates are kept here)
pathExpressions = []
for s in seriesList:
if isinstance(s.pathExpression, list):
pathExpressions.append(s.pathExpression[0])
else:
pathExpressions.append(s.pathExpression)
return ','.join(pathExpressions)
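Both formatters now have to cope with pathExpression sometimes being a list: _fetchDataMulti stamps every multi-fetched series with the full list of requested paths, and the formatters take the first element in that case. A small illustration with made-up metric names:

s = TimeSeries('web.host1.requests', 0, 60, 60, [1.0])
s.pathExpression = ['web.*.requests', 'web.*.errors']  # as set by _fetchDataMulti
formatPathExpressions([s])  # -> 'web.*.requests'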


@@ -1888,7 +1896,7 @@ def useSeriesAbove(requestContext, seriesList, value, search, replace):
for series in seriesList:
newname = re.sub(search, replace, series.name)
if max(series) > value:
n = evaluateTarget(requestContext, newname)
n = evaluateTargets(requestContext, [newname])
if n is not None and len(n) > 0:
newSeries.append(n[0])

@@ -2005,12 +2013,14 @@ def _fetchWithBootstrap(requestContext, seriesList, **delta_kwargs):
bootstrapContext['endTime'] = requestContext['startTime']

bootstrapList = []
bootstrapPathExpressions = []
for series in seriesList:
  if series.pathExpression in bootstrapPathExpressions:
    # This pathExpression returns multiple series and we already fetched it
    continue
  bootstrapPathExpressions.append(series.pathExpression)
  bootstraps = evaluateTargets(bootstrapContext, [series.pathExpression])
  bootstrapList.extend(bootstraps)

newSeriesList = []
for bootstrap, original in zip(bootstrapList, seriesList):
@@ -2336,7 +2346,7 @@ def timeStack(requestContext, seriesList, timeShiftUnit, timeShiftStart, timeShi
innerDelta = delta * shft
myContext['startTime'] = requestContext['startTime'] + innerDelta
myContext['endTime'] = requestContext['endTime'] + innerDelta
for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
for shiftedSeries in evaluateTargets(myContext, [series.pathExpression]):
shiftedSeries.name = 'timeShift(%s, %s, %s)' % (shiftedSeries.name, timeShiftUnit,shft)
shiftedSeries.pathExpression = shiftedSeries.name
shiftedSeries.start = series.start
@@ -2383,7 +2393,7 @@ def timeShift(requestContext, seriesList, timeShift, resetEnd=True):
if len(seriesList) > 0:
series = seriesList[0] # if len(seriesList) > 1, they will all have the same pathExpression, which is all we care about.

for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
for shiftedSeries in evaluateTargets(myContext, [series.pathExpression]):
shiftedSeries.name = 'timeShift(%s, "%s")' % (shiftedSeries.name, timeShift)
if resetEnd:
shiftedSeries.end = series.end
@@ -2727,14 +2737,9 @@ def smartSummarize(requestContext, seriesList, intervalString, func='sum', align
elif interval >= MINUTE:
requestContext['startTime'] = datetime(s.year, s.month, s.day, s.hour, s.minute)

# XXX: breaks with summarize(metric.{a,b})
# each series.pathExpression == metric.{a,b}
pathExpressions = [series.pathExpression for series in seriesList]
seriesList = evaluateTargets(requestContext, pathExpressions)

for series in seriesList:
buckets = {} # { timestamp: [values] }
@@ -2900,8 +2905,15 @@ def hitcount(requestContext, seriesList, intervalString, alignToInterval = False
elif interval >= MINUTE:
requestContext['startTime'] = datetime(s.year, s.month, s.day, s.hour, s.minute)

pathExpressionList = [series.pathExpression for series in seriesList]
newSeriesList = evaluateTargets(requestContext, pathExpressionList)
for series in seriesList:
  # find the re-evaluated series matching this one's path expression
  newSeries = None
  for j, candidate in enumerate(newSeriesList):
    if candidate.pathExpression[0] == series.pathExpression:
      newSeries = candidate
      del newSeriesList[j]
      break
  if newSeries is None:
    continue
intervalCount = int((series.end - series.start) / interval)
series[0:len(series)] = newSeries
series.start = newSeries.start
@@ -3298,4 +3310,4 @@ def pieMinimum(requestContext, series):

#Avoid import circularity
if not environ.get('READTHEDOCS'):
from graphite.render.evaluator import evaluateTarget
from graphite.render.evaluator import evaluateTarget, evaluateTargets
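The migration pattern for the callers above (useSeriesAbove, _fetchWithBootstrap, timeStack, timeShift, smartSummarize, hitcount) is mechanical: evaluateTargets takes a list of targets and returns a flat list of series, so a single expression is wrapped in a list:

# before
seriesList = evaluateTarget(requestContext, pathExpression)
# after: same result shape, but routed through the batched fetch path
seriesList = evaluateTargets(requestContext, [pathExpression])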
23 changes: 14 additions & 9 deletions webapp/graphite/render/views.py
@@ -37,7 +37,7 @@
from graphite.compat import HttpResponse
from graphite.remote_storage import HTTPConnectionWithTimeout
from graphite.logger import log
from graphite.render.evaluator import evaluateTarget
from graphite.render.evaluator import evaluateTarget, evaluateTargets
from graphite.render.attime import parseATTime
from graphite.render.functions import PieFunctions
from graphite.render.hashing import hashRequest, hashData
@@ -83,6 +83,7 @@ def renderView(request):
log.cache('Request-Cache miss [%s]' % requestKey)

# Now we prepare the requested data
targetsList = []
if requestOptions['graphType'] == 'pie':
for target in requestOptions['targets']:
if target.find(':') >= 0:
@@ -93,11 +94,13 @@
raise ValueError("Invalid target '%s'" % target)
data.append( (name,value) )
else:
targetsList.append(target)

seriesList = evaluateTargets(requestContext, targetsList)

for series in seriesList:
func = PieFunctions[requestOptions['pieMode']]
data.append( (series.name, func(requestContext, series) or 0 ))

elif requestOptions['graphType'] == 'line':
# Let's see if at least our data is cached
@@ -117,13 +120,15 @@
if cachedData is not None:
requestContext['data'] = data = cachedData
else: # Have to actually retrieve the data now
t = time()
for target in requestOptions['targets']:
if not target.strip():
continue
targetsList.append(target)

seriesList = evaluateTargets(requestContext, targetsList)
log.rendering("Retrieval of '%s' took %.6f" % (targetsList, time() - t))
data.extend(seriesList)

if useCache:
cache.add(dataKey, data, cacheTimeout)