Re: a *working* PostThreadMessage() implementation...?
Sorry, you have to review your implementation throughly. If you hope you can
call Start and Stop for it while other threads are posting messages, that
won't work. You have a few race conditions in your code. I'm leaving that
exercise up to you to find those holes.
".rhavin grobert" <clqrq@yahoo.de> wrote in message
news:23a856cc-adfe-443f-8a41-4ec734feb02c@2g2000hsn.googlegroups.com...
On 1 Okt., 19:58, "Scott McPhillips [MVP]" <org-dot-mvps-at-scottmcp>
wrote:
".rhavin grobert" <cl...@yahoo.de> wrote in message
news:af8a592c-31d5-439e-b547-5e6bb649d6d8@h60g2000hsg.googlegroups.com...
in order to not reinvent the weel, has anyone already coded a
*working* PostThreadMessage implementation?
eg. an impl that has a queue of a variable, self managing size and
that - of course - doesnt drop messages just because some other thread
is in some dialog modal loop?
Is there something about the built in implementation that does not meet
your
needs? You know, I suppose, that PostThreadMessage should only be used to
post to threads that do not display any GUI. With that limitation, the
queue is enormous and is not affected by other threads.
--
Scott McPhillips [VC++ MVP]
i want a message queue that doesnt care what the threads do currently
and who's got GUIs.
how about that aproach (not fully functional yet, QUAD = unsigned
__int64):
______________________________________________________
#define MSGBUFFER_INITIAL_SIZE 50 // queues initial slots
#define MSGBUFFER_DEFLATE_THRESHOLD 100 // if MaxSize - UsedSize is
smaller...
#define MSGBUFFER_DEFLATE_SIZE 25 // ...deflate buffer to
UsedSize + this
#define MSGBUFFER_INFLATE_SIZE 25 // increase by this if buffer
is to small
#pragma pack (push, 1)
struct SQTrdMsg {
QUAD qCommand;
QUAD qParam;
QUAD qAux;
void* pBody;
BYTE bFlags;
};
#pragma pack (pop)
class CQThread : public CWinThread
{
//* ... usual stuff here ... */
private:
// initialize message buffer and start functionality
bool _MsgBufferStart();
// free all memory used by message buffer, stop message buffering
bool _MsgBufferStop();
// add a new message to queue
bool _MsgBufferAdd(QUAD qMsg, QUAD qParam, QUAD qAux, PCVOID pBody,
BYTE bFlags = QTMF_DELETEBODY);
// add a new message to queue
bool _MsgBufferAdd(SQTrdMsg const* pMsg);
// unshift buffers oldest message from queue
bool _MsgBufferUnshift(SQTrdMsg* pTM);
// deflate queues buffer
bool _MsgBufferDeflate();
// inflate queues buffer
bool _MsgBufferInflate();
// copy contents of old queue into new buffer
void _MsgBufferRelocate(SQTrdMsg* pNewBuffer);
UINT m_nQueueLast; // last used slot in queue
UINT m_nQueueSize; // overall size of queue
UINT m_nQueueFirst; // first used slot in queue
UINT m_nQueueMax; // queues maximum allocation
SQTrdMsg* m_pQueueBuffer; // the allocated queue buffer
HANDLE m_hMessage;
CRITICAL_SECTION m_critMessage;
};
//-----------------------------------------------------------------------------
// initialize message buffer and start functionality
bool CQThread::_MsgBufferStart()
{
m_hMessage = ::CreateEvent(NULL, TRUE, FALSE, "QTreadMsgHnd");
m_nQueueLast = 0;
m_nQueueSize = 0;
m_nQueueFirst = 0;
m_pQueueBuffer = (SQTrdMsg*) malloc(sizeof(SQTrdMsg) *
MSGBUFFER_INITIAL_SIZE);
return (m_hMessage != 0);
}
//-----------------------------------------------------------------------------
// free all memory used by message buffer, stop message buffering
bool CQThread::_MsgBufferStop()
{
if (m_hMessage == 0)
return true;
EnterCriticalSection(&m_critMessage);
bool bRet = (::CloseHandle(m_hMessage) != FALSE);
m_hMessage = 0;
SQTrdMsg TM;
// remove all message-heads from queue, optionally deleting the body
while (_MsgBufferUnshift(&TM))
{
if ((TM.bFlags & QTMF_DELETEBODY) != 0)
delete TM.pBody;
}
// free queues still allocated memory
free(m_pQueueBuffer);
LeaveCriticalSection(&m_critMessage);
return bRet;
}
//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThread::_MsgBufferAdd(QUAD qMsg, QUAD qParam, QUAD qAux, PCVOID
pBody,
BYTE bFlags)
{
SQTrdMsg TM;
TM.qParam = qParam;
TM.qCommand = qMsg;
TM.qAux = qAux;
TM.pBody = const_cast<void*>(pBody);
TM.bFlags = bFlags;
return _MsgBufferAdd(&TM);
}
//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThread::_MsgBufferAdd(SQTrdMsg const* pMsg)
{
if (pMsg == NULL || m_hMessage == 0)
return false;
EnterCriticalSection(&m_critMessage);
// if buffer is not big enough, we have to resize it!
if (m_nQueueSize >= m_nQueueMax)
VERIFY(_MsgBufferInflate());
m_nQueueLast++;
m_nQueueSize++;
// start at beginning if next slot exceeds buffer
if (m_nQueueLast > m_nQueueMax)
m_nQueueLast = 0;
// copy message into current slot
*(m_pQueueBuffer + m_nQueueLast) = *pMsg;
LeaveCriticalSection(&m_critMessage);
//you've got message...
return (::SetEvent(m_hMessage) != FALSE);
}
//-----------------------------------------------------------------------------
// unshift buffers oldest message from queue
bool CQThread::_MsgBufferUnshift(SQTrdMsg* pTM)
{
if (pTM == NULL || m_nQueueSize == NULL)
return false;
EnterCriticalSection(&m_critMessage);
// copy message from first slot to given buffer
*(m_pQueueBuffer + m_nQueueFirst) = *pTM;
// start at beginning if first slot exceeds buffer
if (m_nQueueFirst > m_nQueueMax)
m_nQueueFirst = 0;
m_nQueueSize--;
// if size of buffer is too big, we shrink it
if (m_nQueueSize + MSGBUFFER_DEFLATE_THRESHOLD <= m_nQueueMax)
VERIFY(_MsgBufferDeflate());
LeaveCriticalSection(&m_critMessage);
return true;
}
//-----------------------------------------------------------------------------
// deflate queues buffer
bool CQThread::_MsgBufferDeflate()
{
// allocate new buffer
UINT nStackNew = (m_nQueueSize + MSGBUFFER_DEFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);
if (pQueueNew == NULL)
return false;
_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew;
return true;
}
//-----------------------------------------------------------------------------
// inflate queues buffer
bool CQThread::_MsgBufferInflate()
{
// allocate new buffer
UINT nStackNew = (m_nQueueMax + MSGBUFFER_INFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);
if (pQueueNew == NULL)
return false;
_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew;
return true;
}
//-----------------------------------------------------------------------------
// copy contents of old queue into new buffer
void CQThread::_MsgBufferRelocate(SQTrdMsg* pNewBuffer)
{
ASSERT(pNewBuffer != NULL);
if (m_nQueueSize == 0)
return;
// copy contents of old buffer to new location
if (m_nQueueFirst < m_nQueueLast)
{
// currently used queue doesnt span over buffers end: normal copy
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst, m_nQueueSize);
}
else
{
// currently used does span over buffers end: swap at copy
UINT nCut = m_nQueueMax - m_nQueueFirst;
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst, nCut);
memcpy(pNewBuffer + nCut, m_pQueueBuffer, m_nQueueLast);
}
m_nQueueFirst = 0;
m_nQueueLast = m_nQueueSize - 1;
free(m_pQueueBuffer);
m_pQueueBuffer = pNewBuffer;
}