Skip to content

Commit b6a85e4

Browse files
authored
feat(storage): full object checksum: integrate full-object checksum in AsyncMultiRangeDownloader (#17263)
### 1. Overview of the Solution This solution implements end-to-end full-object checksum validation in `AsyncMultiRangeDownloader` for the asynchronous Google Cloud Storage Python client library. As asynchronous multiplexed downloads of non-contiguous ranges are performed concurrently over a single bidirectional gRPC connection, this feature automatically and incrementally calculates a rolling checksum as bytes arrive and validates it against the server's authoritative object checksum once the download completes. The technical approach consists of three coordinated layers: * **`_AsyncReadObjectStream` (Stream Ingestion)**: Safely extracts the authoritative server checksum (`full_obj_server_crc32c`) and finalization status (`is_finalized`) from the object metadata received in the first data payload response of the stream. * **`_ReadResumptionStrategy` & `_DownloadState` (Verification Logic)**: Computes an isolated, persistent rolling checksum in the individual `_DownloadState` object to ensure calculations do not bleed across concurrent multiplexed ranges. Crucially, the rolling hash updates only *after* buffer writes succeed to prevent state corruption during retry re-connects, raising a `DataCorruption` exception on completion if a mismatch occurs. * **`AsyncMultiRangeDownloader` (Orchestration & Cleanup)**: Detects candidate full-object ranges (e.g., `(0, 0)` or `(0, persisted_size)`), propagates checksum settings to the resumption strategy, and guarantees robust cleanup (closing the stream immediately and unregistering IDs) if data corruption or write errors occur. ### 2. What This PR Specifically Does This PR implements **Step 3: Downloader Orchestration & End-to-End Integration/System Tests** of the solution: * Relocates `raise_if_no_fast_crc32c()` validation to the execution phase (`download_ranges()`) instead of construction time. * Propagates stream details (`is_finalized`, `full_obj_server_crc32c`) to the resumption state dictionary. * Detects implicit full-object downloads (`(0, 0)`) or explicit full-object downloads (`(0, persisted_size)`) post-`open()`, and flags them for validation. * Implements the robust cleanup guarantee in `download_ranges()`: wraps execution in a robust `try...finally` block to close the stream immediately and unregister multiplexer range IDs upon a `DataCorruption` exception. * Adds integration tests in `test_async_multi_range_downloader.py` and extensive end-to-end system tests in `test_zonal.py` checking finalized, unfinalized (appendable), explicit, implicit, and bypassed range downloads against live GCS buckets.
1 parent 2361ba6 commit b6a85e4

6 files changed

Lines changed: 257 additions & 11 deletions

File tree

packages/google-cloud-storage/google/cloud/storage/_helpers.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
from google.cloud.exceptions import NotFound
3434

3535
from google.cloud.storage._opentelemetry_tracing import (
36-
create_trace_span as _base_create_trace_span,
3736
_is_bucket_metadata_disabled,
3837
)
38+
from google.cloud.storage._opentelemetry_tracing import (
39+
create_trace_span as _base_create_trace_span,
40+
)
3941
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
4042
from google.cloud.storage.retry import (
4143
DEFAULT_RETRY,

packages/google-cloud-storage/google/cloud/storage/_http.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
from google.cloud.storage import __version__, _helpers
2626
from google.cloud.storage._opentelemetry_tracing import (
2727
HAS_OPENTELEMETRY,
28+
_is_bucket_metadata_disabled,
2829
create_trace_span,
2930
enable_otel_traces,
30-
_is_bucket_metadata_disabled,
3131
)
3232

3333
logger = logging.getLogger(__name__)

packages/google-cloud-storage/google/cloud/storage/asyncio/async_multi_range_downloader.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
_DownloadState,
4545
_ReadResumptionStrategy,
4646
)
47+
from google.cloud.storage.exceptions import DataCorruption
4748

4849
from ._utils import raise_if_no_fast_crc32c
4950

@@ -219,8 +220,6 @@ def __init__(
219220
)
220221
generation = kwargs.pop("generation_number")
221222

222-
raise_if_no_fast_crc32c()
223-
224223
self.client = client
225224
self.bucket_name = bucket_name
226225
self.object_name = object_name
@@ -232,6 +231,8 @@ def __init__(
232231
self._multiplexer: Optional[_StreamMultiplexer] = None
233232
self.persisted_size: Optional[int] = None # updated after opening the stream
234233
self._open_retries: int = 0
234+
self.is_finalized: bool = False
235+
self.full_obj_server_crc32c: Optional[int] = None
235236

236237
async def __aenter__(self):
237238
"""Opens the underlying bidi-gRPC connection to read from the object."""
@@ -327,6 +328,8 @@ async def _do_open():
327328
self.read_handle = self.read_obj_str.read_handle
328329
if self.read_obj_str.persisted_size is not None:
329330
self.persisted_size = self.read_obj_str.persisted_size
331+
self.is_finalized = self.read_obj_str.is_finalized
332+
self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c
330333

331334
self._is_stream_open = True
332335

@@ -363,6 +366,8 @@ async def factory():
363366
self.generation = stream.generation_number
364367
if stream.read_handle:
365368
self.read_handle = stream.read_handle
369+
self.is_finalized = stream.is_finalized
370+
self.full_obj_server_crc32c = stream.full_obj_server_crc32c
366371

367372
self.read_obj_str = stream
368373
self._is_stream_open = True
@@ -377,6 +382,7 @@ async def download_ranges(
377382
lock: asyncio.Lock = None,
378383
retry_policy: Optional[AsyncRetry] = None,
379384
metadata: Optional[List[Tuple[str, str]]] = None,
385+
enable_checksum: bool = True,
380386
) -> None:
381387
"""Downloads multiple byte ranges from the object into the buffers
382388
provided by user with automatic retries.
@@ -412,6 +418,9 @@ async def download_ranges(
412418
"Invalid input - length of read_ranges cannot be more than 1000"
413419
)
414420

421+
if enable_checksum:
422+
raise_if_no_fast_crc32c()
423+
415424
if not self._is_stream_open:
416425
raise ValueError("Underlying bidi-gRPC stream is not open")
417426

@@ -422,16 +431,30 @@ async def download_ranges(
422431
download_states = {}
423432
for read_range in read_ranges:
424433
read_id = generate_random_56_bit_integer()
434+
# Unpack tuple into self-documenting variable names to improve readability.
435+
offset, length, user_buffer = read_range
436+
437+
# Heuristic to detect full object reads:
438+
# - Implicit full object read: start offset is 0 and length is 0 (read all).
439+
# - Explicit full object read: start offset is 0 and length matches the exact persisted size.
440+
is_full_object_read = (offset == 0 and length == 0) or (
441+
self.persisted_size is not None
442+
and offset == 0
443+
and length == self.persisted_size
444+
)
425445
download_states[read_id] = _DownloadState(
426-
initial_offset=read_range[0],
427-
initial_length=read_range[1],
428-
user_buffer=read_range[2],
446+
initial_offset=offset,
447+
initial_length=length,
448+
user_buffer=user_buffer,
449+
is_full_object_read=is_full_object_read,
429450
)
430451

431452
initial_state = {
432453
"download_states": download_states,
433454
"read_handle": self.read_handle,
434455
"routing_token": None,
456+
"enable_checksum": enable_checksum,
457+
"full_obj_server_crc32c": self.full_obj_server_crc32c,
435458
}
436459

437460
read_ids = set(download_states.keys())
@@ -519,12 +542,18 @@ async def generator():
519542
strategy, send_and_recv_via_multiplexer
520543
)
521544

522-
await retry_manager.execute(initial_state, retry_policy)
545+
try:
546+
await retry_manager.execute(initial_state, retry_policy)
547+
except DataCorruption:
548+
if self.is_stream_open:
549+
await self.close()
550+
raise
523551

524552
if initial_state.get("read_handle"):
525553
self.read_handle = initial_state["read_handle"]
526554
finally:
527-
self._multiplexer.unregister(read_ids)
555+
if self._multiplexer is not None:
556+
self._multiplexer.unregister(read_ids)
528557

529558
async def close(self):
530559
"""

packages/google-cloud-storage/tests/system/test_zonal.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ObjectCustomContextPayload,
2828
)
2929

30+
3031
pytestmark = pytest.mark.skipif(
3132
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
3233
reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
@@ -961,3 +962,88 @@ async def _run():
961962
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
962963

963964
event_loop.run_until_complete(_run())
965+
966+
967+
@pytest.mark.parametrize(
968+
"read_start, read_length, enable_checksum",
969+
[
970+
(0, 0, True),
971+
(0, 1024 * 1024, True),
972+
(0, 0, False),
973+
],
974+
)
975+
def test_mrd_checksum_validation(
976+
storage_client,
977+
blobs_to_delete,
978+
event_loop,
979+
grpc_client_direct,
980+
read_start,
981+
read_length,
982+
enable_checksum,
983+
):
984+
"""
985+
Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects.
986+
"""
987+
object_size = 1024 * 1024 # 1MB
988+
object_name = f"test_mrd_chksum-{uuid.uuid4()}"
989+
990+
async def _run():
991+
object_data = os.urandom(object_size)
992+
993+
writer = AsyncAppendableObjectWriter(
994+
grpc_client_direct, _ZONAL_BUCKET, object_name
995+
)
996+
await writer.open()
997+
await writer.append(object_data)
998+
await writer.close(finalize_on_close=True)
999+
1000+
async with AsyncMultiRangeDownloader(
1001+
grpc_client_direct, _ZONAL_BUCKET, object_name
1002+
) as mrd:
1003+
buffer = BytesIO()
1004+
await mrd.download_ranges(
1005+
[(read_start, read_length, buffer)], enable_checksum=enable_checksum
1006+
)
1007+
assert buffer.getvalue() == object_data
1008+
1009+
# cleanup
1010+
del writer
1011+
gc.collect()
1012+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
1013+
1014+
event_loop.run_until_complete(_run())
1015+
1016+
1017+
def test_mrd_checksum_unfinalized_appendable_skipped(
1018+
storage_client, blobs_to_delete, event_loop, grpc_client_direct
1019+
):
1020+
"""
1021+
Verifies that live, unfinalized appendable objects skip the full-object checksum check
1022+
naturally without raising any exceptions.
1023+
"""
1024+
object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}"
1025+
1026+
async def _run():
1027+
writer = AsyncAppendableObjectWriter(
1028+
grpc_client_direct, _ZONAL_BUCKET, object_name
1029+
)
1030+
await writer.open()
1031+
await writer.append(_BYTES_TO_UPLOAD)
1032+
await writer.flush() # Flushed but not finalized!
1033+
1034+
# Download the unfinalized appendable object with enable_checksum=True
1035+
async with AsyncMultiRangeDownloader(
1036+
grpc_client_direct, _ZONAL_BUCKET, object_name
1037+
) as mrd:
1038+
buffer = BytesIO()
1039+
# Since it's unfinalized, it should skip the checksum check without raising
1040+
await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True)
1041+
assert buffer.getvalue() == _BYTES_TO_UPLOAD
1042+
1043+
# cleanup
1044+
await writer.close()
1045+
del writer
1046+
gc.collect()
1047+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
1048+
1049+
event_loop.run_until_complete(_run())

packages/google-cloud-storage/tests/unit/asyncio/test_async_multi_range_downloader.py

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,16 @@ async def test_downloading_without_opening_should_throw_error(self):
308308
assert not mrd.is_stream_open
309309

310310
@mock.patch("google.cloud.storage.asyncio._utils.google_crc32c")
311-
def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
311+
@pytest.mark.asyncio
312+
async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(
313+
self, mock_google_crc32c
314+
):
312315
mock_google_crc32c.implementation = "python"
313316
mock_client = mock.MagicMock()
317+
mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
314318

315319
with pytest.raises(exceptions.FailedPrecondition) as exc_info:
316-
AsyncMultiRangeDownloader(mock_client, "bucket", "object")
320+
await mrd.download_ranges([(0, 10, BytesIO())])
317321

318322
assert "The google-crc32c package is not installed with C support" in str(
319323
exc_info.value
@@ -579,3 +583,127 @@ async def staged_recv():
579583

580584
# Assert
581585
mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")
586+
587+
@mock.patch(
588+
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
589+
)
590+
@pytest.mark.asyncio
591+
async def test_open_populates_checksum_properties(
592+
self, mock_cls_async_read_object_stream
593+
):
594+
# Arrange
595+
mock_client = mock.MagicMock()
596+
mock_client.grpc_client = mock.AsyncMock()
597+
mock_stream = mock_cls_async_read_object_stream.return_value
598+
mock_stream.open = AsyncMock()
599+
mock_stream.generation_number = 123
600+
mock_stream.persisted_size = 100
601+
mock_stream.read_handle = b"h"
602+
mock_stream.is_finalized = True
603+
mock_stream.full_obj_server_crc32c = 999
604+
605+
mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
606+
assert mrd.is_finalized is False
607+
assert mrd.full_obj_server_crc32c is None
608+
609+
# Act
610+
await mrd.open()
611+
612+
# Assert
613+
assert mrd.is_finalized is True
614+
assert mrd.full_obj_server_crc32c == 999
615+
616+
@mock.patch(
617+
"google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
618+
)
619+
@mock.patch(
620+
"google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
621+
)
622+
@mock.patch(
623+
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
624+
)
625+
@pytest.mark.asyncio
626+
async def test_download_ranges_configures_full_object_read_state(
627+
self,
628+
mock_cls_async_read_object_stream,
629+
mock_retry_manager_cls,
630+
mock_strategy_cls,
631+
):
632+
# Arrange
633+
mock_client = mock.MagicMock()
634+
mock_client.grpc_client = mock.AsyncMock()
635+
mock_stream = mock_cls_async_read_object_stream.return_value
636+
mock_stream.open = AsyncMock()
637+
mock_stream.persisted_size = 100
638+
mock_stream.is_finalized = True
639+
mock_stream.full_obj_server_crc32c = 999
640+
641+
mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
642+
643+
mock_retry_manager = mock_retry_manager_cls.return_value
644+
mock_retry_manager.execute = AsyncMock()
645+
646+
# Act
647+
# Implicit full read (0, 0) and explicit full read (0, persisted_size=100)
648+
ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())]
649+
await mrd.download_ranges(ranges, enable_checksum=True)
650+
651+
# Assert
652+
mock_retry_manager.execute.assert_called_once()
653+
initial_state = mock_retry_manager.execute.call_args[0][0]
654+
655+
download_states = initial_state["download_states"]
656+
assert len(download_states) == 3
657+
658+
states_list = list(download_states.values())
659+
# First state: (0, 0) -> is_full_object_read is True
660+
assert states_list[0].is_full_object_read is True
661+
assert states_list[0].rolling_checksum is not None
662+
663+
# Second state: (0, 100) -> is_full_object_read is True
664+
assert states_list[1].is_full_object_read is True
665+
assert states_list[1].rolling_checksum is not None
666+
667+
# Third state: (10, 20) -> is_full_object_read is False
668+
assert states_list[2].is_full_object_read is False
669+
assert states_list[2].rolling_checksum is None
670+
671+
# State values for enable_checksum and crc32c
672+
assert initial_state["enable_checksum"] is True
673+
assert initial_state["full_obj_server_crc32c"] == 999
674+
675+
@mock.patch(
676+
"google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
677+
)
678+
@mock.patch(
679+
"google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
680+
)
681+
@mock.patch(
682+
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
683+
)
684+
@pytest.mark.asyncio
685+
async def test_download_ranges_closes_on_datacorruption(
686+
self,
687+
mock_cls_async_read_object_stream,
688+
mock_retry_manager_cls,
689+
mock_strategy_cls,
690+
):
691+
# Arrange
692+
mock_client = mock.MagicMock()
693+
mock_client.grpc_client = mock.AsyncMock()
694+
mock_stream = mock_cls_async_read_object_stream.return_value
695+
mock_stream.open = AsyncMock()
696+
697+
mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
698+
mrd.close = AsyncMock()
699+
700+
mock_retry_manager = mock_retry_manager_cls.return_value
701+
mock_retry_manager.execute = AsyncMock(
702+
side_effect=DataCorruption(None, "corrupted")
703+
)
704+
705+
# Act & Assert
706+
with pytest.raises(DataCorruption):
707+
await mrd.download_ranges([(0, 0, BytesIO())])
708+
709+
mrd.close.assert_called_once()

packages/google-cloud-storage/tests/unit/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515

1616
import asyncio
17+
1718
import pytest
1819

1920

0 commit comments

Comments
 (0)