Skip to content

Commit a3f6254

Browse files
committed
resolve comments
1 parent 7abfae0 commit a3f6254

File tree

2 files changed

+18
-40
lines changed

2 files changed

+18
-40
lines changed

google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class _WriteState:
3737

3838
def __init__(
3939
self,
40-
spec: storage_type.AppendObjectSpec,
40+
spec: Union[storage_type.AppendObjectSpec, storage_type.WriteObjectSpec],
4141
chunk_size: int,
4242
user_buffer: IO[bytes],
4343
):
@@ -48,7 +48,7 @@ def __init__(
4848
self.bytes_sent: int = 0
4949
self.write_handle: Union[bytes, storage_type.BidiWriteHandle, None] = None
5050
self.routing_token: Optional[str] = None
51-
self.is_complete: bool = False
51+
self.is_finalized: bool = False
5252

5353

5454
class _WriteResumptionStrategy(_BaseResumptionStrategy):
@@ -61,34 +61,29 @@ def generate_requests(
6161
6262
For Appendable Objects, every stream opening should send an
6363
AppendObjectSpec. If resuming, the `write_handle` is added to that spec.
64+
65+
This method is not applicable for `open` methods.
6466
"""
6567
write_state: _WriteState = state["write_state"]
6668

67-
if write_state.write_handle:
68-
write_state.spec.write_handle = write_state.write_handle
69-
70-
if write_state.routing_token:
71-
write_state.spec.routing_token = write_state.routing_token
72-
73-
# Initial request of the stream must provide the specification.
74-
# If we have a write_handle, we request a state lookup to verify persisted offset.
75-
do_state_lookup = write_state.write_handle is not None
69+
initial_request = storage_type.BidiWriteObjectRequest()
7670

7771
# Determine if we need to send WriteObjectSpec or AppendObjectSpec
78-
initial_request = storage_type.BidiWriteObjectRequest(
79-
state_lookup=do_state_lookup
80-
)
81-
8272
if isinstance(write_state.spec, storage_type.WriteObjectSpec):
8373
initial_request.write_object_spec = write_state.spec
8474
else:
75+
if write_state.write_handle:
76+
write_state.spec.write_handle = write_state.write_handle
77+
78+
if write_state.routing_token:
79+
write_state.spec.routing_token = write_state.routing_token
8580
initial_request.append_object_spec = write_state.spec
8681

8782
yield initial_request
8883

8984
# The buffer should already be seeked to the correct position (persisted_size)
9085
# by the `recover_state_on_failure` method before this is called.
91-
while not write_state.is_complete:
86+
while not write_state.is_finalized:
9287
chunk = write_state.user_buffer.read(write_state.chunk_size)
9388

9489
# End of File detection
@@ -113,16 +108,16 @@ def update_state_from_response(
113108
"""Processes a server response and updates the write state."""
114109
write_state: _WriteState = state["write_state"]
115110

116-
if response.persisted_size is not None:
117-
if response.persisted_size > write_state.persisted_size:
118-
write_state.persisted_size = response.persisted_size
111+
if response.persisted_size:
112+
write_state.persisted_size = response.persisted_size
119113

120114
if response.write_handle:
121115
write_state.write_handle = response.write_handle
122116

123117
if response.resource:
124-
write_state.is_complete = True
125118
write_state.persisted_size = response.resource.size
119+
if response.resource.finalize_time:
120+
write_state.is_finalized = True
126121

127122
async def recover_state_on_failure(
128123
self, error: Exception, state: Dict[str, Any]

tests/unit/asyncio/retry/test_writes_resumption_strategy.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io
1616
import unittest
1717
import unittest.mock as mock
18+
from datetime import datetime
1819

1920
import pytest
2021
import google_crc32c
@@ -137,7 +138,6 @@ def test_generate_requests_resumption(self):
137138
self.assertEqual(
138139
requests[0].append_object_spec.write_handle.handle, b"test-handle"
139140
)
140-
self.assertTrue(requests[0].state_lookup)
141141

142142
# Check data starts from offset 4
143143
self.assertEqual(requests[1].write_offset, 4)
@@ -206,28 +206,11 @@ def test_update_state_from_response(self):
206206
strategy.update_state_from_response(response2, state)
207207
self.assertEqual(write_state.persisted_size, 1024)
208208

209-
final_resource = storage_type.Object(name="test-object", bucket="b", size=2048)
209+
final_resource = storage_type.Object(name="test-object", bucket="b", size=2048, finalize_time=datetime.now())
210210
response3 = storage_type.BidiWriteObjectResponse(resource=final_resource)
211211
strategy.update_state_from_response(response3, state)
212-
self.assertTrue(write_state.is_complete)
213-
self.assertEqual(write_state.persisted_size, 2048)
214-
215-
def test_update_state_from_response_ignores_smaller_persisted_size(self):
216-
strategy = self._make_one()
217-
state = {
218-
"write_state": _WriteState(
219-
mock.Mock(spec=storage_type.AppendObjectSpec),
220-
0,
221-
mock.Mock(spec=io.BytesIO),
222-
),
223-
}
224-
write_state = state["write_state"]
225-
write_state.persisted_size = 2048
226-
227-
response = storage_type.BidiWriteObjectResponse(persisted_size=1024)
228-
strategy.update_state_from_response(response, state)
229-
230212
self.assertEqual(write_state.persisted_size, 2048)
213+
self.assertTrue(write_state.is_finalized)
231214

232215
@pytest.mark.asyncio
233216
async def test_recover_state_on_failure_handles_redirect(self):

0 commit comments

Comments
 (0)