Re: Synchronization Algorithm Verificator for C++0x
On 6 =C1=D7=C7, 02:54, "Chris M. Thomasson" <n...@spam.invalid> wrote:
I would be interested to see how long it takes to detect the following bu=
g
you found in an older version of AppCore:
http://groups.google.com/group/comp.programming.threads/browse_frm/th...
Ok, let's see. I've just finished basic mutex and condvar
implementation.
Here is eventcount:
class eventcount
{
public:
eventcount()
: count(0)
, waiters(0)
{}
void signal_relaxed()
{
unsigned cmp = count($).load(std::memory_order_relaxed);
signal_impl(cmp);
}
void signal()
{
unsigned cmp = count($).fetch_add(0, std::memory_order_seq_cst);
signal_impl(cmp);
}
unsigned get()
{
unsigned cmp = count($).fetch_or(0x80000000,
std::memory_order_seq_cst);
return cmp & 0x7FFFFFFF;
}
void wait(unsigned cmp)
{
unsigned ec = count($).load(std::memory_order_seq_cst);
if (cmp == (ec & 0x7FFFFFFF))
{
guard.lock($);
ec = count($).load(std::memory_order_seq_cst);
if (cmp == (ec & 0x7FFFFFFF))
{
waiters($) += 1;
cv.wait(guard, $);
}
guard.unlock($);
}
}
private:
std::atomic<unsigned> count;
rl::var<unsigned> waiters;
mutex guard;
condition_variable_any cv;
void signal_impl(unsigned cmp)
{
if (cmp & 0x80000000)
{
guard.lock($);
while (false == count($).compare_swap(cmp,
(cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed));
unsigned w = waiters($);
waiters($) = 0;
guard.unlock($);
if (w)
cv.notify_all($);
}
}
};
Here is the queue:
template<typename T>
class spsc_queue
{
public:
spsc_queue()
{
node* n = RL_NEW node ();
head($) = n;
tail($) = n;
}
~spsc_queue()
{
RL_ASSERT(head($) == tail($));
RL_DELETE((node*)head($));
}
void enqueue(T data)
{
node* n = RL_NEW node (data);
head($)->next($).store(n, std::memory_order_release);
head($) = n;
ec.signal_relaxed();
}
T dequeue()
{
T data = try_dequeue();
while (0 == data)
{
int cmp = ec.get();
data = try_dequeue();
if (data)
break;
ec.wait(cmp);
data = try_dequeue();
if (data)
break;
}
return data;
}
private:
struct node
{
std::atomic<node*> next;
rl::var<T> data;
node(T data = T())
: next(0)
, data(data)
{}
};
rl::var<node*> head;
rl::var<node*> tail;
eventcount ec;
T try_dequeue()
{
node* t = tail($);
node* n = t->next($).load(std::memory_order_acquire);
if (0 == n)
return 0;
T data = n->data($);
RL_DELETE(t);
tail($) = n;
return data;
}
};
Here is the test:
struct spsc_queue_test : rl::test_suite<spsc_queue_test, 2>
{
spsc_queue<int> q;
void thread(unsigned thread_index)
{
if (0 == thread_index)
{
q.enqueue(11);
}
else
{
int d = q.dequeue();
RL_ASSERT(11 == d);
}
}
};
int main()
{
rl::simulate<spsc_queue_test>();
}
It takes around 40 minutes.
Let's run it!
Here is output (note error detected on iteration 3):
struct spsc_queue_test
DEADLOCK
iteration: 3
execution history:
[0] 1: [BEFORE BEGIN], in rl::context_impl<struct
spsc_queue_test,class rl::random_scheduler<2> >::fiber_proc_impl,
context.hpp(341)
[1] 1: <0035398C> atomic store, value=-1, (prev value=0),
order=relaxed, in rl::generic_mutex<0>::generic_mutex, stdlib/
mutex.hpp(77)
[2] 1: memory allocation: addr=00353A80, size=52, in
spsc_queue<int>::spsc_queue, peterson.cpp(397)
[3] 1: <00353938> store, value=00353A80, in
spsc_queue<int>::spsc_queue, peterson.cpp(398)
[4] 1: <00353948> store, value=00353A80, in
spsc_queue<int>::spsc_queue, peterson.cpp(399)
[5] 1: [BEFORE END], in rl::context_impl<struct spsc_queue_test,class
rl::random_scheduler<2> >::fiber_proc_impl, context.hpp(356)
[6] 1: <00353948> load, value=00353A80, in
spsc_queue<int>::try_dequeue, peterson.cpp(452)
[7] 1: <00353A80> atomic load, value=00000000, order=acquire, in
spsc_queue<int>::try_dequeue, peterson.cpp(453)
[8] 1: <00353958> fetch_or , prev=0, op=2147483648, new=2147483648,
order=seq_cst, in eventcount::get, peterson.cpp(349)
[9] 1: <00353948> load, value=00353A80, in
spsc_queue<int>::try_dequeue, peterson.cpp(452)
[10] 0: memory allocation: addr=0035BCA0, size=52, in
spsc_queue<int>::enqueue, peterson.cpp(410)
[11] 0: <00353938> load, value=00353A80, in spsc_queue<int>::enqueue,
peterson.cpp(411)
[12] 1: <00353A80> atomic load, value=00000000, order=acquire, in
spsc_queue<int>::try_dequeue, peterson.cpp(453)
[13] 0: <00353A80> atomic store, value=0035BCA0, (prev
value=00000000), order=release, in spsc_queue<int>::enqueue,
peterson.cpp(411)
[14] 0: <00353938> store, value=0035BCA0, in spsc_queue<int>::enqueue,
peterson.cpp(412)
[15] 0: <00353958> atomic load, value=0 [NOT CURRENT], order=relaxed,
in eventcount::signal_relaxed, peterson.cpp(337)
[16] 1: <00353958> atomic load, value=2147483648, order=seq_cst, in
eventcount::wait, peterson.cpp(355)
[17] 1: <0035398C> CAS succ orig=-1, cmp=-1, xchg=1, order=acquire,=
in
rl::generic_mutex<0>::lock, stdlib/mutex.hpp(117)
[18] 1: <0035398C> mutex: lock, in eventcount::wait, peterson.cpp(358)
[19] 1: <00353958> atomic load, value=2147483648, order=seq_cst, in
eventcount::wait, peterson.cpp(359)
[20] 1: <0035397C> load, value=0, in eventcount::wait,
peterson.cpp(362)
[21] 1: <0035397C> store, value=1, in eventcount::wait,
peterson.cpp(362)
[22] 1: <0035398C> atomic store, value=-1, (prev value=1),
order=release, in rl::generic_mutex<0>::unlock, stdlib/mutex.hpp(209)
[23] 1: <0035398C> mutex: unlock, in eventcount::wait,
peterson.cpp(363)
[24] 1: blocking thread, in eventcount::wait, peterson.cpp(363)
[25] 1: deadlock detected, in eventcount::wait, peterson.cpp(363)
thread 0:
[10] 0: memory allocation: addr=0035BCA0, size=52, in
spsc_queue<int>::enqueue, peterson.cpp(410)
[11] 0: <00353938> load, value=00353A80, in spsc_queue<int>::enqueue,
peterson.cpp(411)
[13] 0: <00353A80> atomic store, value=0035BCA0, (prev
value=00000000), order=release, in spsc_queue<int>::enqueue,
peterson.cpp(411)
[14] 0: <00353938> store, value=0035BCA0, in spsc_queue<int>::enqueue,
peterson.cpp(412)
[15] 0: <00353958> atomic load, value=0 [NOT CURRENT], order=relaxed,
in eventcount::signal_relaxed, peterson.cpp(337)
thread 1:
[0] 1: [BEFORE BEGIN], in rl::context_impl<struct
spsc_queue_test,class rl::random_scheduler<2> >::fiber_proc_impl,
context.hpp(341)
[1] 1: <0035398C> atomic store, value=-1, (prev value=0),
order=relaxed, in rl::generic_mutex<0>::generic_mutex, stdlib/
mutex.hpp(77)
[2] 1: memory allocation: addr=00353A80, size=52, in
spsc_queue<int>::spsc_queue, peterson.cpp(397)
[3] 1: <00353938> store, value=00353A80, in
spsc_queue<int>::spsc_queue, peterson.cpp(398)
[4] 1: <00353948> store, value=00353A80, in
spsc_queue<int>::spsc_queue, peterson.cpp(399)
[5] 1: [BEFORE END], in rl::context_impl<struct spsc_queue_test,class
rl::random_scheduler<2> >::fiber_proc_impl, context.hpp(356)
[6] 1: <00353948> load, value=00353A80, in
spsc_queue<int>::try_dequeue, peterson.cpp(452)
[7] 1: <00353A80> atomic load, value=00000000, order=acquire, in
spsc_queue<int>::try_dequeue, peterson.cpp(453)
[8] 1: <00353958> fetch_or , prev=0, op=2147483648, new=2147483648,
order=seq_cst, in eventcount::get, peterson.cpp(349)
[9] 1: <00353948> load, value=00353A80, in
spsc_queue<int>::try_dequeue, peterson.cpp(452)
[12] 1: <00353A80> atomic load, value=00000000, order=acquire, in
spsc_queue<int>::try_dequeue, peterson.cpp(453)
[16] 1: <00353958> atomic load, value=2147483648, order=seq_cst, in
eventcount::wait, peterson.cpp(355)
[17] 1: <0035398C> CAS succ orig=-1, cmp=-1, xchg=1, order=acquire,=
in
rl::generic_mutex<0>::lock, stdlib/mutex.hpp(117)
[18] 1: <0035398C> mutex: lock, in eventcount::wait, peterson.cpp(358)
[19] 1: <00353958> atomic load, value=2147483648, order=seq_cst, in
eventcount::wait, peterson.cpp(359)
[20] 1: <0035397C> load, value=0, in eventcount::wait,
peterson.cpp(362)
[21] 1: <0035397C> store, value=1, in eventcount::wait,
peterson.cpp(362)
[22] 1: <0035398C> atomic store, value=-1, (prev value=1),
order=release, in rl::generic_mutex<0>::unlock, stdlib/mutex.hpp(209)
[23] 1: <0035398C> mutex: unlock, in eventcount::wait,
peterson.cpp(363)
[24] 1: blocking thread, in eventcount::wait, peterson.cpp(363)
[25] 1: deadlock detected, in eventcount::wait, peterson.cpp(363)
Operations on condition variables are not yet in execution history.
Let's replace 'ec.signal_relaxed()' with 'ec.signal()'. Here is the
output:
struct spsc_queue_test
6% (65536/1000000)
13% (131072/1000000)
19% (196608/1000000)
26% (262144/1000000)
32% (327680/1000000)
39% (393216/1000000)
45% (458752/1000000)
52% (524288/1000000)
58% (589824/1000000)
65% (655360/1000000)
72% (720896/1000000)
78% (786432/1000000)
85% (851968/1000000)
91% (917504/1000000)
98% (983040/1000000)
iterations: 1000000
total time: 2422
throughput: 412881
Test passed. Throughput is around 400'000 test executions per second.
Also test reveals some errors with memory fences.
In signaling function you are using acquire fence in cas, but relaxed
is enough here:
void signal_impl(unsigned cmp)
{
if (cmp & 0x80000000)
{
guard.lock($);
while (false == count($).compare_swap(cmp,
(cmp + 1) & 0x7FFFFFFF, std::memory_order_relaxed));
unsigned w = waiters($);
waiters($) = 0;
guard.unlock($);
if (w)
cv.notify_all($);
}
}
Also. Following naive approach doesn't work (at least in C++0x):
void signal()
{
std::atomic_thread_fence(std::memory_order_seq_cst);
signal_relaxed();
}
You have to execute sequentially consistent atomic RMW on 'ec'
variable here. Because sequentially consistent fence and sequentially
consistent atomic RMW operation doesn' t synchronize-with each other.
Hmmm... This is strange. But I can't find anything about this in
draft... Anthony Williams said that committee made some changes to
initial proposal on fences, but they are not yet published...
Dmitriy V'jukov