Skip to content

Commit 01b27f8

Browse files
committed
feat(gooddata-sdk): add AI Lake service with analyze_statistics
Wrap the AI Lake long-running-operation surface with a typed `CatalogAILakeService` exposing the three methods consumers need to trigger and wait on `ANALYZE TABLE` after registering aggregate-aware LDM shapes: - `analyze_statistics(instance_id, table_names=None, operation_id=None)` → `str`: posts the request and returns the operation ID. The caller can pre-supply a UUID; otherwise the service generates one and seeds it as the request `operation-id` header so the polling handle is known up front (the endpoint returns `Unit` body + the id in the response header). - `get_operation(operation_id)` → `CatalogAILakeOperation`: typed handle with `id`, `kind`, `status` (Literal "pending"/"succeeded"/ "failed" — these are the discriminator values of the OpenAPI `Operation` oneOf), and optional `result`/`error` payloads. Convenience predicates `is_terminal`, `is_succeeded`, `is_failed`. - `wait_for_operation(operation_id, timeout_s=300, poll_s=2)`: blocks until terminal, raises `CatalogAILakeOperationError` on `failed` and `TimeoutError` when the deadline elapses. Wiring: - `GoodDataApiClient.ai_lake_api` property now exposes `apis.AILakeApi` for callers who need raw access to surfaces this service does not yet wrap (database provisioning, pipe-table CRUD, service commands — follow-up tickets). - `GoodDataSdk.catalog_ai_lake` property surfaces the new service. - Public re-exports: `CatalogAILakeOperation`, `CatalogAILakeOperationError`, `CatalogAILakeService`. Tests (8 tests, all unit-mocked — no live stack needed): - analyze_statistics seeds caller-supplied UUID, generates one when omitted, normalizes empty `table_names` - get_operation handles success and failure shapes - wait_for_operation polls until succeeded, raises on failed, raises TimeoutError when never terminal Verified: full SDK unit suite — 425 passed, 2 skipped. JIRA: CQ-2320 risk: low
1 parent 1e51774 commit 01b27f8

6 files changed

Lines changed: 307 additions & 0 deletions

File tree

packages/gooddata-sdk/src/gooddata_sdk/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
import logging
88

99
from gooddata_sdk._version import __version__
10+
from gooddata_sdk.catalog.ai_lake.service import (
11+
CatalogAILakeOperation,
12+
CatalogAILakeOperationError,
13+
CatalogAILakeService,
14+
)
1015
from gooddata_sdk.catalog.appearance.entity_model.color_palette import (
1116
CatalogColorPalette,
1217
CatalogColorPaletteAttributes,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# (C) 2026 GoodData Corporation
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# (C) 2026 GoodData Corporation
2+
"""SDK wrapper for the AI Lake long-running-operation surface.
3+
4+
Today this exposes only the operations needed by aggregate-aware LDMs:
5+
6+
- `analyze_statistics` triggers `ANALYZE TABLE` over a database instance so
7+
CBO statistics catch up after a schema or data change. Required after
8+
registering a pre-aggregation table whose dim attributes the platform will
9+
later resolve via filter pushdown.
10+
- `get_operation` and `wait_for_operation` cover the polling side of the
11+
long-running operation contract that `analyze_statistics` returns.
12+
13+
The full AI Lake API surface (database provisioning, pipe-table
14+
registration, service commands) is not yet wrapped here; consumers that
15+
need those should call `client.ai_lake_api.<method>` directly until a
16+
ticket adds typed wrappers.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
import time
22+
import uuid
23+
from typing import Any, Literal
24+
25+
from attrs import define
26+
from gooddata_api_client.api.ai_lake_api import AILakeApi
27+
from gooddata_api_client.model.analyze_statistics_request import AnalyzeStatisticsRequest
28+
29+
from gooddata_sdk.catalog.base import Base
30+
from gooddata_sdk.client import GoodDataApiClient
31+
32+
# Wire-level status strings of an AI Lake operation. They are lower-case
# because they double as the discriminator values of the OpenAPI `Operation`
# oneOf.
OperationStatus = Literal["pending", "succeeded", "failed"]

# The subset of statuses that `CatalogAILakeOperation.is_terminal` treats as
# final.
TERMINAL_STATUSES: frozenset[OperationStatus] = frozenset(("succeeded", "failed"))
36+
37+
38+
@define(kw_only=True)
class CatalogAILakeOperation(Base):
    """Snapshot of a long-running AI Lake operation.

    `CatalogAILakeService.get_operation` builds instances from the
    operation payload; the boolean properties are shortcuts over `status`.
    """

    # Operation identifier taken from the payload's "id" entry.
    id: str
    # Operation kind taken from the payload's "kind" entry.
    kind: str
    # One of the `OperationStatus` literals.
    status: OperationStatus
    # Payload's "result" entry, or None when absent.
    result: dict[str, Any] | None = None
    # Payload's "error" entry, or None when absent.
    error: dict[str, Any] | None = None

    @property
    def is_terminal(self) -> bool:
        """Whether `status` is a member of `TERMINAL_STATUSES`."""
        return self.status in TERMINAL_STATUSES

    @property
    def is_succeeded(self) -> bool:
        """Whether `status` equals "succeeded"."""
        return self.status == "succeeded"

    @property
    def is_failed(self) -> bool:
        """Whether `status` equals "failed"."""
        return self.status == "failed"
59+
60+
61+
class CatalogAILakeOperationError(RuntimeError):
    """Signals that an AI Lake long-running operation ended in `failed` state.

    The offending operation is kept on the `operation` attribute so callers
    can inspect its `error` payload.
    """

    def __init__(self, operation: CatalogAILakeOperation) -> None:
        self.operation = operation
        summary = f"AI Lake operation {operation.id} ({operation.kind}) failed"
        # Append the error payload only when the operation carries one.
        super().__init__(f"{summary}: {operation.error}" if operation.error else summary)
70+
71+
72+
class CatalogAILakeService:
    """Typed access to the AI Lake long-running-operation surface."""

    def __init__(self, api_client: GoodDataApiClient) -> None:
        """Bind the service to `api_client` and build its `AILakeApi` handle."""
        self._client = api_client
        self._ai_lake_api: AILakeApi = AILakeApi(api_client._api_client)

    def analyze_statistics(
        self,
        instance_id: str,
        table_names: list[str] | None = None,
        operation_id: str | None = None,
    ) -> str:
        """Trigger ANALYZE TABLE for tables in an AI Lake database instance.

        Args:
            instance_id: Database instance name (preferred) or UUID.
            table_names: Tables to analyze; if `None` or empty, every table
                in the instance is analyzed.
            operation_id: Optional client-supplied operation identifier. If
                omitted (or empty), a fresh UUID is generated. Pass the same
                value that `wait_for_operation` will poll on.

        Returns:
            The operation ID (UUID string) the platform will track this run
            under. Pass it to `get_operation` / `wait_for_operation` to poll.
        """
        op_id = operation_id or str(uuid.uuid4())
        # Copy the caller's list so the request never aliases it; a missing
        # or empty selection is sent as an empty list.
        request = AnalyzeStatisticsRequest(table_names=list(table_names) if table_names else [])
        # Body return is `Unit`; the platform tracks the operation under the
        # client-supplied or server-generated `operation-id` header. We seed
        # it ourselves so the caller gets a known polling handle without
        # having to read response headers.
        self._ai_lake_api.analyze_statistics(instance_id, request, operation_id=op_id)
        return op_id

    def get_operation(self, operation_id: str) -> CatalogAILakeOperation:
        """Fetch the current state of a long-running AI Lake operation.

        Args:
            operation_id: Identifier of the operation to look up.

        Returns:
            A `CatalogAILakeOperation` built from the response payload.

        Raises:
            KeyError: If the payload lacks `id`, `kind` or `status`.
        """
        response = self._ai_lake_api.get_ai_lake_operation(operation_id)
        # The api-client returns the oneOf'd operation as a dict-like object
        # whose concrete subtype depends on `status`. Normalize via to_dict.
        data = response.to_dict() if hasattr(response, "to_dict") else dict(response)
        return CatalogAILakeOperation(
            id=data["id"],
            kind=data["kind"],
            status=data["status"],
            result=data.get("result"),
            error=data.get("error"),
        )

    def wait_for_operation(
        self,
        operation_id: str,
        timeout_s: float = 300.0,
        poll_s: float = 2.0,
    ) -> CatalogAILakeOperation:
        """Block until an AI Lake operation reaches a terminal status.

        The operation is always polled at least once, even with a zero
        timeout. Between polls the method sleeps for `poll_s`, shortened to
        the time left before the deadline so the call does not overrun
        `timeout_s` by a whole polling interval.

        Args:
            operation_id: Identifier of the operation to poll.
            timeout_s: Maximum time to wait, in seconds.
            poll_s: Delay between consecutive polls, in seconds.

        Returns:
            The operation once its status is `succeeded`.

        Raises:
            CatalogAILakeOperationError: If the operation ends up `failed`.
            TimeoutError: If no terminal status is reached in time.
        """
        deadline = time.monotonic() + timeout_s
        while True:
            op = self.get_operation(operation_id)
            if op.is_succeeded:
                return op
            if op.is_failed:
                raise CatalogAILakeOperationError(op)
            # Read the clock exactly once per iteration; the same reading
            # drives both the timeout decision and the sleep length.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"AI Lake operation {operation_id} did not finish within {timeout_s}s (last status: {op.status})"
                )
            # Never sleep past the deadline: the final poll then happens at
            # (roughly) the deadline instead of up to `poll_s` after it.
            time.sleep(min(poll_s, remaining))

packages/gooddata-sdk/src/gooddata_sdk/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def __init__(
7171
self._actions_api = apis.ActionsApi(self._api_client)
7272
self._user_management_api = apis.UserManagementApi(self._api_client)
7373
self._appearance_api = apis.AppearanceApi(self._api_client)
74+
self._ai_lake_api = apis.AILakeApi(self._api_client)
7475
self._executions_cancellable = executions_cancellable
7576

7677
def _do_post_request(
@@ -158,6 +159,10 @@ def user_management_api(self) -> apis.UserManagementApi:
158159
def appearance_api(self) -> apis.AppearanceApi:
159160
return self._appearance_api
160161

162+
@property
def ai_lake_api(self) -> apis.AILakeApi:
    """Return the `apis.AILakeApi` instance stored in `self._ai_lake_api`."""
    return self._ai_lake_api
165+
161166
@property
162167
def executions_cancellable(self) -> bool:
163168
return self._executions_cancellable

packages/gooddata-sdk/src/gooddata_sdk/sdk.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from pathlib import Path
55

6+
from gooddata_sdk.catalog.ai_lake.service import CatalogAILakeService
67
from gooddata_sdk.catalog.appearance.service import CatalogAppearanceService
78
from gooddata_sdk.catalog.data_source.service import CatalogDataSourceService
89
from gooddata_sdk.catalog.export.service import ExportService
@@ -88,6 +89,7 @@ def __init__(self, client: GoodDataApiClient) -> None:
8889
self._tables = TableService(self._client)
8990
self._support = SupportService(self._client)
9091
self._catalog_permission = CatalogPermissionService(self._client)
92+
self._catalog_ai_lake = CatalogAILakeService(self._client)
9193
self._export = ExportService(self._client)
9294

9395
@property
@@ -138,6 +140,10 @@ def catalog_permission(self) -> CatalogPermissionService:
138140
def export(self) -> ExportService:
139141
return self._export
140142

143+
@property
def catalog_ai_lake(self) -> CatalogAILakeService:
    """Return the `CatalogAILakeService` stored in `self._catalog_ai_lake`."""
    return self._catalog_ai_lake
146+
141147
@property
142148
def client(self) -> GoodDataApiClient:
143149
return self._client
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# (C) 2026 GoodData Corporation
2+
"""Unit tests for `CatalogAILakeService`.
3+
4+
These tests use plain mocks against `AILakeApi` rather than vcr cassettes
5+
because the service is a thin wrapper whose interesting logic
6+
(`wait_for_operation` polling, status discrimination, UUID-seeding) is
7+
deterministic and worth verifying without a live stack.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
from types import SimpleNamespace
13+
from unittest.mock import MagicMock, call, patch
14+
15+
import pytest
16+
from gooddata_sdk.catalog.ai_lake.service import (
17+
CatalogAILakeOperation,
18+
CatalogAILakeOperationError,
19+
CatalogAILakeService,
20+
)
21+
22+
23+
def _make_service() -> tuple[CatalogAILakeService, MagicMock]:
    """Return a service together with the mock standing in for its `AILakeApi`.

    The `AILakeApi` constructor is patched only while the service is being
    built, so the service ends up holding the mock and no real api-client is
    needed.
    """
    api_mock = MagicMock(name="AILakeApi")
    client_stub = SimpleNamespace(_api_client=MagicMock(name="ApiClient"))
    constructor_patch = patch("gooddata_sdk.catalog.ai_lake.service.AILakeApi", return_value=api_mock)

    with constructor_patch:
        svc = CatalogAILakeService(client_stub)  # type: ignore[arg-type]

    # Guard against the patch target drifting away from what the service uses.
    assert svc._ai_lake_api is api_mock
    return svc, api_mock
33+
34+
35+
def _operation_response(**fields: object) -> MagicMock:
36+
"""Mimic the api-client's deserialized response (`.to_dict()` returns the raw dict)."""
37+
response = MagicMock()
38+
response.to_dict.return_value = fields
39+
return response
40+
41+
42+
class TestAnalyzeStatistics:
    """Request construction and operation-id handling of `analyze_statistics`."""

    def test_seeds_caller_supplied_operation_id(self) -> None:
        """A caller-supplied id is returned unchanged and forwarded to the API."""
        service, api = _make_service()
        op_id = service.analyze_statistics(
            instance_id="demo-db",
            table_names=["fact_orders", "dim_country"],
            operation_id="11111111-2222-3333-4444-555555555555",
        )
        assert op_id == "11111111-2222-3333-4444-555555555555"
        assert api.analyze_statistics.call_count == 1
        call_args = api.analyze_statistics.call_args
        assert call_args.args[0] == "demo-db"
        assert call_args.kwargs["operation_id"] == "11111111-2222-3333-4444-555555555555"
        # Body request carries the table names.
        body = call_args.args[1]
        assert list(body.table_names) == ["fact_orders", "dim_country"]

    def test_generates_uuid_when_not_supplied(self) -> None:
        """Without an explicit id the service generates a canonical UUID4."""
        import uuid  # local import keeps this test self-contained

        service, api = _make_service()
        op_id = service.analyze_statistics(instance_id="demo-db")
        # Parse instead of counting characters: `uuid.UUID` rejects anything
        # that is not a real UUID, and the round-trip pins the canonical
        # 8-4-4-4-12 lower-case form.
        parsed = uuid.UUID(op_id)
        assert parsed.version == 4
        assert str(parsed) == op_id
        assert api.analyze_statistics.call_args.kwargs["operation_id"] == op_id

    def test_empty_table_names_is_normalized_to_empty_list(self) -> None:
        """Both `None` and `[]` are sent as an empty `table_names` list."""
        for table_names in (None, []):
            service, api = _make_service()
            service.analyze_statistics(instance_id="demo-db", table_names=table_names)
            assert list(api.analyze_statistics.call_args.args[1].table_names) == []
70+
71+
72+
class TestGetOperation:
    """Mapping of raw operation payloads onto `CatalogAILakeOperation`."""

    def test_normalizes_to_typed_handle(self) -> None:
        """A succeeded payload becomes a typed handle carrying its result."""
        service, api = _make_service()
        payload = {
            "id": "op-1",
            "kind": "analyze-statistics",
            "status": "succeeded",
            "result": {"tablesAnalyzed": 7},
        }
        api.get_ai_lake_operation.return_value = _operation_response(**payload)

        fetched = service.get_operation("op-1")

        assert isinstance(fetched, CatalogAILakeOperation)
        assert fetched.id == "op-1"
        assert fetched.kind == "analyze-statistics"
        assert fetched.is_succeeded
        assert fetched.result == {"tablesAnalyzed": 7}
        assert fetched.error is None

    def test_carries_error_payload_on_failed(self) -> None:
        """A failed payload keeps its error details on the handle."""
        service, api = _make_service()
        failure = {"code": "ANALYZE_FAILED", "message": "table not found"}
        api.get_ai_lake_operation.return_value = _operation_response(
            id="op-2",
            kind="analyze-statistics",
            status="failed",
            error=failure,
        )

        fetched = service.get_operation("op-2")

        assert fetched.is_failed
        assert fetched.error == {"code": "ANALYZE_FAILED", "message": "table not found"}
100+
101+
102+
class TestWaitForOperation:
    """Polling, failure and timeout behavior of `wait_for_operation`."""

    def test_polls_until_succeeded(self) -> None:
        """Pending responses are retried until a succeeded one arrives."""
        service, api = _make_service()
        api.get_ai_lake_operation.side_effect = [
            _operation_response(id="op", kind="analyze-statistics", status="pending"),
            _operation_response(id="op", kind="analyze-statistics", status="pending"),
            _operation_response(id="op", kind="analyze-statistics", status="succeeded", result={}),
        ]
        with patch("gooddata_sdk.catalog.ai_lake.service.time.sleep") as fake_sleep:
            op = service.wait_for_operation("op", poll_s=0.5)
        assert op.is_succeeded
        assert api.get_ai_lake_operation.call_count == 3
        # Slept twice between three polls.
        assert fake_sleep.call_args_list == [call(0.5), call(0.5)]

    def test_raises_on_failed_terminal_status(self) -> None:
        """A failed operation raises immediately, without sleeping first."""
        service, api = _make_service()
        api.get_ai_lake_operation.return_value = _operation_response(
            id="op", kind="analyze-statistics", status="failed", error={"message": "boom"}
        )
        with (
            patch("gooddata_sdk.catalog.ai_lake.service.time.sleep") as fake_sleep,
            pytest.raises(CatalogAILakeOperationError) as exc_info,
        ):
            service.wait_for_operation("op")
        assert exc_info.value.operation.is_failed
        assert "boom" in str(exc_info.value)
        # The very first poll is already terminal, so no wait happens.
        assert api.get_ai_lake_operation.call_count == 1
        fake_sleep.assert_not_called()

    def test_times_out_when_never_terminal(self) -> None:
        """A forever-pending operation ends in `TimeoutError`."""
        service, api = _make_service()
        api.get_ai_lake_operation.return_value = _operation_response(
            id="op", kind="analyze-statistics", status="pending"
        )
        # `time.monotonic` is read once to compute the deadline (0.0 -> 10.0)
        # and then once per poll: 1.0 and 5.0 are still inside the window,
        # 11.0 is past it, so the third poll is the one that raises.
        with (
            patch("gooddata_sdk.catalog.ai_lake.service.time.sleep") as fake_sleep,
            patch(
                "gooddata_sdk.catalog.ai_lake.service.time.monotonic",
                side_effect=[0.0, 1.0, 5.0, 11.0],
            ),
            pytest.raises(TimeoutError) as exc_info,
        ):
            service.wait_for_operation("op", timeout_s=10.0, poll_s=0.1)
        assert "did not finish within 10.0s" in str(exc_info.value)
        # Three polls with a sleep after each of the first two.
        assert api.get_ai_lake_operation.call_count == 3
        assert fake_sleep.call_count == 2

0 commit comments

Comments
 (0)