Skip to content

Commit 0efa842

Browse files
author
Kirk Shoop
committed
add run-loop scheduler
1 parent e477c5a commit 0efa842

File tree

3 files changed

+209
-2
lines changed

3 files changed

+209
-2
lines changed

Rx/v2/src/rxcpp/operators/rx-repeat.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,16 @@ struct repeat : public operator_base<T>
5353
}
5454
composite_subscription source_lifetime;
5555
output_type out;
56+
composite_subscription::weak_subscription lifetime_token;
5657

5758
void do_subscribe() {
5859
auto state = this->shared_from_this();
60+
61+
state->out.remove(state->lifetime_token);
62+
state->source_lifetime.unsubscribe();
5963

6064
state->source_lifetime = composite_subscription();
61-
state->out.add(state->source_lifetime);
65+
state->lifetime_token = state->out.add(state->source_lifetime);
6266

6367
state->source.subscribe(
6468
state->out,
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2+
3+
#pragma once
4+
5+
#if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
6+
#define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
7+
8+
#include "../rx-includes.hpp"
9+
10+
namespace rxcpp {
11+
12+
namespace schedulers {
13+
14+
namespace detail {
15+
16+
struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
17+
{
18+
typedef scheduler::clock_type clock_type;
19+
20+
typedef detail::schedulable_queue<
21+
clock_type::time_point> queue_item_time;
22+
23+
typedef queue_item_time::item_type item_type;
24+
typedef queue_item_time::const_reference const_reference_item_type;
25+
26+
virtual ~run_loop_state()
27+
{
28+
}
29+
30+
run_loop_state()
31+
{
32+
}
33+
34+
composite_subscription lifetime;
35+
mutable std::mutex lock;
36+
mutable queue_item_time q;
37+
recursion r;
38+
};
39+
40+
}
41+
42+
43+
struct run_loop_scheduler : public scheduler_interface
44+
{
45+
private:
46+
typedef run_loop_scheduler this_type;
47+
run_loop_scheduler(const this_type&);
48+
49+
struct run_loop_worker : public worker_interface
50+
{
51+
private:
52+
typedef run_loop_worker this_type;
53+
54+
run_loop_worker(const this_type&);
55+
56+
public:
57+
std::weak_ptr<detail::run_loop_state> state;
58+
59+
virtual ~run_loop_worker()
60+
{
61+
}
62+
63+
explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
64+
: state(ws)
65+
{
66+
}
67+
68+
virtual clock_type::time_point now() const {
69+
return clock_type::now();
70+
}
71+
72+
virtual void schedule(const schedulable& scbl) const {
73+
schedule(now(), scbl);
74+
}
75+
76+
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
77+
if (scbl.is_subscribed()) {
78+
auto st = state.lock();
79+
std::unique_lock<std::mutex> guard(st->lock);
80+
st->q.push(detail::run_loop_state::item_type(when, scbl));
81+
st->r.reset(false);
82+
}
83+
}
84+
};
85+
86+
std::weak_ptr<detail::run_loop_state> state;
87+
88+
public:
89+
explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws)
90+
: state(ws)
91+
{
92+
}
93+
virtual ~run_loop_scheduler()
94+
{
95+
}
96+
97+
virtual clock_type::time_point now() const {
98+
return clock_type::now();
99+
}
100+
101+
virtual worker create_worker(composite_subscription cs) const {
102+
auto lifetime = state.lock()->lifetime;
103+
auto token = lifetime.add(cs);
104+
cs.add([=](){lifetime.remove(token);});
105+
return worker(cs, create_worker_interface());
106+
}
107+
108+
std::shared_ptr<worker_interface> create_worker_interface() const {
109+
return std::make_shared<run_loop_worker>(state);
110+
}
111+
};
112+
113+
class run_loop
114+
{
115+
private:
116+
typedef run_loop this_type;
117+
// don't allow this instance to copy/move since it owns current_thread queue
118+
// for the thread it is constructed on.
119+
run_loop(const this_type&);
120+
run_loop(this_type&&);
121+
122+
typedef scheduler::clock_type clock_type;
123+
typedef detail::action_queue queue_type;
124+
125+
typedef detail::run_loop_state::item_type item_type;
126+
typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
127+
128+
std::shared_ptr<detail::run_loop_state> state;
129+
std::shared_ptr<run_loop_scheduler> sc;
130+
131+
public:
132+
run_loop()
133+
: state(std::make_shared<detail::run_loop_state>())
134+
, sc(std::make_shared<run_loop_scheduler>(state))
135+
{
136+
// take ownership so that the current_thread scheduler
137+
// uses the same queue on this thread
138+
queue_type::ensure(sc->create_worker_interface());
139+
}
140+
~run_loop()
141+
{
142+
state->lifetime.unsubscribe();
143+
144+
std::unique_lock<std::mutex> guard(state->lock);
145+
146+
// release ownership
147+
queue_type::destroy();
148+
149+
auto expired = std::move(state->q);
150+
if (!state->q.empty()) abort();
151+
}
152+
153+
clock_type::time_point now() const {
154+
return clock_type::now();
155+
}
156+
157+
composite_subscription get_subscription() const {
158+
return state->lifetime;
159+
}
160+
161+
bool empty() const {
162+
return state->q.empty();
163+
}
164+
165+
const_reference_item_type peek() const {
166+
return state->q.top();
167+
}
168+
169+
void dispatch() const {
170+
std::unique_lock<std::mutex> guard(state->lock);
171+
if (state->q.empty()) {
172+
return;
173+
}
174+
auto& peek = state->q.top();
175+
if (!peek.what.is_subscribed()) {
176+
state->q.pop();
177+
return;
178+
}
179+
if (clock_type::now() < peek.when) {
180+
return;
181+
}
182+
auto what = peek.what;
183+
state->q.pop();
184+
state->r.reset(state->q.empty());
185+
guard.unlock();
186+
what(state->r.get_recurse());
187+
}
188+
189+
scheduler get_scheduler() const {
190+
return make_scheduler(sc);
191+
}
192+
};
193+
194+
inline scheduler make_run_loop(const run_loop& r) {
195+
return r.get_scheduler();
196+
}
197+
198+
}
199+
200+
}
201+
202+
#endif

projects/CMake/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ set(RX_SOURCES
8585
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
8686
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp
8787
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
88+
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-runloop.hpp
8889
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-sameworker.hpp
8990
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
9091
${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
@@ -107,4 +108,4 @@ set(RX_SOURCES
107108
source_group("src" FILES ${RX_SOURCES})
108109

109110
add_library(RxCpp SHARED ${RX_SOURCES})
110-
SET_TARGET_PROPERTIES(RxCpp PROPERTIES LINKER_LANGUAGE CXX)
111+
SET_TARGET_PROPERTIES(RxCpp PROPERTIES LINKER_LANGUAGE CXX)

0 commit comments

Comments
 (0)