Re: MT Design Question
"Scott Meyers" <NeverRead@aristeia.com> wrote in message
news:i512pm$sut$1@news.albasani.net...
This is a threading-related design question assuming that C++0x is the
implementation language.
Suppose I have a graph that I want to search for a node that has some
characteristic (e.g., holds a value within some range). Suppose further
that of the many ways to search the graph (e.g., depth-first,
breadth-first, random walk, etc.), I can't predict which will be best, so
I want to run them all concurrently, stopping when either one of them
finds a suitable node or they all fail. That is, given something like this
Node* dfsSearch(Graph g, Predicate p); // do dfs search of g for
// a node satisfying p
Node* bfsSearch(Graph g, Predicate p); // do bfs search of g
Node* rwSearch(Graph g, Predicate p); // do random walk search of g
I want to do this:
concurrently invoke dfsSearch, bfsSearch, and rwSearch;
wait until one returns success or they all return lack of success;
if one returned success, tell the others to stop searching;
Scott, I am sorry for the very crude and brief description of the algorithm.
I have no time to elaborate any further right now. So, I hope that you can
gain something from this!
:^)
I will give more info when I get some more free time.
Thanks.
------------------------------------------
I did something very similar when I needed to search a large, completely
unordered array. I created three tasks: one searched from left to right, one
searched from right to left, and the other from the center to both ends of
the array. I used an eventcount for the waiting. Luckily, an eventcount can
be fairly easily constructed in C++0x. IIRC, here is a brief high-level
sketch of how the data structures were set up:
<high-level pseudo-code sketch>
___________________________________________________________
// an entry in the array to search for
struct entry
{
    bool compare(unsigned) const;
};

// the entry array
struct array
{
    entry m_data[DEPTH]; // large array of entries
};
// search completion
#define FAILURE ((entry*)0xFFFFFFFFU) // special failure code (sentinel pointer)

struct complete
{
    eventcount m_ecount;
    atomic<entry*> m_complete; // = NULL
    atomic<unsigned> m_refs;

    complete(unsigned refs) : m_complete(NULL), m_refs(refs)
    {
    }

    void signal(entry* e)
    {
        entry* cmp = NULL;
        // only the first signal (a found entry or FAILURE) wins the CAS
        if (m_complete.compare_exchange_strong(cmp, e, mb_relaxed))
            m_ecount.signal();
        // drop this task's reference; the last owner deletes the structure
        if (m_refs.fetch_sub(1, mb_release) == 1)
        {
            atomic_thread_fence(mb_acquire);
            delete this;
        }
    }

    entry* wait()
    {
        entry* e;
        while (! (e = m_complete.load(mb_relaxed)))
        {
            // take a key, re-check the result, then wait on the key
            eventcount::key const k = m_ecount.get();
            if ((e = m_complete.load(mb_relaxed))) break;
            m_ecount.wait(k);
        }
        // drop the waiter's reference
        if (m_refs.fetch_sub(1, mb_release) == 1)
        {
            atomic_thread_fence(mb_acquire);
            delete this;
        }
        return (e != FAILURE) ? e : NULL;
    }
};
// a search task
struct stask; // forward declaration for the function pointer type
typedef void (*fp_search_type) (stask&);

struct stask
{
    array* m_array; // the target array
    atomic<bool> m_cancel; // cancel flag == false
    complete& m_complete; // completion
    unsigned m_key; // search key
    fp_search_type m_fp_search; // search funcptr

    stask(array* a, unsigned key, fp_search_type s, complete& c)
    : m_array(a), m_cancel(false), m_complete(c), m_key(key), m_fp_search(s)
    {
    }

    // this is called by the main thread-pool
    void work()
    {
        m_fp_search(*this);
    }

    // reference counting (the release() used by search()) is omitted here
};
// a left-to-right search
static void search_ltr(stask& s)
{
    for (unsigned i = 0; i < DEPTH && ! s.m_cancel.load(mb_relaxed); ++i)
    {
        if (s.m_array->m_data[i].compare(s.m_key))
        {
            s.m_complete.signal(&s.m_array->m_data[i]);
            return;
        }
    }
    // not found, or cancelled after another task already set the result
    s.m_complete.signal(FAILURE);
}
// a right-to-left search
static void search_rtl(stask& s)
{
    [...]; // completion logic is identical to left-to-right search
}

// a center-to-both-ends search
static void search_cbe(stask& s)
{
    [...]; // completion logic is identical to left-to-right search
}
// the main search function... finally! ;^)
static entry* search(array& a, unsigned key)
{
    // create a completion structure with a refcount of 4
    // (three search tasks plus the waiting caller)
    complete* c = new complete(4);

    // create search tasks
    stask* stask_ltr = new stask(&a, key, search_ltr, *c);
    stask* stask_rtl = new stask(&a, key, search_rtl, *c);
    stask* stask_cbe = new stask(&a, key, search_cbe, *c);

    // enqueue search tasks
    g_thread_pool.push(stask_ltr);
    g_thread_pool.push(stask_rtl);
    g_thread_pool.push(stask_cbe);

    // wait for a result...
    entry* e = c->wait();

    // cancel everything
    stask_ltr->m_cancel = true;
    stask_rtl->m_cancel = true;
    stask_cbe->m_cancel = true;

    // release references
    stask_ltr->release();
    stask_rtl->release();
    stask_cbe->release();

    // return result
    return e;
}
___________________________________________________________
That's about all of the relevant parts of the scheme I was using. Basically,
the main search function would create a completion structure (e.g.,
`struct complete' in the example) and three search tasks (`struct stask').
Then it enqueued the tasks in the main thread pool and finally waited on the
completion structure.
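The eventcount itself is not shown in the sketch above. As a rough
illustration only, here is a minimal mutex-and-condition-variable version
that supports the get()/wait()/signal() pattern used by `struct complete'.
The class body is an assumption made for this example, not the eventcount I
actually used, and a production one would normally avoid taking a lock on
the fast path:

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified eventcount: a generation counter guarded by a
// mutex. Illustrative only; not a lock-free implementation.
class eventcount
{
public:
    typedef unsigned key;

    // Snapshot the current generation. The caller re-checks its own
    // predicate after this call and before calling wait().
    key get()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count;
    }

    // Block until the generation differs from the snapshot `k`.
    // Returns immediately if a signal arrived after get().
    void wait(key k)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this, k] { return m_count != k; });
    }

    // Advance the generation and wake every waiter.
    void signal()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cond.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    key m_count = 0;
};
```

The point of the key is that a signal() landing between get() and wait()
changes the generation, so the waiter cannot miss the wakeup.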
The completion and failure logic was very simple. Every task scans the
entire array unless it is cancelled, so the first task to finish decides the
outcome for all of them: if one search runs to the end without a match, the
others would fail too, and if one finds the entry, the others no longer
matter. So, if a thread happens to find the entry, it does a
compare-and-swap on the "result variable" (examine the `complete::signal()'
procedure). Also, if a thread fails to find the entry, it does a CAS with a
special failure code (i.e., the FAILURE macro). If the CAS succeeds, then
the entire search operation is complete and the result can be reported back
to the main search function (examine the `complete::wait()' function).
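To make the "first CAS wins" rule concrete, here is a small standalone
sketch using the final std::atomic names. Everything in it (`result_slot',
`publish', `FAILURE_PTR', and the dummy sentinel object) is made up for
illustration and is not part of the original scheme; using the address of a
private dummy object as the failure code is just one way to get a sentinel
that can never collide with a real entry:

```cpp
#include <atomic>

struct entry { unsigned value; };

// Hypothetical sentinel: the address of a private dummy object.
static entry g_failure_dummy = { 0 };
static entry* const FAILURE_PTR = &g_failure_dummy;

// Hypothetical single-assignment result slot.
struct result_slot
{
    std::atomic<entry*> m_result;

    result_slot() : m_result(nullptr) {}

    // Try to publish `e` (a found entry or FAILURE_PTR).
    // Returns true only for the first caller; later calls change nothing.
    bool publish(entry* e)
    {
        entry* expected = nullptr;
        return m_result.compare_exchange_strong(
            expected, e,
            std::memory_order_release, std::memory_order_relaxed);
    }

    // Returns the found entry, or nullptr if the slot is still empty
    // or holds the failure sentinel.
    entry* get() const
    {
        entry* e = m_result.load(std::memory_order_acquire);
        return (e != FAILURE_PTR) ? e : nullptr;
    }
};
```

Once any task has published, every later publish() call returns false, which
is why a cancelled task can still report failure without disturbing the
result.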
Finally, the main search operation obtains the result, releases its
references to the allocated tasks, and returns. Please note that it does not
need to wait or join with any of the tasks. As soon as the result is ready,
the main search function can finish. You can also do this without a thread
pool and use detached threads instead, since there is no need to join. The
completion structure and tasks were all reference counted, which worked
fairly nicely in that scheme.
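For the detached-thread variant, something along these lines should work.
This is only a sketch under a few assumptions: std::shared_ptr stands in for
the hand-rolled reference counts, a mutex and condition variable stand in
for the eventcount, only two scan directions are shown, and the names
(`search_state', `scan', `detached_search') are invented for the example:

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared state. The shared_ptr copies held by the threads keep
// it alive after the caller has returned.
struct search_state
{
    std::vector<unsigned> data;
    unsigned key;
    std::atomic<bool> cancel;
    std::mutex mutex;
    std::condition_variable cond;
    bool done;
    std::ptrdiff_t index; // -1 means "not found"

    search_state(std::vector<unsigned> d, unsigned k)
        : data(std::move(d)), key(k), cancel(false), done(false), index(-1) {}

    // First caller wins; later calls are ignored.
    void complete(std::ptrdiff_t i)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (done) return;
            done = true;
            index = i;
        }
        cancel.store(true, std::memory_order_relaxed);
        cond.notify_all();
    }
};

// Scan the whole array, front-to-back or back-to-front, until cancelled.
static void scan(std::shared_ptr<search_state> s, bool left_to_right)
{
    std::ptrdiff_t const n = static_cast<std::ptrdiff_t>(s->data.size());
    for (std::ptrdiff_t c = 0; c < n; ++c)
    {
        if (s->cancel.load(std::memory_order_relaxed)) return;
        std::ptrdiff_t const i = left_to_right ? c : n - 1 - c;
        if (s->data[i] == s->key) { s->complete(i); return; }
    }
    s->complete(-1); // a full scan without a match means the key is absent
}

// Returns the index of `key` in `data`, or -1 if it is absent.
static std::ptrdiff_t detached_search(std::vector<unsigned> data, unsigned key)
{
    std::shared_ptr<search_state> s =
        std::make_shared<search_state>(std::move(data), key);
    std::thread(scan, s, true).detach();
    std::thread(scan, s, false).detach();
    std::unique_lock<std::mutex> lock(s->mutex);
    s->cond.wait(lock, [&s] { return s->done; });
    return s->index; // no join: the losing thread notices `cancel` and exits
}
```

The caller returns as soon as the first thread completes; the state is freed
whenever the last thread drops its shared_ptr copy.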
I hope that makes some sense!
:^o
[...]