Question: How can I test my scheduler correctly? #129

@azmovi

I created this MRE (minimal reproducible example) so that someone can help me fix this test. I tried, but I couldn't work out how to do it from the documentation.

  • pyproject.toml
[project]
name = "test"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
    "faststream[rabbit,rabbitmq]>=0.6.5",
    "taskiq-faststream[otel,rabbit]>=0.4.0",
    "ipdb>=0.13.13",
    "pytest>=9.0.2",
    "pytest-asyncio>=1.3.0",
    "freezegun>=1.5.5",
]
  • Scheduler + test
from datetime import UTC, datetime, timedelta
import pytest
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from freezegun import freeze_time
from taskiq_faststream.types import ScheduledTask
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler


broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
taskiq_broker = BrokerWrapper(broker)


@broker.subscriber('test')
async def return_hello():
    return 'hello'

midnight_schedule = [ScheduledTask(cron='0 0 * * *')]
scheduled_task = taskiq_broker.task(queue='test', schedule=midnight_schedule)

scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

def get_today_midnight() -> datetime:
    today = datetime.now(tz=UTC)
    return today.replace(hour=0, minute=0, second=0, microsecond=0)



@pytest.mark.asyncio
async def test_task():
    async with TestRabbitBroker(broker) as br:
        midnight = get_today_midnight()
        with freeze_time(midnight) as frozen_time:
            await scheduler.startup()
            next_midnight = midnight + timedelta(hours=1)
            frozen_time.move_to(next_midnight)
            return_hello.mock.assert_called()
  • Pytest output: E AssertionError: Expected 'mock' to have been called.

  • One thing I tried was replacing broker with taskiq_broker, but that did not work either and raised a different error:
    Pytest output: E AttributeError: <taskiq_faststream.broker.BrokerWrapper object at 0x7c554cc217f0> does not have the attribute '_channel'
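
Two details of the test above may contribute to the failing assertion, independently of taskiq-faststream itself. First, `midnight + timedelta(hours=1)` is 01:00, whereas the cron `0 0 * * *` next fires a full day after midnight. Second, `frozen_time.move_to(...)` is a synchronous call, and nothing is awaited between it and the assertion, so no background coroutine gets a chance to run. The sketch below illustrates both points using only the standard library. `FakeClock`, `next_daily_midnight`, and `toy_scheduler_loop` are hypothetical stand-ins invented for this illustration; they are not freezegun, taskiq, or FastStream APIs, and the sketch assumes a polling loop is already running, which may not be what `scheduler.startup()` does.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import Mock


class FakeClock:
    """Hypothetical frozen clock; plays the role freezegun has in the test."""

    def __init__(self, now: datetime) -> None:
        self.now = now

    def move_to(self, target: datetime) -> None:
        # Synchronous, like frozen_time.move_to: it only changes the time.
        self.now = target


def next_daily_midnight(after: datetime) -> datetime:
    """First firing of a daily-at-midnight schedule strictly after `after`."""
    start_of_day = after.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day + timedelta(days=1)


async def toy_scheduler_loop(clock: FakeClock, fire_at: datetime, handler) -> None:
    """Toy polling loop (an assumption): calls handler once clock reaches fire_at."""
    while clock.now < fire_at:
        await asyncio.sleep(0)  # yield to the event loop between checks
    handler()


async def demo() -> dict:
    midnight = datetime(2025, 1, 1, tzinfo=timezone.utc)
    clock = FakeClock(midnight)
    handler = Mock()
    fire_at = next_daily_midnight(midnight)
    loop_task = asyncio.create_task(toy_scheduler_loop(clock, fire_at, handler))

    # One hour past midnight is still before the next daily firing.
    clock.move_to(midnight + timedelta(hours=1))
    await asyncio.sleep(0)
    called_after_one_hour = handler.called

    # Moving the clock far enough is not sufficient on its own: the loop
    # has not been given a chance to observe the new time yet.
    clock.move_to(fire_at)
    called_before_yield = handler.called

    # Awaiting lets the background task run and call the handler.
    await asyncio.wait_for(loop_task, timeout=1)
    called_after_yield = handler.called

    return {
        "called_after_one_hour": called_after_one_hour,
        "called_before_yield": called_before_yield,
        "called_after_yield": called_after_yield,
    }


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the same reasoning applies to the real scheduler, the test would need to move the frozen time to at least the next midnight and then await something before asserting. Whether `scheduler.startup()` starts any dispatch loop at all is a separate question this sketch does not answer.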
