feat(scheduler): add optional send_timeout to prevent silent stall on hung kick#627
Open
Rohit-Ekbote wants to merge 1 commit into
Open
feat(scheduler): add optional send_timeout to prevent silent stall on hung kick#627Rohit-Ekbote wants to merge 1 commit into
Rohit-Ekbote wants to merge 1 commit into
Conversation
… hung kick ## Summary Adds an optional `send_timeout` (CLI `--send-timeout`) on `SchedulerLoop.run` that wraps each spawned send task in `asyncio.wait_for`, so a hung `broker.kick` cannot indefinitely block subsequent ticks for the same `schedule_id` via the `running_schedules` skip check. Default is `None` — no timeout, fully backwards-compatible. ## Problem `SchedulerLoop.run` skips ticks for any `schedule_id` whose previous send task is still in `running_schedules`. The entry only leaves `running_schedules` via the task's `add_done_callback`, which fires only when the spawned coroutine completes. If `broker.kick` ever hangs (sentinel failover, half-open socket, network blip without TCP RST), the spawned send task never completes, the done_callback never fires, and **every subsequent tick for that schedule is silently skipped** — no error, no log, no recovery until process restart. We hit this in production on `taskiq==0.11.20` (`ListQueueSentinelBroker` + redis-sentinel running through a flap). After upgrading to validate, the same logic is unchanged at HEAD; the bug is present in `0.12.4`. py-spy on a wedged scheduler confirmed: - the asyncio event loop is alive and iterating (`_run_once` timer advances every minute) - `sched_count: 1` — exactly one timer, the main loop's own sleep - the scheduler simply stops spawning new send tasks for affected `schedule_id`s User-level mitigations — subclassing `TaskiqScheduler` and wrapping `on_ready` in `asyncio.wait_for` — don't help, because the wrapper runs *inside* the send task; if the send task itself never returns, the scheduler's `running_schedules` entry persists regardless. The only place a timeout can break the cycle is around the spawned send task itself, which is owned by the scheduler. Filed as #<ISSUE_NUMBER> with the full investigation. ## Change 1. New free function `send_with_timeout(scheduler, source, task, timeout)` in `taskiq/cli/scheduler/run.py` that wraps `send` in `asyncio.wait_for`. On `TimeoutError`, logs a WARNING and returns cleanly so the done_callback can clean up `running_schedules`. Non-timeout exceptions still propagate so existing observability surfaces (e.g., `SendTaskError` logs) are unchanged. 2. `SchedulerLoop.run` accepts `send_timeout: float | None = None`. When set, the spawned send task uses `send_with_timeout`; otherwise it uses `send` as before. 3. `SchedulerArgs.send_timeout: float | None` and a `--send-timeout` CLI argument plumbed through to `SchedulerLoop.run`. Defaults preserve current behavior. Users opt in by passing `--send-timeout=N` (or setting `send_timeout` programmatically). ## Tests New `tests/cli/scheduler/test_send_with_timeout.py` covering: - Timed-out send returns cleanly (does NOT raise) and logs a WARNING with task_name + schedule_id + timeout - Successful send does NOT log a warning - Non-timeout exceptions propagate - The inner `send` coroutine is actually cancelled on timeout (observed via `CancelledError` in the test scheduler's `on_ready`) - Plain `send` is unchanged in behavior (opt-in only) All 5 new tests pass. All 26 existing `tests/cli/scheduler/` tests pass — no regressions. ## API shape - Function: `taskiq.cli.scheduler.run.send_with_timeout` (free function, mirrors `send`) - `SchedulerLoop.run(*, send_timeout: float | None = None)` (kw-only, defaults to None) - CLI: `--send-timeout FLOAT` (default: no timeout) Happy to iterate on the name / default / placement if maintainers prefer a different shape. Reasonable production default is ~60s; defaulting to `None` keeps the change strictly additive.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds an optional
send_timeout(CLI--send-timeout) onSchedulerLoop.runthat wraps each spawned send task inasyncio.wait_for, so a hungbroker.kickcannot indefinitely block subsequent ticks for the sameschedule_idvia therunning_schedulesskip check.Default is
None— no timeout, fully backwards-compatible.Problem
SchedulerLoop.runskips ticks for anyschedule_idwhose previous send task is still inrunning_schedules. The entry only leavesrunning_schedulesvia the task'sadd_done_callback, which fires only when the spawned coroutine completes. Ifbroker.kickever hangs (sentinel failover, half-open socket, network blip without TCP RST), the spawned send task never completes, the done_callback never fires, and every subsequent tick for that schedule is silently skipped — no error, no log, no recovery until process restart.We hit this in production on
taskiq==0.11.20(ListQueueSentinelBroker+ redis-sentinel running through a flap). After upgrading to validate, the same logic is unchanged at HEAD; the bug is present in0.12.4.py-spy on a wedged scheduler confirmed:
_run_oncetimer advances every minute)sched_count: 1— exactly one timer, the main loop's own sleepschedule_idsUser-level mitigations — subclassing
TaskiqSchedulerand wrappingon_readyinasyncio.wait_for— don't help, because the wrapper runs inside the send task; if the send task itself never returns, the scheduler'srunning_schedulesentry persists regardless. The only place a timeout can break the cycle is around the spawned send task itself, which is owned by the scheduler.Filed as #626 with the full investigation.
Change
send_with_timeout(scheduler, source, task, timeout)intaskiq/cli/scheduler/run.pythat wrapssendinasyncio.wait_for. OnTimeoutError, logs a WARNING and returns cleanly so the done_callback can clean uprunning_schedules. Non-timeout exceptions still propagate so existing observability surfaces (e.g.,SendTaskErrorlogs) are unchanged.SchedulerLoop.runacceptssend_timeout: float | None = None. When set, the spawned send task usessend_with_timeout; otherwise it usessendas before.SchedulerArgs.send_timeout: float | Noneand a--send-timeoutCLI argument plumbed through toSchedulerLoop.run.Defaults preserve current behavior. Users opt in by passing
--send-timeout=N(or settingsend_timeoutprogrammatically).Tests
New
tests/cli/scheduler/test_send_with_timeout.pycovering:sendcoroutine is actually cancelled on timeout (observed viaCancelledErrorin the test scheduler'son_ready)sendis unchanged in behavior (opt-in only)All 5 new tests pass. All 26 existing
tests/cli/scheduler/tests pass — no regressions.API shape
taskiq.cli.scheduler.run.send_with_timeout(free function, mirrorssend)SchedulerLoop.run(*, send_timeout: float | None = None)(kw-only, defaults to None)--send-timeout FLOAT(default: no timeout)Happy to iterate on the name / default / placement if maintainers prefer a different shape. Reasonable production default is ~60s; defaulting to
Nonekeeps the change strictly additive.