Merge RP1A.191114.001
Change-Id: I82da7ecdac0e18f63342f1d6a6825edee1291f89
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 4f42007..7e3d567 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -174,22 +174,26 @@
std::mutex lock;
std::condition_variable wake;
bool disposed = false;
- rxu::error_ptr error;
auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
+ rxu::error_ptr error;
+ bool has_error = false;
+
// keep any error to rethrow at the end.
+ // copy 'dest' by-value to avoid using it after it goes out of scope.
auto scbr = make_subscriber<T>(
dest,
- [&](T t){dest.on_next(t);},
- [&](rxu::error_ptr e){
+ [dest](T t){dest.on_next(t);},
+ [dest,&error,&has_error,do_rethrow](rxu::error_ptr e){
if (do_rethrow) {
+ has_error = true;
error = e;
} else {
dest.on_error(e);
}
},
- [&](){dest.on_completed();}
+ [dest](){dest.on_completed();}
);
auto cs = scbr.get_subscription();
@@ -208,7 +212,7 @@
return disposed;
});
- if (error) {rxu::rethrow_exception(error);}
+ if (has_error) {rxu::rethrow_exception(error);}
}
public:
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index 5145e92..7c93469 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -37,6 +37,16 @@
virtual ~new_worker_state()
{
+ // Ensure that std::thread is no longer joinable,
+ // otherwise the destructor will call std::terminate.
+ if (!worker.joinable()) {
+ return;
+ }
+ if (worker.get_id() != std::this_thread::get_id()) {
+ worker.join();
+ } else {
+ worker.detach();
+ }
}
explicit new_worker_state(composite_subscription cs)
@@ -76,13 +86,7 @@
if (!keepAlive->q.empty()) std::terminate();
keepAlive->wake.notify_one();
- if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
- guard.unlock();
- keepAlive->worker.join();
- }
- else {
- keepAlive->worker.detach();
- }
+ // ~new_worker_state cleans up the std::thread
});
state->worker = tf([keepAlive](){