1
1
import asyncio
2
+ import typing
2
3
from datetime import datetime , timedelta , timezone
3
4
from typing import Any
4
5
from unittest .mock import MagicMock
5
6
6
7
import pytest
8
+ from faststream .types import SendableMessage
7
9
from faststream .utils .functions import timeout_scope
8
10
from freezegun import freeze_time
9
- from taskiq import AsyncBroker , TaskiqScheduler
11
+ from taskiq import AsyncBroker
10
12
from taskiq .cli .scheduler .args import SchedulerArgs
11
13
from taskiq .cli .scheduler .run import run_scheduler
12
14
from taskiq .schedule_sources import LabelScheduleSource
13
15
14
16
from taskiq_faststream import BrokerWrapper , StreamScheduler
17
+ from tests import messages
15
18
16
19
17
20
@pytest .mark .anyio
@@ -54,7 +57,7 @@ async def handler(msg: str) -> None:
54
57
task = asyncio .create_task (
55
58
run_scheduler (
56
59
SchedulerArgs (
57
- scheduler = TaskiqScheduler (
60
+ scheduler = StreamScheduler (
58
61
broker = taskiq_broker ,
59
62
sources = [LabelScheduleSource (taskiq_broker )],
60
63
),
@@ -69,24 +72,44 @@ async def handler(msg: str) -> None:
69
72
mock .assert_called_once_with ("Hi!" )
70
73
task .cancel ()
71
74
75
+ @pytest .mark .parametrize (
76
+ "msg" ,
77
+ [
78
+ messages .message , # regular msg
79
+ messages .sync_callable_msg , # sync callable
80
+ messages .async_callable_msg , # async callable
81
+ messages .sync_generator_msg , # sync generator
82
+ messages .async_generator_msg , # async generator
83
+ messages .sync_callable_class_message , # sync callable class
84
+ messages .async_callable_class_message , # async callable class
85
+ ],
86
+ )
72
87
async def test_task_multiple_schedules_by_cron (
73
88
self ,
74
89
subject : str ,
75
90
broker : Any ,
76
91
event : asyncio .Event ,
92
+ msg : typing .Union [
93
+ None ,
94
+ SendableMessage ,
95
+ typing .Callable [[], SendableMessage ],
96
+ typing .Callable [[], typing .Awaitable [SendableMessage ]],
97
+ typing .Callable [[], typing .Generator [SendableMessage , None , None ]],
98
+ typing .Callable [[], typing .AsyncGenerator [SendableMessage , None ]],
99
+ ],
77
100
) -> None :
78
101
"""Test cron runs twice via StreamScheduler."""
79
102
received_message = []
80
103
81
104
@broker .subscriber (subject )
82
- async def handler (msg : str ) -> None :
83
- received_message .append (msg )
105
+ async def handler (message : str ) -> None :
106
+ received_message .append (message )
84
107
event .set ()
85
108
86
109
taskiq_broker = self .build_taskiq_broker (broker )
87
110
88
111
taskiq_broker .task (
89
- "Hi!" ,
112
+ msg ,
90
113
** {self .subj_name : subject },
91
114
schedule = [
92
115
{
@@ -116,4 +139,6 @@ async def handler(msg: str) -> None:
116
139
117
140
task .cancel ()
118
141
119
- assert received_message == ["Hi!" , "Hi!" ], received_message
142
+ assert received_message == [messages .message , messages .message ], (
143
+ received_message
144
+ )
0 commit comments