Skip to content

Commit d6aaf61

Browse files
authored
feat(spanner): add asynchronous code snippets and minor cleanup changes (#17337)
### 1. Partition Deserialization Simplification Addressing post merge minor comments from: #17014 2. Asynchronous Code Snippets & Integration Tests New Async Samples (async_snippets.py): Added standard asynchronous code snippets. New Integration Tests (async_snippets_test.py): Introduced integration tests using pytest-asyncio to sequentially execute and assert the output of all five asynchronous code snippets against a mock/live instance.
1 parent 5accbb4 commit d6aaf61

5 files changed

Lines changed: 197 additions & 24 deletions

File tree

packages/google-cloud-spanner/google/cloud/spanner_dbapi/partition_helper.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,6 @@ def _deserialize_value(val: Any) -> Any:
9090
return val
9191

9292

93-
def _unpack_value_pb(value):
94-
which = value.WhichOneof("kind")
95-
if which == "null_value":
96-
return None
97-
elif which == "number_value":
98-
return value.number_value
99-
elif which == "string_value":
100-
return value.string_value
101-
elif which == "bool_value":
102-
return value.bool_value
103-
elif which == "struct_value":
104-
return {k: _unpack_value_pb(v) for k, v in value.struct_value.fields.items()}
105-
elif which == "list_value":
106-
return [_unpack_value_pb(v) for v in value.list_value.values]
107-
return None
108-
109-
11093
def decode_from_string(encoded_partition_id):
11194
gzip_bytes = base64.b64decode(bytes(encoded_partition_id, "utf-8"))
11295
partition_id_bytes = gzip.decompress(gzip_bytes)
@@ -124,9 +107,7 @@ def decode_from_string(encoded_partition_id):
124107
if "query" in partition_result and "params" in partition_result["query"]:
125108
params_pb = partition_result["query"]["params"]
126109
if params_pb:
127-
partition_result["query"]["params"] = {
128-
k: _unpack_value_pb(v) for k, v in params_pb.fields.items()
129-
}
110+
partition_result["query"]["params"] = MessageToDict(params_pb)
130111

131112
return PartitionId(btid, partition_result)
132113

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2026 Google LLC All rights reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""This application demonstrates how to do basic asynchronous operations using
18+
Cloud Spanner.
19+
"""
20+
21+
import asyncio
22+
from google.cloud.spanner_v1 import AsyncClient
23+
from google.cloud.spanner_v1 import KeySet
24+
25+
# [START spanner_async_create_client]
26+
async def async_create_client(instance_id, database_id):
27+
"""Instantiates an asynchronous Spanner client."""
28+
spanner_client = AsyncClient()
29+
instance = spanner_client.instance(instance_id)
30+
database = instance.database(database_id)
31+
32+
print("Async Spanner client instantiated successfully.")
33+
return database
34+
# [END spanner_async_create_client]
35+
36+
37+
# [START spanner_async_query_data]
38+
async def async_query_data(instance_id, database_id):
39+
"""Queries sample data from the database using asynchronous SQL."""
40+
spanner_client = AsyncClient()
41+
instance = spanner_client.instance(instance_id)
42+
database = instance.database(database_id)
43+
44+
async with database.snapshot() as snapshot:
45+
results = await snapshot.execute_sql(
46+
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
47+
)
48+
49+
async for row in results:
50+
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
51+
# [END spanner_async_query_data]
52+
53+
54+
# [START spanner_async_insert_data]
55+
async def async_insert_data(instance_id, database_id):
56+
"""Inserts sample data into the database using DML asynchronously."""
57+
spanner_client = AsyncClient()
58+
instance = spanner_client.instance(instance_id)
59+
database = instance.database(database_id)
60+
61+
async def insert_singers(transaction):
62+
dml = (
63+
"INSERT INTO Singers (SingerId, FirstName, LastName) VALUES "
64+
"(12, 'Melissa', 'Garcia'), "
65+
"(13, 'Russell', 'Morales')"
66+
)
67+
await transaction.execute_update(dml)
68+
69+
await database.run_in_transaction(insert_singers)
70+
print("Async DML Insert transaction complete.")
71+
# [END spanner_async_insert_data]
72+
73+
74+
# [START spanner_async_read_write_transaction]
75+
async def async_read_write_transaction(instance_id, database_id):
76+
"""Performs an asynchronous read-write transaction."""
77+
spanner_client = AsyncClient()
78+
instance = spanner_client.instance(instance_id)
79+
database = instance.database(database_id)
80+
81+
async def update_singer_lastname(transaction):
82+
# Retrieve current name
83+
results = await transaction.execute_sql(
84+
"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = 12"
85+
)
86+
async for row in results:
87+
print("Before Update - SingerId: {}, FirstName: {}, LastName: {}".format(*row))
88+
89+
# Update LastName
90+
await transaction.execute_update(
91+
"UPDATE Singers SET LastName = 'Jackson' WHERE SingerId = 12"
92+
)
93+
94+
await database.run_in_transaction(update_singer_lastname)
95+
print("Async read-write transaction complete.")
96+
# [END spanner_async_read_write_transaction]
97+
98+
99+
# [START spanner_async_read_only_transaction]
100+
async def async_read_only_transaction(instance_id, database_id):
101+
"""Performs an asynchronous read-only transaction."""
102+
spanner_client = AsyncClient()
103+
instance = spanner_client.instance(instance_id)
104+
database = instance.database(database_id)
105+
106+
async with database.snapshot() as snapshot:
107+
# Execute a read using standard KeySet
108+
keyset = KeySet(all_=True)
109+
results = await snapshot.read(
110+
table="Singers",
111+
columns=("SingerId", "FirstName", "LastName"),
112+
keyset=keyset,
113+
)
114+
115+
async for row in results:
116+
print("Read Row - SingerId: {}, FirstName: {}, LastName: {}".format(*row))
117+
# [END spanner_async_read_only_transaction]
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Copyright 2026 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
import async_snippets
17+
18+
@pytest.fixture(scope="module")
19+
def database_ddl():
20+
"""DDL statements to set up the database for testing async snippets."""
21+
return [
22+
"""CREATE TABLE Singers (
23+
SingerId INT64 NOT NULL,
24+
FirstName STRING(1024),
25+
LastName STRING(1024),
26+
SingerInfo BYTES(MAX)
27+
) PRIMARY KEY (SingerId)""",
28+
"""CREATE TABLE Albums (
29+
SingerId INT64 NOT NULL,
30+
AlbumId INT64 NOT NULL,
31+
AlbumTitle STRING(MAX)
32+
) PRIMARY KEY (SingerId, AlbumId),
33+
INTERLEAVE IN PARENT Singers ON DELETE CASCADE"""
34+
]
35+
36+
37+
@pytest.mark.asyncio
38+
async def test_async_snippets_flow(capsys, instance_id, sample_database):
39+
# 1. Test Async Spanner Client Creation
40+
db = await async_snippets.async_create_client(instance_id, sample_database.database_id)
41+
assert db is not None
42+
out, _ = capsys.readouterr()
43+
assert "Async Spanner client instantiated successfully." in out
44+
45+
# 2. Test Async DML Insert
46+
await async_snippets.async_insert_data(instance_id, sample_database.database_id)
47+
out, _ = capsys.readouterr()
48+
assert "Async DML Insert transaction complete." in out
49+
50+
# 3. Seed additional albums data via sync batch write for query testing
51+
with sample_database.batch() as batch:
52+
batch.insert(
53+
table="Albums",
54+
columns=("SingerId", "AlbumId", "AlbumTitle"),
55+
values=[
56+
(12, 1, "Total Junk"),
57+
(13, 2, "Go, Go, Go"),
58+
],
59+
)
60+
61+
# 4. Test Async Query Data
62+
await async_snippets.async_query_data(instance_id, sample_database.database_id)
63+
out, _ = capsys.readouterr()
64+
assert "SingerId: 12, AlbumId: 1, AlbumTitle: Total Junk" in out
65+
assert "SingerId: 13, AlbumId: 2, AlbumTitle: Go, Go, Go" in out
66+
67+
# 5. Test Async Read-Write Transaction
68+
await async_snippets.async_read_write_transaction(instance_id, sample_database.database_id)
69+
out, _ = capsys.readouterr()
70+
assert "Before Update - SingerId: 12, FirstName: Melissa, LastName: Garcia" in out
71+
assert "Async read-write transaction complete." in out
72+
73+
# 6. Test Async Read-Only Transaction
74+
await async_snippets.async_read_only_transaction(instance_id, sample_database.database_id)
75+
out, _ = capsys.readouterr()
76+
assert "Read Row - SingerId: 12, FirstName: Melissa, LastName: Jackson" in out
77+
assert "Read Row - SingerId: 13, FirstName: Russell, LastName: Morales" in out

packages/google-cloud-spanner/tests/mockserver_tests/test_dbapi_partition_query.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
# Copyright 2024 Google LLC All rights reserved.
2-
#
1+
# Copyright 2026 Google LLC All rights reserved.
32
# Licensed under the Apache License, Version 2.0 (the "License");
43
# you may not use this file except in compliance with the License.
54
# You may obtain a copy of the License at

packages/google-cloud-spanner/tests/unit/spanner_dbapi/test_partition_helper.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
# Copyright 2024 Google LLC All rights reserved.
2-
#
1+
# Copyright 2026 Google LLC All rights reserved.
32
# Licensed under the Apache License, Version 2.0 (the "License");
43
# you may not use this file except in compliance with the License.
54
# You may obtain a copy of the License at

0 commit comments

Comments
 (0)