Re: Request for comments about synchronized queue using boost
On Oct 16, 3:44 pm, Nordlöw <per.nord...@gmail.com> wrote:
On 15 Oct, 18:02, Maxim Yegorushkin <maxim.yegorush...@gmail.com> wrote:
On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?
/Nordlöw
The file synched_queue.hpp follows:
#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP
/*!
* @file synched_queue.hpp
* @brief Synchronized (Thread Safe) Container Wrapper on std::queue
* using Boost::Thread.
*/
#include <queue>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
// ============================================================================
template <typename T>
class synched_queue
{
std::queue<T> q; ///< Queue.
boost::mutex m; ///< Mutex.
A member variable is missing here:
boost::condition c;
public:
/*!
* Push @p value.
*/
void push(const T & value) {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
q.push(value);
You need to notify other threads waiting on the queue:
c.notify_one();
}
/*!
* Try and pop into @p value, returning directly in any case.
* @return true if pop was successful, false otherwise.
*/
bool try_pop(T & value) {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
if (q.size()) {
value = q.front();
q.pop();
return true;
}
return false;
}
/// Pop and return value, possibly waiting forever.
T wait_pop() {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
// wait until the queue has at least one element
The following line:
c.wait(sl, boost::bind(&std::queue<T>::size, q));
boost::bind(&std::queue<T>::size, q) stores a copy of the queue inside the
object created by boost::bind, so the predicate only ever inspects that
private copy: if the copy is empty, the wait never finishes (and the
condition variable is never notified anyway, see above).
It should be as simple as:
while(q.empty())
c.wait(sl);
T value = q.front();
q.pop();
return value;
}
size_type size() const {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
return q.size();
}
bool empty() const {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
return q.empty();
}
};
// ============================================================================
#endif
The other thing is that the queue does not support destruction: the
destructor does not unblock any threads blocked in wait.
Apart from that, the mutex is held for too long. You don't really need
to hold the lock when allocating memory for elements and when invoking
the copy constructor of the elements.
Here is an improved version (although a bit simplified):
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <list>
#include <stdexcept>
template<class T>
class atomic_queue : private boost::noncopyable
{
private:
boost::mutex mtx_;
boost::condition cnd_;
bool cancel_;
unsigned waiting_;
// use list as a queue because it allows for splicing:
// moving elements between lists without any memory allocation and copying
typedef std::list<T> queue_type;
queue_type q_;
public:
struct cancelled : std::logic_error
{
cancelled() : std::logic_error("cancelled") {}
};
atomic_queue()
: cancel_()
, waiting_()
{}
~atomic_queue()
{
// cancel all waiting threads
this->cancel();
}
void cancel()
{
// cancel all waiting threads
boost::mutex::scoped_lock l(mtx_);
cancel_ = true;
cnd_.notify_all();
// and wait till they are done
while(waiting_)
cnd_.wait(l);
}
void push(T const& t)
{
// this avoids an allocation inside the critical section below
queue_type tmp(&t, &t + 1);
{
boost::mutex::scoped_lock l(mtx_);
q_.splice(q_.end(), tmp);
}
cnd_.notify_one();
}
// this function provides only basic exception safety if T's copy ctor can
// throw, or strong exception safety if T's copy ctor is nothrow
T pop()
{
// this avoids copying T inside the critical section below
queue_type tmp;
{
boost::mutex::scoped_lock l(mtx_);
++waiting_;
while(!cancel_ && q_.empty())
cnd_.wait(l);
--waiting_;
if(cancel_)
{
cnd_.notify_all();
throw cancelled();
}
tmp.splice(tmp.end(), q_, q_.begin());
}
return tmp.front();
}
};
typedef boost::function<void()> unit_of_work;
typedef atomic_queue<unit_of_work> work_queue;
void typical_thread_pool_working_thread(work_queue* q)
try
{
for(;;)
q->pop()();
}
catch(work_queue::cancelled&)
{
// time to terminate the thread
}
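For completeness, here is a rough sketch of how the queue and the worker
function above might be wired together; say_hello and the thread count are
just placeholders for the example:

    #include <boost/thread/thread.hpp>
    #include <boost/bind.hpp>
    #include <iostream>

    void say_hello() { std::cout << "hello from a worker\n"; }

    int main()
    {
        work_queue q;
        boost::thread_group pool;

        // start a few workers that keep popping and running units of work
        for(int i = 0; i != 4; ++i)
            pool.create_thread(
                boost::bind(typical_thread_pool_working_thread, &q));

        q.push(&say_hello);   // enqueue a unit of work
        // ... produce more work, let the pool run ...

        q.cancel();           // wakes the workers; they exit via cancelled
        pool.join_all();
        return 0;
    }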
Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?
I would recommend the book "Programming with POSIX Threads" by David R.
Butenhof.
Doesn't the push-argument "T const & t" instead of my version "const T
& t" mean that we don't copy at all here?
No, T const& and const T& are the same thing: a reference to a constant
T.
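If in doubt, this can be verified at compile time; boost::is_same is used
here purely as an illustration:

    #include <boost/static_assert.hpp>
    #include <boost/type_traits/is_same.hpp>

    // "const T&" and "T const&" name exactly the same type
    BOOST_STATIC_ASSERT((boost::is_same<const int&, int const&>::value));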
I believe &t evaluates to the address of t:
void push(T const& t)
{
// this avoids an allocation inside the critical section below
queue_type tmp(&t, &t + 1);
{
boost::mutex::scoped_lock l(mtx_);
q_.splice(q_.end(), tmp);
}
cnd_.notify_one();
}
The trick here is that element t is first inserted in a temporary list
tmp on the stack.
queue_type tmp(&t, &t + 1); // create a list with a copy of t
This involves allocating memory and copying t. It is done here without
holding the lock because allocating memory may be expensive (it might even
cause the system to swap), and while the lock is held none of the worker
threads can pop elements from the queue. Next, the lock is acquired and the
element is moved from list tmp into q_:
q_.splice(q_.end(), tmp);
This operation does not involve any memory allocation or copying of
elements (splicing simply relinks the nodes of the doubly-linked lists),
which makes the critical section execute really fast without stalling the
worker threads for too long.
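A minimal illustration of why splice is cheap (the int list is only for the
example): the element is never copied, its list node is simply relinked from
one list to the other:

    #include <cassert>
    #include <list>

    int main()
    {
        std::list<int> tmp(1, 42);   // one-element list, like tmp in push()
        std::list<int> q;

        int* address_before = &tmp.front();
        q.splice(q.end(), tmp);      // relinks the node: no allocation, no copy

        assert(tmp.empty());
        assert(&q.front() == address_before);  // same object, same address
        return 0;
    }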
--
Max