Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def date_param():
"dags update --dag-id=example_bash_operator --no-is-paused",
# DAG Run commands
"dagrun list --dag-id example_bash_operator --state success --limit=1",
# XCom commands - need a DAG run with completed tasks
f'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}" --task-id=runme_0 --key=test_xcom_key --value=\'{{"test": "value"}}\'',
f'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}" --task-id=runme_0 --key=test_xcom_key',
f'xcom list --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}" --task-id=runme_0',
f'xcom edit --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}" --task-id=runme_0 --key=test_xcom_key --value=\'{{"updated": "value"}}\'',
f'xcom delete --dag-id=example_bash_operator --dag-run-id="manual__{ONE_DATE_PARAM}" --task-id=runme_0 --key=test_xcom_key',
# Jobs commands
"jobs list",
# Pools commands
Expand Down
2 changes: 1 addition & 1 deletion airflow-ctl/docs/images/command_hashes.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
main:deacf21c6300eae16afbf8cbd538f1ef
main:65249416abad6ad24c276fb44326ae15
assets:b3ae2b933e54528bf486ff28e887804d
auth:f396d4bce90215599dde6ad0a8f30f29
backfill:bbce9859a2d1ce054ad22db92dea8c05
Expand Down
174 changes: 89 additions & 85 deletions airflow-ctl/docs/images/output_main.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions airflow-ctl/src/airflowctl/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ServerResponseError,
VariablesOperations,
VersionOperations,
XComOperations,
)
from airflowctl.exceptions import (
AirflowCtlCredentialNotFoundException,
Expand Down Expand Up @@ -301,6 +302,12 @@ def version(self):
"""Get the version of the server."""
return VersionOperations(self)

@lru_cache() # type: ignore[prop-decorator]
@property
def xcom(self):
"""Operations related to XComs."""
return XComOperations(self)


# API Client Decorator for CLI Actions
@contextlib.contextmanager
Expand Down
127 changes: 127 additions & 0 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import datetime
import json
from typing import TYPE_CHECKING, Any, TypeVar

import httpx
Expand Down Expand Up @@ -70,6 +71,10 @@
VariableCollectionResponse,
VariableResponse,
VersionInfo,
XComCollectionResponse,
XComCreateBody,
XComResponseNative,
XComUpdateBody,
)
from airflowctl.exceptions import AirflowCtlConnectionException

Expand Down Expand Up @@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
return VersionInfo.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e


class XComOperations(BaseOperations):
"""XCom operations."""

def get(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
key: str,
map_index: int = None, # type: ignore
) -> XComResponseNative | ServerResponseError:
"""Get an XCom entry."""
try:
params: dict[str, Any] = {}
if map_index is not None:
params["map_index"] = map_index
self.response = self.client.get(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
params=params,
)
return XComResponseNative.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e

def list(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
map_index: int = None, # type: ignore
key: str = None, # type: ignore
) -> XComCollectionResponse | ServerResponseError:
"""List XCom entries."""
params: dict[str, Any] = {}
if map_index is not None:
params["map_index"] = map_index
if key is not None:
params["xcom_key"] = key
return super().execute_list(
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
data_model=XComCollectionResponse,
params=params,
)

def add(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
key: str,
value: str,
map_index: int = None, # type: ignore
) -> XComResponseNative | ServerResponseError:
"""Add an XCom entry."""
try:
parsed_value = json.loads(value)
except (ValueError, TypeError):
parsed_value = value

body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
if map_index is not None:
body_dict["map_index"] = map_index
body = XComCreateBody(**body_dict)
try:
self.response = self.client.post(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
json=body.model_dump(mode="json", exclude_unset=True),
)
return XComResponseNative.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e

def edit(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
key: str,
value: str,
map_index: int = None, # type: ignore
) -> XComResponseNative | ServerResponseError:
"""Edit an XCom entry."""
try:
parsed_value = json.loads(value)
except (ValueError, TypeError):
parsed_value = value

body_dict: dict[str, Any] = {"value": parsed_value}
if map_index is not None:
body_dict["map_index"] = map_index
body = XComUpdateBody(**body_dict)
try:
self.response = self.client.patch(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
json=body.model_dump(mode="json", exclude_unset=True),
)
return XComResponseNative.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e

def delete(
self,
dag_id: str,
dag_run_id: str,
task_id: str,
key: str,
map_index: int = None, # type: ignore
) -> str | ServerResponseError:
"""Delete an XCom entry."""
try:
params: dict[str, Any] = {}
if map_index is not None:
params["map_index"] = map_index
self.client.delete(
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
params=params,
)
return key
except ServerResponseError as e:
raise e
Comment thread
dheerajturaga marked this conversation as resolved.
2 changes: 1 addition & 1 deletion airflow-ctl/src/airflowctl/ctl/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def __init__(self, file_path: str | Path | None = None):
# Exclude parameters that are not needed for CLI from datamodels
self.excluded_parameters = ["schema_"]
# This list is used to determine if the command/operation needs to output data
self.output_command_list = ["list", "get", "create", "delete", "update", "trigger"]
self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit"]
self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"]
self.exclude_method_names = [
"error",
Expand Down
Loading
Loading