Skip to content

Commit e477c5a

Browse files
author
Kirk Shoop
committed
lifetime improvements
1 parent 01a89e0 commit e477c5a

File tree

4 files changed

+91
-44
lines changed

4 files changed

+91
-44
lines changed

Rx/v2/src/rxcpp/operators/rx-observe_on.hpp

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,65 +68,79 @@ struct observe_on
6868
, destination(std::move(d))
6969
{
7070
}
71+
72+
void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
73+
if (!guard.owns_lock()) {
74+
abort();
75+
}
76+
if (current == mode::Errored || current == mode::Disposed) {return;}
77+
current = end;
78+
queue_type fill_expired;
79+
swap(fill_expired, fill_queue);
80+
queue_type drain_expired;
81+
swap(drain_expired, drain_queue);
82+
RXCPP_UNWIND_AUTO([&](){guard.lock();});
83+
guard.unlock();
84+
lifetime.unsubscribe();
85+
destination.unsubscribe();
86+
}
7187

7288
void ensure_processing(std::unique_lock<std::mutex>& guard) const {
7389
if (!guard.owns_lock()) {
7490
abort();
7591
}
7692
if (current == mode::Empty) {
7793
current = mode::Processing;
94+
95+
if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
96+
finish(guard, mode::Disposed);
97+
}
7898

7999
auto keepAlive = this->shared_from_this();
80-
100+
81101
auto drain = [keepAlive, this](const rxsc::schedulable& self){
82102
using std::swap;
83103
try {
84-
if (drain_queue.empty() || !destination.is_subscribed()) {
85-
std::unique_lock<std::mutex> guard(lock);
86-
if (!destination.is_subscribed() ||
87-
(!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
88-
current = mode::Disposed;
89-
queue_type expired;
90-
swap(expired, fill_queue);
91-
guard.unlock();
92-
lifetime.unsubscribe();
93-
destination.unsubscribe();
94-
return;
95-
}
96-
if (drain_queue.empty()) {
97-
if (fill_queue.empty()) {
98-
current = mode::Empty;
104+
for (;;) {
105+
if (drain_queue.empty() || !destination.is_subscribed()) {
106+
std::unique_lock<std::mutex> guard(lock);
107+
if (!destination.is_subscribed() ||
108+
(!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
109+
finish(guard, mode::Disposed);
99110
return;
100111
}
101-
swap(fill_queue, drain_queue);
112+
if (drain_queue.empty()) {
113+
if (fill_queue.empty()) {
114+
current = mode::Empty;
115+
return;
116+
}
117+
swap(fill_queue, drain_queue);
118+
}
102119
}
120+
auto notification = std::move(drain_queue.front());
121+
drain_queue.pop_front();
122+
notification->accept(destination);
123+
std::unique_lock<std::mutex> guard(lock);
124+
self();
125+
if (lifetime.is_subscribed()) break;
103126
}
104-
auto notification = std::move(drain_queue.front());
105-
drain_queue.pop_front();
106-
notification->accept(destination);
107-
self();
108127
} catch(...) {
109128
destination.on_error(std::current_exception());
110129
std::unique_lock<std::mutex> guard(lock);
111-
current = mode::Errored;
112-
queue_type expired;
113-
swap(expired, fill_queue);
130+
finish(guard, mode::Errored);
114131
}
115132
};
116133

117134
auto selectedDrain = on_exception(
118135
[&](){return coordinator.act(drain);},
119136
destination);
120137
if (selectedDrain.empty()) {
121-
current = mode::Errored;
122-
using std::swap;
123-
queue_type expired;
124-
swap(expired, fill_queue);
138+
finish(guard, mode::Errored);
125139
return;
126140
}
127141

128142
auto processor = coordinator.get_worker();
129-
143+
130144
RXCPP_UNWIND_AUTO([&](){guard.lock();});
131145
guard.unlock();
132146

@@ -143,16 +157,19 @@ struct observe_on
143157

144158
void on_next(source_value_type v) const {
145159
std::unique_lock<std::mutex> guard(state->lock);
160+
if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
146161
state->fill_queue.push_back(notification_type::on_next(std::move(v)));
147162
state->ensure_processing(guard);
148163
}
149164
void on_error(std::exception_ptr e) const {
150165
std::unique_lock<std::mutex> guard(state->lock);
166+
if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
151167
state->fill_queue.push_back(notification_type::on_error(e));
152168
state->ensure_processing(guard);
153169
}
154170
void on_completed() const {
155171
std::unique_lock<std::mutex> guard(state->lock);
172+
if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
156173
state->fill_queue.push_back(notification_type::on_completed());
157174
state->ensure_processing(guard);
158175
}
@@ -163,7 +180,7 @@ struct observe_on
163180

164181
this_type o(d, std::move(coor), cs);
165182
auto keepAlive = o.state;
166-
cs.add([keepAlive](){
183+
cs.add([=](){
167184
std::unique_lock<std::mutex> guard(keepAlive->lock);
168185
keepAlive->ensure_processing(guard);
169186
});
@@ -262,6 +279,11 @@ class observe_on_one_worker : public coordination_base
262279
}
263280
};
264281

282+
inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
283+
static observe_on_one_worker r(rxsc::make_run_loop(rl));
284+
return r;
285+
}
286+
265287
inline observe_on_one_worker observe_on_event_loop() {
266288
static observe_on_one_worker r(rxsc::make_event_loop());
267289
return r;

Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,12 @@ struct subscribe_on : public operator_base<T>
5252
: public std::enable_shared_from_this<subscribe_on_state_type>
5353
, public subscribe_on_values
5454
{
55-
subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
55+
subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
5656
: subscribe_on_values(i)
57-
, coordinator(std::move(coor))
5857
, out(oarg)
5958
{
6059
}
6160
composite_subscription source_lifetime;
62-
coordinator_type coordinator;
6361
output_type out;
6462
private:
6563
subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
@@ -72,33 +70,39 @@ struct subscribe_on : public operator_base<T>
7270
auto controller = coordinator.get_worker();
7371

7472
// take a copy of the values for each subscription
75-
auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(coordinator), std::move(s));
73+
auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
74+
75+
auto sl = state->source_lifetime;
76+
auto ol = state->out.get_subscription();
7677

7778
auto disposer = [=](const rxsc::schedulable&){
78-
state->source_lifetime.unsubscribe();
79-
state->out.unsubscribe();
79+
sl.unsubscribe();
80+
ol.unsubscribe();
8081
coordinator_lifetime.unsubscribe();
8182
};
8283
auto selectedDisposer = on_exception(
83-
[&](){return state->coordinator.act(disposer);},
84+
[&](){return coordinator.act(disposer);},
8485
state->out);
8586
if (selectedDisposer.empty()) {
8687
return;
8788
}
88-
89-
state->out.add([=](){
90-
controller.schedule(selectedDisposer.get());
91-
});
89+
9290
state->source_lifetime.add([=](){
9391
controller.schedule(selectedDisposer.get());
9492
});
9593

94+
state->out.add([=](){
95+
sl.unsubscribe();
96+
ol.unsubscribe();
97+
coordinator_lifetime.unsubscribe();
98+
});
99+
96100
auto producer = [=](const rxsc::schedulable&){
97101
state->source.subscribe(state->source_lifetime, state->out);
98102
};
99103

100104
auto selectedProducer = on_exception(
101-
[&](){return state->coordinator.act(producer);},
105+
[&](){return coordinator.act(producer);},
102106
state->out);
103107
if (selectedProducer.empty()) {
104108
return;

Rx/v2/src/rxcpp/rx-scheduler.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ inline bool operator==(const worker& lhs, const worker& rhs) {
329329
inline bool operator!=(const worker& lhs, const worker& rhs) {
330330
return !(lhs == rhs);
331331
}
332-
332+
333333
class weak_worker
334334
{
335335
detail::worker_interface_weak_ptr inner;
@@ -344,7 +344,7 @@ class weak_worker
344344
, lifetime(owner.lifetime)
345345
{
346346
}
347-
347+
348348
worker lock() const {
349349
return worker(lifetime, inner.lock());
350350
}
@@ -419,6 +419,9 @@ inline scheduler make_scheduler(ArgN&&... an) {
419419
return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
420420
}
421421

422+
inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
423+
return scheduler(si);
424+
}
422425

423426
class schedulable : public schedulable_base
424427
{
@@ -912,6 +915,7 @@ namespace rxsc=schedulers;
912915
}
913916

914917
#include "schedulers/rx-currentthread.hpp"
918+
#include "schedulers/rx-runloop.hpp"
915919
#include "schedulers/rx-newthread.hpp"
916920
#include "schedulers/rx-eventloop.hpp"
917921
#include "schedulers/rx-immediate.hpp"

Rx/v2/src/rxcpp/subjects/rx-subject.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class multicast_observer
2626
enum type {
2727
Invalid = 0,
2828
Casting,
29+
Disposed,
2930
Completed,
3031
Errored
3132
};
@@ -102,6 +103,15 @@ class multicast_observer
102103
explicit multicast_observer(composite_subscription cs)
103104
: b(std::make_shared<binder_type>(cs))
104105
{
106+
auto keepAlive = b;
107+
b->state->lifetime.add([keepAlive](){
108+
if (keepAlive->state->current == mode::Casting){
109+
keepAlive->state->current = mode::Disposed;
110+
keepAlive->current_completer.reset();
111+
keepAlive->completer.reset();
112+
++keepAlive->state->generation;
113+
}
114+
});
105115
}
106116
trace_id get_id() const {
107117
return b->id;
@@ -144,6 +154,13 @@ class multicast_observer
144154
return;
145155
}
146156
break;
157+
case mode::Disposed:
158+
{
159+
guard.unlock();
160+
o.unsubscribe();
161+
return;
162+
}
163+
break;
147164
default:
148165
abort();
149166
}

0 commit comments

Comments
 (0)