Re: MT Design Question
Scott Meyers <NeverRead@aristeia.com> writes:
Suppose I have a graph that I want to search for a node that has some
characteristic (e.g., holds a value within some range). Suppose further that of
the many ways to search the graph (e.g., depth-first, breadth-first, random
walk, etc.), I can't predict which will be best, so I want to run them all
concurrently, stopping when either one of them finds a suitable node or they all
fail. That is, given something like this
    Node* dfsSearch(Graph g, Predicate p); // do dfs search of g for a node satisfying p
    Node* bfsSearch(Graph g, Predicate p); // do bfs search of g
    Node* rwSearch(Graph g, Predicate p);  // do random walk search of g
I want to do this:
    concurrently invoke dfsSearch, bfsSearch, and rwSearch;
    wait until one returns success or they all return lack of success;
    if one returned success, tell the others to stop searching;
Spawn a thread for each search using std::async to catch the
result/exception in a future, passing in a "done" flag:
    std::atomic<bool> done(false);
    std::vector<std::future<Node*>> results;
    results.push_back(std::async(std::launch::async,
        do_dfs_search,graph,predicate,&done));
    results.push_back(std::async(std::launch::async,
        do_bfs_search,graph,predicate,&done));
    results.push_back(std::async(std::launch::async,
        do_rw_search,graph,predicate,&done));
where the do_xxx_search functions poll the done flag periodically, and
exit with "not found" if done is already set. They set "done" if they
exit successfully, but not if they exit with an exception.
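As a rough illustration of the polling, a depth-first version might look something like the sketch below. The Node, Graph and Predicate definitions are hypothetical stand-ins (the question doesn't give them), and I've assumed the graph is taken by reference so that the returned pointer stays valid, which means passing std::ref(graph) to std::async.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-ins for the types in the question.
struct Node {
    int value;
    std::vector<std::size_t> edges; // indices of adjacent nodes
};
struct Graph {
    std::vector<Node> nodes; // assumption: node 0 is the start node
};
using Predicate = std::function<bool(const Node&)>;

// Depth-first search that checks the shared "done" flag before each node.
// Returns nullptr ("not found") if the flag is already set or nothing matches.
Node* do_dfs_search(Graph& g, const Predicate& p, std::atomic<bool>* done)
{
    if (g.nodes.empty()) return nullptr;
    std::vector<bool> visited(g.nodes.size(), false);
    std::vector<std::size_t> stack{0};
    while (!stack.empty()) {
        if (done->load()) return nullptr; // another search already succeeded
        std::size_t i = stack.back();
        stack.pop_back();
        if (visited[i]) continue;
        visited[i] = true;
        Node& n = g.nodes[i];
        if (p(n)) {             // if p throws, "done" is left unset
            done->store(true);  // tell the other searches to stop
            return &n;
        }
        for (std::size_t e : n.edges)
            if (!visited[e]) stack.push_back(e);
    }
    return nullptr;
}
```

How often to poll is a trade-off: checking on every node keeps the stop latency low, but a real search might check only every N nodes if the atomic load shows up in profiles.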
In the main thread, you then wait on each future in turn. You need to ensure
that all threads exit before you continue anyway, since leaving threads
running in the background is a bad idea as you don't know what they'll
access. You thus don't lose anything by waiting in whatever order if
you're going to just return the value:
    std::exception_ptr e;
    for(unsigned i=0;i<results.size();++i)
    {
        try{
            if(Node* res=results[i].get()) return res;
        }
        catch(...)
        {
            e=std::current_exception();
        }
    }
    // no answer found, so throw an exception, if any
    if(e!=std::exception_ptr())
        std::rethrow_exception(e);
    // else return "not found"
    return nullptr;
You can change the logic around the exceptions fairly easily, e.g. to
always propagate the first exception found, to only propagate if all
threads throw, to propagate a composite exception containing all the
thrown exceptions, or whatever.
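For instance, the "only propagate if all threads throw" variant could be sketched roughly as follows. The function name and the minimal Node type are made up for illustration, and rethrowing the first stored exception is just one possible choice.

```cpp
#include <exception>
#include <future>
#include <vector>

struct Node { int value; }; // hypothetical stand-in

// Waits for every future and returns the first non-null result.
// Exceptions are collected, and one is rethrown only if every search threw.
Node* first_result_unless_all_throw(std::vector<std::future<Node*>>& results)
{
    Node* found = nullptr;
    std::vector<std::exception_ptr> errors;
    for (auto& f : results) {
        try {
            Node* res = f.get();
            if (res && !found) found = res;
        } catch (...) {
            errors.push_back(std::current_exception());
        }
    }
    if (found) return found;
    if (!results.empty() && errors.size() == results.size())
        std::rethrow_exception(errors.front()); // every search failed with an exception
    return nullptr; // at least one search completed normally without a match
}
```

The errors vector is also what you would wrap in a composite exception type if you wanted to report all of them.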
If you're going to do further processing right there and then in the
waiting function, you don't want to have to wait for the other threads
to finish. Therefore you need another mechanism.
You could just use a mutex and condition variable which is passed in to
every thread function. The cond var is notified when each thread exits
(with notify_all_at_thread_exit) and the waiting thread then blocks on
the cond var rather than on any of the futures. When it wakes from the
cond-var wait it can poll the futures to see which (if any) thread is
done, and process the result as appropriate.
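    std::mutex m;
    std::condition_variable c;
    std::exception_ptr e;
    // pass &m and &c into threads; both must outlive the threads.
    std::unique_lock<std::mutex> lk(m);
    for(;;)
    {
        unsigned pending=0;
        for(unsigned i=0;i<results.size();++i)
        {
            if(!results[i].valid())
                continue; // result already collected
            if(results[i].wait_for(std::chrono::seconds(0))!=
               std::future_status::ready)
            {
                ++pending; // still searching
                continue;
            }
            try{
                if(Node* res=results[i].get()) return res;
            }
            catch(...)
            {
                e=std::current_exception();
            }
        }
        if(!pending) break; // every search finished without success
        c.wait(lk); // block until another thread exits
    }
    // then rethrow e (if set) or return "not found", as before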
The thread function then does:
    std::unique_lock<std::mutex> lk(m);
    std::notify_all_at_thread_exit(c,std::move(lk));
when it's done. This is essentially what boost::wait_for_any does under
the covers.
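Putting the two halves together, a self-contained sketch of the cond-var approach might look like this. The Search alias, ExitNotifier and wait_for_first are names I've invented for the example, and it assumes each std::launch::async task runs on its own thread that exits when the task returns (which is what makes notify_all_at_thread_exit fire at the right time).

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <vector>

struct Node { int value; };                              // hypothetical stand-in
using Search = std::function<Node*(std::atomic<bool>&)>; // hypothetical search signature

// On scope exit (normal or via exception) takes the lock and arranges for
// the cond var to be notified once the current thread has finished.
struct ExitNotifier {
    std::mutex& m;
    std::condition_variable& c;
    ~ExitNotifier() {
        std::unique_lock<std::mutex> lk(m);
        std::notify_all_at_thread_exit(c, std::move(lk));
    }
};

Node* wait_for_first(const std::vector<Search>& searches)
{
    std::atomic<bool> done(false);
    std::mutex m;
    std::condition_variable c;
    // Declared after m and c, so the futures are destroyed (and the threads
    // joined) before the mutex and cond var go away.
    std::vector<std::future<Node*>> results;
    for (const auto& s : searches)
        results.push_back(std::async(std::launch::async, [&, s] {
            ExitNotifier notifier{m, c};
            Node* r = s(done);
            if (r) done.store(true); // stop the other searches
            return r;
        }));

    std::exception_ptr e;
    Node* found = nullptr;
    {
        std::unique_lock<std::mutex> lk(m);
        for (;;) {
            unsigned pending = 0;
            for (auto& f : results) {
                if (!f.valid()) continue; // already collected
                if (f.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
                    ++pending;
                    continue;
                }
                try {
                    if (Node* r = f.get()) { found = r; break; }
                } catch (...) {
                    e = std::current_exception();
                }
            }
            if (found || !pending) break;
            c.wait(lk); // sleep until some search thread exits
        }
    } // release the lock so the remaining threads can register their exit
    if (found) return found;
    if (e) std::rethrow_exception(e);
    return nullptr;
}
```

Note that the lock is released before returning: the futures' destructors wait for the remaining threads, and those threads need the mutex on their way out.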
Alternatively, you could use a single promise/future pair to store the
result and have the main thread wait for that. In this case you need to
ensure that it is set if all threads throw, otherwise the main thread
will wait forever. This can be achieved by using an atomic count of
searching threads which is decremented when each thread exits (whether
normally or with an exception). The last thread out must set the promise
however it exits.
    std::promise<Node*> p;
    std::atomic<int> count(3);
    std::future<Node*> result=p.get_future();
    std::vector<std::future<void>> async_handles;
    async_handles.push_back(std::async(std::launch::async,
        do_dfs_search,graph,predicate,&done,&p,&count));
    async_handles.push_back(std::async(std::launch::async,
        do_bfs_search,graph,predicate,&done,&p,&count));
    async_handles.push_back(std::async(std::launch::async,
        do_rw_search,graph,predicate,&done,&p,&count));
    do_something_with(result.get());
Each thread then does something like:
    try{
        if(Node* res=real_search())
        {
            try{
                p.set_value(res);
            }
            catch(const std::future_error&) // in case promise already set
            {}
        }
        if(!--count) // last thread out: make sure the promise is set
        {
            try{
                p.set_value(nullptr);
            }
            catch(const std::future_error&) // in case promise already set
            {}
        }
    }
    catch(...)
    {
        if(!--count) // last thread out: report the failure
        {
            try{
                p.set_exception(std::current_exception());
            }
            catch(const std::future_error&) // in case promise already set
            {}
        }
    }
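For completeness, here is how the promise-based version might be wired up end to end. It's only a sketch: Search, search_and_report and first_via_promise are hypothetical names, and the searches are passed in as callables rather than being the three fixed functions from the question.

```cpp
#include <atomic>
#include <exception>
#include <functional>
#include <future>
#include <vector>

struct Node { int value; };                              // hypothetical stand-in
using Search = std::function<Node*(std::atomic<bool>&)>; // hypothetical search signature

// Body of each search thread: report success straight away, and let the
// last thread out set the promise if nobody else has.
void search_and_report(const Search& s, std::atomic<bool>& done,
                       std::promise<Node*>& p, std::atomic<int>& count)
{
    try {
        if (Node* res = s(done)) {
            done.store(true); // stop the other searches
            try { p.set_value(res); }
            catch (const std::future_error&) {} // promise already set
        }
        if (!--count) {
            try { p.set_value(nullptr); }
            catch (const std::future_error&) {} // promise already set
        }
    } catch (...) {
        if (!--count) {
            try { p.set_exception(std::current_exception()); }
            catch (const std::future_error&) {} // promise already set
        }
    }
}

Node* first_via_promise(const std::vector<Search>& searches)
{
    if (searches.empty()) return nullptr; // nobody would ever set the promise
    std::atomic<bool> done(false);
    std::promise<Node*> p;
    std::atomic<int> count(static_cast<int>(searches.size()));
    std::future<Node*> result = p.get_future();
    // Declared last, so these are destroyed first: the threads are joined
    // before p, count and done go out of scope.
    std::vector<std::future<void>> handles;
    for (const auto& s : searches)
        handles.push_back(std::async(std::launch::async, [&, s] {
            search_and_report(s, done, p, count);
        }));
    return result.get(); // wakes as soon as any thread sets the promise
}
```

Here the function still waits for the stragglers on return, because the handles are destroyed at that point. If you want to carry on processing while they wind down, do the work between result.get() and the end of the scope that owns the handles.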
Anthony
--
Author of C++ Concurrency in Action http://www.stdthread.co.uk/book/
just::thread C++0x thread library http://www.stdthread.co.uk
Just Software Solutions Ltd http://www.justsoftwaresolutions.co.uk
15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976