File tree Expand file tree Collapse file tree 1 file changed +23
-1
lines changed Expand file tree Collapse file tree 1 file changed +23
-1
lines changed Original file line number Diff line number Diff line change 1
1
import unittest
2
2
3
- from reactivex import operators as ops
3
+ from reactivex import operators as ops , create as rx_create
4
4
from reactivex .testing import ReactiveTest , TestScheduler
5
5
6
6
on_next = ReactiveTest .on_next
@@ -71,6 +71,28 @@ def create():
71
71
assert results .messages == []
72
72
assert xs .subscriptions == [subscribe (200 , 1000 )]
73
73
74
+ def test_subscribe_on_scheduler_forwarding (self ):
75
+ scheduler = TestScheduler ()
76
+ forwarded_sheduler = None
77
+
78
+ def source ():
79
+ def subscribe (observer , _scheduler ):
80
+ nonlocal forwarded_sheduler
81
+ forwarded_sheduler = _scheduler
82
+
83
+ def action_on_completed (_ , __ ):
84
+ observer .on_completed ()
85
+
86
+ return _scheduler .schedule_absolute (250 , action_on_completed )
87
+
88
+ return rx_create (subscribe )
89
+
90
+ def create ():
91
+ return source ().pipe (ops .subscribe_on (scheduler ))
92
+
93
+ results = scheduler .start (create )
94
+ assert forwarded_sheduler is scheduler
95
+ assert results .messages == [on_completed (250 )]
74
96
75
97
if __name__ == "__main__" :
76
98
unittest .main ()
You can’t perform that action at this time.
0 commit comments