What happened?
If you specify a side input to CombineGlobally, you end up with a pretty inscrutable error (from an end user's perspective) that comes from pipeline translation.
It looks like the combiner lifting phase incorrectly assumes that a combine transform will only ever have a single input PCollection.
Reproducing example:
https://play.beam.apache.org/?sdk=python&shared=vjM_k2TvNrf
def combineglobally_function(test=None):
  # [START combineglobally_function]
  import apache_beam as beam

  def get_common_items(sets, side):
    # set.intersection() takes multiple sets as separate arguments.
    # We unpack the `sets` list into multiple arguments with the * operator.
    # The combine transform might give us an empty list of `sets`,
    # so we use a list with an empty set as a default value.
    return set.intersection(*(sets or [set()]))

  with beam.Pipeline() as pipeline:
    pc = (pipeline | beam.Create([1]))
    common_items = (
        pipeline
        | 'Create produce' >> beam.Create([
            {'🍓', '🥕', '🍌', '🍅', '🌶️'},
            {'🍇', '🥕', '🥝', '🍅', '🥔'},
            {'🍉', '🥕', '🍆', '🍅', '🍍'},
            {'🥑', '🥕', '🌽', '🍅', '🥥'},
        ])
        | 'Get common items' >> beam.CombineGlobally(get_common_items, side=beam.pvalue.AsSingleton(pc))
        | beam.Map(print))
    # [END combineglobally_function]
    if test:
      test(common_items)

if __name__ == '__main__':
  combineglobally_function()
(Code included in case the playground link expires.)
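As a sanity check, the combining function can also be called directly in plain Python, with no Beam involved. This is only a minimal sketch: it assumes the side value is the singleton `1` from `pc`, and the names `produce`, `common`, and `empty` are illustrative and not part of the original report. If it behaves as expected, that is consistent with the failure coming from pipeline translation rather than from `get_common_items` itself.

```python
def get_common_items(sets, side):
    # `side` is accepted only to mirror the signature in the report;
    # it is unused here, just as in the reproducing example.
    sets = list(sets)
    # Fall back to a single empty set so intersection() always has an argument.
    return set.intersection(*(sets or [set()]))


# Same input sets as the 'Create produce' step in the report.
produce = [
    {'🍓', '🥕', '🍌', '🍅', '🌶️'},
    {'🍇', '🥕', '🥝', '🍅', '🥔'},
    {'🍉', '🥕', '🍆', '🍅', '🍍'},
    {'🥑', '🥕', '🌽', '🍅', '🥥'},
]

# Assumption: 1 stands in for the value that AsSingleton(pc) would supply.
common = get_common_items(produce, side=1)
empty = get_common_items([], side=1)

print(common)
print(empty)
```

Called this way, the function accepts the extra `side` argument without complaint, so the signature itself does not appear to be the problem.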
Issue Priority
Priority: 3 (minor)
Issue Components