Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,22 @@ def _flush(self):
# Deserialize the protobuf to get the original PubsubMessage
pubsub_msg = PubsubMessage._from_proto_str(elem)

# Publish with the correct data and attributes
# Publish with the correct data, attributes, and ordering_key
if self.with_attributes and pubsub_msg.attributes:
future = self._pub_client.publish(
self._topic, pubsub_msg.data, **pubsub_msg.attributes)
self._topic,
pubsub_msg.data,
ordering_key=pubsub_msg.ordering_key
if pubsub_msg.ordering_key else '',
**pubsub_msg.attributes)
else:
future = self._pub_client.publish(self._topic, pubsub_msg.data)
if pubsub_msg.ordering_key:
future = self._pub_client.publish(
self._topic,
pubsub_msg.data,
ordering_key=pubsub_msg.ordering_key)
else:
future = self._pub_client.publish(self._topic, pubsub_msg.data)
Comment on lines 662 to +676
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for publishing messages can be simplified to improve readability and reduce code duplication. You can determine the attributes and ordering_key to use and then have a single call to self._pub_client.publish. This makes the code easier to maintain.

      attributes = {}
      if self.with_attributes and pubsub_msg.attributes:
        attributes = pubsub_msg.attributes

      if pubsub_msg.ordering_key:
        future = self._pub_client.publish(
            self._topic,
            pubsub_msg.data,
            ordering_key=pubsub_msg.ordering_key,
            **attributes)
      else:
        future = self._pub_client.publish(
            self._topic, pubsub_msg.data, **attributes)

Comment on lines 662 to +676
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for publishing messages can be simplified to reduce code duplication and improve readability. Instead of separate code paths for messages with and without attributes, you can build a dictionary of keyword arguments for the publish call. This makes the code cleaner and easier to maintain.

      publish_kwargs = {}
      if self.with_attributes and pubsub_msg.attributes:
        publish_kwargs.update(pubsub_msg.attributes)

      if pubsub_msg.ordering_key:
        publish_kwargs['ordering_key'] = pubsub_msg.ordering_key

      future = self._pub_client.publish(
          self._topic, pubsub_msg.data, **publish_kwargs)


futures.append(future)

Expand Down
45 changes: 45 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)

@pytest.mark.it_postcommit
def test_batch_write_with_ordering_key(self):
"""Test WriteToPubSub in batch mode with ordering keys."""
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import Create

# Create test messages with ordering keys
test_messages = [
PubsubMessage(
b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
PubsubMessage(
b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
PubsubMessage(
b'order_data003', {'attr': 'value3'}, ordering_key='key2')
]

pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = False

with TestPipeline(options=pipeline_options) as p:
messages = p | 'CreateMessages' >> Create(test_messages)
_ = messages | 'WriteToPubSub' >> WriteToPubSub(
self.output_topic.name, with_attributes=True)

# Verify messages were published
time.sleep(10)

response = self.sub_client.pull(
request={
"subscription": self.output_sub.name,
"max_messages": 10,
})

self.assertEqual(len(response.received_messages), len(test_messages))

# Verify ordering keys were preserved
for received_message in response.received_messages:
self.assertIn('ordering_key', dir(received_message.message))
self.sub_client.acknowledge(
request={
"subscription": self.output_sub.name,
"ack_ids": [received_message.ack_id],
})
Comment on lines +345 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current verification for ordering_key only checks for its presence. To make the test more robust, you should assert that the ordering_key has the expected value for each message.

Additionally, you can improve efficiency by acknowledging all messages in a single batch request after pulling them, rather than one by one inside the loop.

Suggested change
for received_message in response.received_messages:
self.assertIn('ordering_key', dir(received_message.message))
self.sub_client.acknowledge(
request={
"subscription": self.output_sub.name,
"ack_ids": [received_message.ack_id],
})
received_map = {msg.message.data: msg for msg in response.received_messages}
self.assertEqual(received_map[b'order_data001'].message.ordering_key, 'key1')
self.assertEqual(received_map[b'order_data002'].message.ordering_key, 'key1')
self.assertEqual(received_map[b'order_data003'].message.ordering_key, 'key2')
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
self.sub_client.acknowledge(
request={
"subscription": self.output_sub.name,
"ack_ids": ack_ids,
})

Comment on lines +345 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The assertion in this test is quite weak. It only checks for the presence of the ordering_key attribute on the message object using dir(), but doesn't verify its value. A more robust test would be to check that the received messages have the correct ordering keys and attributes corresponding to the messages that were sent. Also, acknowledging messages one by one in a loop is inefficient; it's better to collect all ack_ids and acknowledge them in a single call.

Suggested change
for received_message in response.received_messages:
self.assertIn('ordering_key', dir(received_message.message))
self.sub_client.acknowledge(
request={
"subscription": self.output_sub.name,
"ack_ids": [received_message.ack_id],
})
# Verify ordering keys and attributes were preserved
received_msgs_map = {
msg.message.data: msg.message
for msg in response.received_messages
}
for sent_msg in test_messages:
self.assertIn(sent_msg.data, received_msgs_map)
received_msg = received_msgs_map[sent_msg.data]
self.assertEqual(received_msg.ordering_key, sent_msg.ordering_key)
self.assertEqual(dict(received_msg.attributes), sent_msg.attributes)
# Acknowledge all messages at once for efficiency
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
self.sub_client.acknowledge(
request={
"subscription": self.output_sub.name,
"ack_ids": ack_ids,
})



if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
65 changes: 65 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["pubsub:topic:fakeprj.a_topic"]))

def test_write_messages_with_ordering_key(self, mock_pubsub):
"""Test WriteToPubSub with ordering_key in messages."""
data = b'data'
ordering_key = 'order-123'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called with ordering_key
mock_pubsub.return_value.publish.assert_called()
call_args = mock_pubsub.return_value.publish.call_args

# Check that ordering_key was passed as a keyword argument
self.assertIn('ordering_key', call_args.kwargs)
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)

def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub):
"""Test WriteToPubSub with ordering_key but no attributes."""
data = b'data'
ordering_key = 'order-456'
payloads = [PubsubMessage(data, None, ordering_key=ordering_key)]

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called with ordering_key
mock_pubsub.return_value.publish.assert_called()
call_args = mock_pubsub.return_value.publish.call_args

# Check that ordering_key was passed
self.assertIn('ordering_key', call_args.kwargs)
self.assertEqual(call_args.kwargs['ordering_key'], ordering_key)

def test_write_messages_without_ordering_key(self, mock_pubsub):
"""Test WriteToPubSub without ordering_key (backward compatibility)."""
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)] # No ordering_key

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))

# Verify that publish was called
mock_pubsub.return_value.publish.assert_called()
Comment on lines +1163 to +1164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make this test more robust, you should also verify that ordering_key is not passed to the publish method when no ordering key is provided in the PubsubMessage. You can do this by checking the call_args of the mock.

Suggested change
# Verify that publish was called
mock_pubsub.return_value.publish.assert_called()
# Verify that publish was called
mock_pubsub.return_value.publish.assert_called()
call_args = mock_pubsub.return_value.publish.call_args
self.assertNotIn('ordering_key', call_args.kwargs)



if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading