Skip to content

Commit f8a5d8c

Browse files
lupko authored and Roman Hartig committed
XSH-733: Add factory to create data frames from raw exec definition
* the resulting data frames respect the dimensionality specified in the exec definition
* (grand/sub)totals included
1 parent b6d9902 commit f8a5d8c

3 files changed

Lines changed: 558 additions & 1 deletion

File tree

gooddata-pandas/gooddata_pandas/dataframe.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pandas
77

88
from gooddata_pandas.data_access import compute_and_extract
9+
from gooddata_pandas.result_convertor import convert_result_to_dataframe
910
from gooddata_pandas.utils import (
1011
ColumnsDef,
1112
DefaultInsightColumnNaming,
@@ -14,7 +15,7 @@
1415
_to_item,
1516
make_pandas_index,
1617
)
17-
from gooddata_sdk import Attribute, Filter, GoodDataSdk
18+
from gooddata_sdk import Attribute, ExecutionDefinition, Filter, GoodDataSdk
1819

1920

2021
class DataFrameFactory:
@@ -225,3 +226,18 @@ def for_insight(self, insight_id: str, auto_index: bool = True) -> pandas.DataFr
225226
}
226227

227228
return self.for_items(columns, filter_by=filter_by, auto_index=auto_index)
229+
230+
def for_exec_def(self, exec_def: ExecutionDefinition) -> pandas.DataFrame:
    """
    Build a data frame from an execution definition.

    The execution definition is computed through the SDK's compute service in this factory's
    workspace; the resulting execution response is then converted by `convert_result_to_dataframe`.

    The data frame respects the dimensionality specified in the execution definition's result spec.
    Each dimension may be sliced by multiple labels, so both the row index and the columns of the
    data frame are created as a MultiIndex.

    :param exec_def: execution definition
    :return: a new dataframe
    """
    execution_response = self._sdk.compute.for_exec_def(
        workspace_id=self._workspace_id,
        exec_def=exec_def,
    )

    return convert_result_to_dataframe(response=execution_response)
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
# (C) 2022 GoodData Corporation
2+
from dataclasses import dataclass, field
3+
from typing import Any, Callable, List, Optional, Tuple, Union
4+
5+
import pandas
6+
7+
from gooddata_afm_client import models
8+
from gooddata_sdk import ExecutionResponse, ExecutionResult
9+
10+
# default page size used for every dimension when reading an execution result page by page
_DEFAULT_PAGE_SIZE = 100

# headers of a single dimension: a list of header groups, each being a list of header entries
_DataHeaders = List[List[Any]]

# a single array of result values; individual values may be None
_DataArray = List[Optional[int]]
13+
14+
15+
@dataclass(frozen=True)
class _DataWithHeaders:
    """
    Frozen bundle of the data, headers and grand totals extracted from an execution result.

    The tuple-typed attributes hold one entry per dimension (index 0 and index 1). The instance
    is frozen, so its attributes cannot be rebound; the lists stored in them remain ordinary
    mutable lists.
    """

    # extracted data; either array of values for one-dimensional result or array of arrays of values
    data: "Union[_DataArray, List[_DataArray]]"

    # per-dimension headers for the data
    data_headers: "Tuple[_DataHeaders, Optional[_DataHeaders]]"

    # per-dimension grand total data
    grand_totals: "Tuple[Optional[List[_DataArray]], Optional[List[_DataArray]]]"

    # per-dimension grand total headers
    grand_total_headers: "Tuple[Optional[_DataHeaders], Optional[_DataHeaders]]"
28+
29+
30+
@dataclass
class _AccumulatedData:
    """
    Utility class to offload code from the function that extracts all data and headers for a
    particular paged result. That function drives the paging and calls out to this class to accumulate
    the essential data and headers from each page.

    The per-dimension attributes (`data_headers`, `grand_totals`, `grand_totals_headers`) are
    two-element lists indexed by dimension; an element stays `None` until something is accumulated
    for that dimension.
    """

    data: Any = field(init=False, default_factory=list)
    data_headers: "List[Optional[_DataHeaders]]" = field(init=False, default_factory=lambda: [None, None])
    grand_totals: "List[Optional[List[_DataArray]]]" = field(init=False, default_factory=lambda: [None, None])
    grand_totals_headers: "List[Optional[_DataHeaders]]" = field(init=False, default_factory=lambda: [None, None])

    def accumulate_data(self, from_result: "ExecutionResult") -> None:
        """
        Appends the data of a result page to the accumulated data.

        If one-dimensional result, the data is single array, so this adds the elements of that array into
        'data'. If two-dimensional, the data is array of arrays, so this adds as many arrays as there are
        table rows.

        :param from_result: result page whose `data` should be appended
        """
        self.data.extend(from_result.data)

    def extend_existing_row_data(self, from_result: "ExecutionResult") -> None:
        """
        Extends already accumulated rows with the values found in a result page.

        The first element of the page's `paging_offset` identifies the accumulated row that corresponds
        to the first row of the page; the following page rows extend the following accumulated rows.

        :param from_result: result page holding additional values for existing rows
        """
        first_row = from_result.paging_offset[0]

        for row_offset, row_values in enumerate(from_result.data):
            self.data[first_row + row_offset].extend(row_values)

    def accumulate_headers(self, from_result: "ExecutionResult", from_dim: int) -> None:
        """
        Accumulates headers for particular dimension of a result into `data_headers` at the index
        matching the dimension index. This mutates `data_headers`.

        :param from_result: result page to read the headers from
        :param from_dim: index of the dimension whose headers should be accumulated
        """
        page_headers = from_result.get_all_headers(dim=from_dim)
        known_headers = self.data_headers[from_dim]

        if known_headers is None:
            # first page seen for this dimension; take the headers as they are
            self.data_headers[from_dim] = page_headers
            return

        for idx, headers in enumerate(page_headers):
            known_headers[idx].extend(headers)

    @staticmethod
    def _extract_dim_idx(grand_total: "models.ExecutionResultGrandTotal") -> int:
        """
        Determines the index of the dimension from which a grand total was calculated.

        :param grand_total: grand total as included in the execution result
        :return: dimension index parsed from the trailing digits of the total's only dimension identifier
        :raises ValueError: if the grand total does not reference exactly one dimension or the dimension
            identifier does not end with a number
        """
        # TODO: this is one super-nasty hack; there are two things:
        #  - grand totals list contains grand totals per-dimension but in arbitrary order & the cardinality
        #    of the list does not match the number of dimensions of the result
        #  - for grand-totals in some dimension, the totalDimensions property mentions the dimension's
        #    localIdentifier -> fine, except that localIdentifier is _nowhere_ else in the exec response or
        #    in the exec result;
        #  - there is also this thing with the totalDimensions being an array, don't know why; guessing it's
        #    relevant for grand totals??
        #  so for now doing with this nasty thing of relying on convention used in both UI and python SDK where
        #  the dimension local identifier always specifies index of the dimension at the end :)
        #
        #  imho the proper way to deal with this is to include dimension identifier at least in the execution
        #  response, in the dimension descriptor - same as labels and metrics have their local id there; that
        #  way a proper lookup can be done and identify dimension index
        dims = grand_total["totalDimensions"]
        if len(dims) != 1:
            # explicit check instead of `assert`, which would be stripped when running with -O
            raise ValueError(f"expected grand total to reference exactly one dimension, got {len(dims)}")

        dim_id = dims[0]
        # take all trailing digits, not just the last character, so that multi-digit indexes are parsed too
        index_digits = dim_id[len(dim_id.rstrip("0123456789")) :]
        if not index_digits:
            raise ValueError(f"dimension identifier '{dim_id}' does not end with a dimension index")

        return int(index_digits)

    def accumulate_grand_totals(self, from_result: "ExecutionResult", paging_dim: int) -> None:
        """
        Accumulates grand totals from the results; processes all grand totals on all dimensions; the method
        needs to know in which direction is the paging happening so that it can append new grand total data.

        :param from_result: result page to read the grand totals from
        :param paging_dim: index of the dimension along which the caller is currently paging
        """
        grand_totals = from_result.grand_totals
        if not grand_totals:
            return

        for grand_total in grand_totals:
            dim_idx = self._extract_dim_idx(grand_total)
            # the dimension id specified on the grand total says from what dimension were
            # the grand totals calculated (1 for column totals or 0 for row totals);
            #
            # the grand totals themselves should, however be placed in the opposite dimension:
            #
            # column totals are extra rows at the end of the data
            # row totals are extra columns at the right 'edge' of the data
            opposite_dim = 1 if dim_idx == 0 else 0

            if self.grand_totals[opposite_dim] is None:
                # grand totals not initialized yet; initialize both data and headers by making
                # a shallow copy from the results
                self.grand_totals[opposite_dim] = grand_total["data"][:]
                # TODO: why can there be multiple elements in the headerGroups list? only the first one is used
                first_header_group = grand_total["dimensionHeaders"][0]["headerGroups"][0]
                self.grand_totals_headers[opposite_dim] = first_header_group["headers"][:]
            elif paging_dim != opposite_dim:
                # grand totals are already initialized and the code is paging in the direction that reveals
                # additional grand total values; append them accordingly; no need to consider total headers:
                # that is because only the grand total data is subject to paging
                if opposite_dim == 0:
                    # have column totals and paging 'to the right'; totals for the new columns are revealed so
                    # extend existing data arrays
                    for total_idx, total_data in enumerate(grand_total["data"]):
                        self.grand_totals[opposite_dim][total_idx].extend(total_data)
                else:
                    # have row totals and paging down, keep adding extra rows
                    self.grand_totals[opposite_dim].extend(grand_total["data"])

    def result(self) -> "_DataWithHeaders":
        """
        Packs everything accumulated so far into a `_DataWithHeaders` instance.

        The accumulated lists are passed on as they are (no copies are made).

        :return: accumulated data, headers, grand totals and grand total headers
        """
        return _DataWithHeaders(
            data=self.data,
            data_headers=(self.data_headers[0], self.data_headers[1]),
            grand_totals=(self.grand_totals[0], self.grand_totals[1]),
            grand_total_headers=(self.grand_totals_headers[0], self.grand_totals_headers[1]),
        )
142+
143+
144+
def _extract_all_result_data(response: ExecutionResponse, page_size: int = _DEFAULT_PAGE_SIZE) -> _DataWithHeaders:
    """
    Extracts all data and headers for an execution result. This pages around the execution result to
    extract everything from the paged API.

    :param response: execution response to work with;
    :param page_size: limit used for every dimension when reading a single page of the result
    :return: all accumulated data, headers, grand totals and grand total headers
    """
    num_dims = len(response.dimensions)
    offset = [0] * num_dims
    limit = [page_size] * num_dims
    accumulator = _AccumulatedData()

    while True:
        # the outer loop pages through the first dimension;
        #
        # if one-dimensional result, it pages over an array of data
        # if two-dimensional result, it pages over table rows
        page = response.read_result(offset=offset, limit=limit)

        accumulator.accumulate_data(from_result=page)
        accumulator.accumulate_headers(from_result=page, from_dim=0)
        accumulator.accumulate_grand_totals(from_result=page, paging_dim=0)

        if num_dims > 1:
            # when result is two-dimensional make sure to read the column headers and column totals
            # just once - when scrolling 'to the right' for the first time;
            first_pass_over_columns = accumulator.data_headers[1] is None
            if first_pass_over_columns:
                accumulator.accumulate_headers(from_result=page, from_dim=1)

            # as long as the last page read does not contain all the columns, page 'to the right'
            # and extend the existing rows with the data of the additional columns
            while not page.is_complete(dim=1):
                offset = [offset[0], page.next_page_start(dim=1)]
                page = response.read_result(offset=offset, limit=limit)
                accumulator.extend_existing_row_data(from_result=page)

                if first_pass_over_columns:
                    accumulator.accumulate_headers(from_result=page, from_dim=1)
                    accumulator.accumulate_grand_totals(from_result=page, paging_dim=1)

        if page.is_complete(dim=0):
            break

        # move on to the next page in the first dimension; start again from the first column
        offset = [page.next_page_start(dim=0), 0] if num_dims > 1 else [page.next_page_start(dim=0)]

    return accumulator.result()
201+
202+
203+
def _create_header_mapper(response: "ExecutionResponse", dim: int) -> Callable[[int, Any], str]:
    """
    Creates a function that maps a result header of the given dimension to a label.

    The returned function takes the index of the header within the dimension and the header itself:

    - `None` header maps to an empty string
    - attribute header maps to its label value
    - measure header maps to the name of the measure as found in the dimension descriptor; when the
      descriptor has no name, the measure's local identifier is used
    - total header maps to the name of the total function

    :param response: execution response; its dimension descriptors are used to look up measures
    :param dim: index of the dimension whose headers will be mapped
    :return: the mapping function
    """
    dim_descriptor = response.dimensions[dim]

    def _mapper(header_idx: int, header: Any) -> str:
        if header is None:
            return ""

        if "attributeHeader" in header:
            return header["attributeHeader"]["labelValue"]

        if "measureHeader" in header:
            measure_idx = header["measureHeader"]["measureIndex"]
            measure_descriptor = dim_descriptor["headers"][header_idx]["measureGroupHeaders"][measure_idx]

            return measure_descriptor["name"] if "name" in measure_descriptor else measure_descriptor["localIdentifier"]

        if "totalHeader" in header:
            return header["totalHeader"]["function"]

        # NOTE: any other kind of header falls through here; the function then implicitly returns None

    return _mapper
223+
224+
225+
def _headers_to_index(
    dim_idx: int, headers: Tuple[_DataHeaders, Optional[_DataHeaders]], response: ExecutionResponse
) -> Optional[pandas.Index]:
    """
    Creates a pandas MultiIndex from the headers of one dimension.

    Every header group of the dimension becomes one level of the index; the headers in the group are
    mapped to labels using the mapper created by `_create_header_mapper`.

    :param dim_idx: index of the dimension to create the index for
    :param headers: per-dimension headers
    :param response: execution response; provides the dimension descriptors
    :return: the index, or None when the response has no dimension with the given index or when the
        descriptor of that dimension contains no headers
    """
    dimensions = response.dimensions
    if dim_idx >= len(dimensions) or len(dimensions[dim_idx]["headers"]) == 0:
        return None

    to_label = _create_header_mapper(response, dim=dim_idx)

    levels = []
    for level_idx, header_group in enumerate(headers[dim_idx]):
        levels.append(tuple(to_label(level_idx, header) for header in header_group))

    return pandas.MultiIndex.from_arrays(levels)
239+
240+
241+
def _merge_grand_totals_into_data(extract: "_DataWithHeaders") -> "Union[_DataArray, List[_DataArray]]":
    """
    Merges grand totals into the extracted data.

    The extracted data is mutated in place - rows and columns are extended with the grand totals - so
    that arrays do not have to be copied around. The returned object is the very same `extract.data`.

    :param extract: extracted data and grand totals
    :return: the extracted data including the grand totals
    """
    merged: Any = extract.data

    extra_rows = extract.grand_totals[0]
    if extra_rows is not None:
        # column totals are computed into extra rows, one row per column total;
        # those rows go to the end of the data rows
        merged.extend(extra_rows)

    extra_columns = extract.grand_totals[1]
    if extra_columns is not None:
        # row totals are computed into extra columns; append them to the existing
        # data row at the same position
        for row_idx, row_totals in enumerate(extra_columns):
            merged[row_idx].extend(row_totals)

    return merged
260+
261+
262+
def _merge_grand_total_headers_into_headers(
    extract: "_DataWithHeaders",
) -> "Tuple[_DataHeaders, Optional[_DataHeaders]]":
    """
    Merges grand total headers into data headers.

    The extracted headers are mutated in place: for every dimension that has grand total headers, the
    first header group of that dimension is extended with them and every other header group of the
    dimension is padded with the same number of `None` values.

    :param extract: extracted headers and grand total headers
    :return: the data headers (same object as `extract.data_headers`) including the grand total headers
    """
    merged = extract.data_headers

    for dim_idx, total_headers in enumerate(extract.grand_total_headers):
        if total_headers is not None:
            dim_headers = merged[dim_idx]
            dim_headers[0].extend(total_headers)

            # remaining header groups have nothing to show for the totals; keep them aligned
            filler = [None] * len(total_headers)
            for header_group in dim_headers[1:]:
                header_group.extend(filler)

    return merged
278+
279+
280+
def convert_result_to_dataframe(response: ExecutionResponse) -> pandas.DataFrame:
    """
    Converts execution result to a pandas dataframe, maintaining the dimensionality of the result.

    Because the result itself does not contain all the necessary metadata to do the full conversion, this
    function expects the execution _response_ instead; the result is read through the response.

    Grand totals and their headers, when present, are merged into the data and the headers before the
    dataframe is created.

    :param response: execution response through which the result can be read and converted to a dataframe
    :return: a new dataframe
    """
    extract = _extract_all_result_data(response)
    full_data = _merge_grand_totals_into_data(extract)
    full_headers = _merge_grand_total_headers_into_headers(extract)

    row_index = _headers_to_index(dim_idx=0, headers=full_headers, response=response)
    column_index = _headers_to_index(dim_idx=1, headers=full_headers, response=response)

    return pandas.DataFrame(data=full_data, index=row_index, columns=column_index)

0 commit comments

Comments
 (0)