From 4686bcd16e32c6a80e80e397dbfc19092c7596f4 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Mon, 19 Jan 2026 00:01:08 +0530 Subject: [PATCH 1/5] Fix WriteToPubSub to pass ordering_key to publish() method Fixes #36201 --- sdks/python/apache_beam/io/gcp/pubsub.py | 16 ++++- .../io/gcp/pubsub_integration_test.py | 45 ++++++++++++ sdks/python/apache_beam/io/gcp/pubsub_test.py | 68 ++++++++++++++++++- 3 files changed, 124 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 59eadee5538e..69effb960eeb 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -658,12 +658,22 @@ def _flush(self): # Deserialize the protobuf to get the original PubsubMessage pubsub_msg = PubsubMessage._from_proto_str(elem) - # Publish with the correct data and attributes + # Publish with the correct data, attributes, and ordering_key if self.with_attributes and pubsub_msg.attributes: future = self._pub_client.publish( - self._topic, pubsub_msg.data, **pubsub_msg.attributes) + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key + if pubsub_msg.ordering_key else '', + **pubsub_msg.attributes) else: - future = self._pub_client.publish(self._topic, pubsub_msg.data) + if pubsub_msg.ordering_key: + future = self._pub_client.publish( + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key) + else: + future = self._pub_client.publish(self._topic, pubsub_msg.data) futures.append(future) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 8387fe734fc1..cf8323c45187 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self): """Test WriteToPubSub in batch mode with attributes.""" self._test_batch_write(with_attributes=True) + @pytest.mark.it_postcommit + def test_batch_write_with_ordering_key(self): + """Test WriteToPubSub in batch mode with ordering keys.""" + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.transforms import Create + + # Create test messages with ordering keys + test_messages = [ + PubsubMessage( + b'order_data001', {'attr': 'value1'}, ordering_key='key1'), + PubsubMessage( + b'order_data002', {'attr': 'value2'}, ordering_key='key1'), + PubsubMessage( + b'order_data003', {'attr': 'value3'}, ordering_key='key2') + ] + + pipeline_options = PipelineOptions() + pipeline_options.view_as(StandardOptions).streaming = False + + with TestPipeline(options=pipeline_options) as p: + messages = p | 'CreateMessages' >> Create(test_messages) + _ = messages | 'WriteToPubSub' >> WriteToPubSub( + self.output_topic.name, with_attributes=True) + + # Verify messages were published + time.sleep(10) + + response = self.sub_client.pull( + request={ + "subscription": self.output_sub.name, + "max_messages": 10, + }) + + self.assertEqual(len(response.received_messages), len(test_messages)) + + # Verify ordering keys were preserved + for received_message in response.received_messages: + self.assertIn('ordering_key', dir(received_message.message)) + self.sub_client.acknowledge( + request={ + "subscription": self.output_sub.name, + "ack_ids": [received_message.ack_id], + }) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 5650e920e635..e91942e454aa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -23,10 +23,9 @@ import logging import unittest +import apache_beam as beam import hamcrest as hc import mock - -import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write @@ -1098,6 +1097,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock): Lineage.query(p.result.metrics(), Lineage.SINK), set(["pubsub:topic:fakeprj.a_topic"])) + def test_write_messages_with_ordering_key(self, mock_pubsub): + """Test WriteToPubSub with ordering_key in messages.""" + data = b'data' + ordering_key = 'order-123' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed as a keyword argument + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub): + """Test WriteToPubSub with ordering_key but no attributes.""" + data = b'data' + ordering_key = 'order-456' + payloads = [PubsubMessage(data, None, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_without_ordering_key(self, mock_pubsub): + """Test WriteToPubSub without ordering_key (backward compatibility).""" + data = b'data' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes)] # No ordering_key + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called + mock_pubsub.return_value.publish.assert_called() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From dc55708cbae6e23506992bfcfcf92a872098a605 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:42:47 +0530 Subject: [PATCH 2/5] Update pubsub_test.py --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index e91942e454aa..09abf8de6353 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -23,9 +23,9 @@ import logging import unittest -import apache_beam as beam import hamcrest as hc import mock +import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write From 25b3d821857fed5d545b43ceb8f9098b5e08e48a Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:44:59 +0530 Subject: [PATCH 3/5] Update pubsub_test.py --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 09abf8de6353..14b361ae45fa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -25,6 +25,7 @@ import hamcrest as hc import mock + import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read From e13695408dfacf4789d2c36b8f559cc074222cec Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Tue, 20 Jan 2026 11:12:52 +0530 Subject: [PATCH 4/5] Trigger CI rerun From 4f5e0e9f9d43099140e7b6f9854509045bb94145 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Tue, 20 Jan 2026 12:22:04 +0530 Subject: [PATCH 5/5] Retry CI (flake)