Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
59 changes: 17 additions & 42 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _get_match(proto, filter_fn):


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STEP_LABEL = 'step'
STRUCTURED_NAME_LABELS = set(
['execution_step', 'original_name', 'output_user_name'])

Expand Down Expand Up @@ -113,7 +112,7 @@ def _translate_step_name(self, internal_name):
step = _get_match(
self._job_graph.proto.steps, lambda x: x.name == internal_name)
user_step_name = _get_match(
step.properties.additionalProperties,
step.properties.properties,
lambda x: x.key == 'user_name').value.string_value
except ValueError:
pass # Exception is handled below.
Expand All @@ -135,24 +134,22 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
step = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == STEP_LABEL).value
step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
carried_namespace = metric.name.context['namespace']
if carried_namespace:
namespace = carried_namespace
except ValueError:
pass

for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
for key in metric.name.context:
if key in STRUCTURED_NAME_LABELS:
labels[key] = metric.name.context[key]
# Package everything besides namespace and name the labels as well,
# including unmodified step names to assist in integration the exact
# unmodified values which come from dataflow.
Expand Down Expand Up @@ -185,10 +182,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [
prop for prop in metric.name.context.additionalProperties
if prop.key == 'tentative' and prop.value == 'true'
]
is_tentative = metric.name.context['tentative']
tentative_or_committed = 'tentative' if is_tentative else 'committed'

metric_key = self._get_metric_key(metric)
Expand All @@ -208,33 +202,14 @@ def _get_metric_value(self, metric):
if metric is None:
return None

if metric.scalar is not None:
return metric.scalar.integer_value
elif metric.distribution is not None:
dist_count = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'count').value.integer_value
dist_min = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_sum = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
if dist_sum is None:
# distribution metric is not meant to use on large values, but in case
# it is, the value can overflow and become double_value, the correctness
# of the value may not be guaranteed.
_LOGGER.info(
"Distribution metric sum value seems to have "
"overflowed integer_value range, the correctness of sum or mean "
"value may not be guaranteed: %s" % metric.distribution)
dist_sum = int(
_get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.double_value)
if metric.HasField('scalar'):
# This will always be a single value if there is any data in the field.
return metric.scalar.number_value
elif metric.HasField('distribution'):
dist_count = metric.distribution.struct_value.fields['count'].number_value
dist_min = metric.distribution.struct_value.fields['min'].number_value
dist_max = metric.distribution.struct_value.fields['max'].number_value
dist_sum = metric.distribution.struct_value.fields['sum'].number_value
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
#TODO(https://github.com/apache/beam/issues/31788) support StringSet after
Expand Down
Loading
Loading