Re: a *working* PostThreadMessage() implementation...?
On 2 Okt., 18:18, "Doug Harrison [MVP]" <d...@mvps.org> wrote:
On Thu, 2 Oct 2008 05:26:38 -0700 (PDT), ".rhavin grobert" <cl...@yahoo.de>
wrote:
On 2 Okt., 00:02, "Doug Harrison [MVP]" <d...@mvps.org> wrote:
I meant it would be better than trying to roll one's own message queue to
work around the PostThreadMessage issues. In order for PTM to work right,
all message loops have to account for it, which could be a problem down the
road if you add a secondary message loop for some reason.
?
there are only two acceptable scenarios:
1: Message IS delivered to the treads message queue and WILL appear in
the threads own message loop
2: Message cant be delivered and the posting-Fn return an error.
Then you should be happy with PostThreadMessage, because it satisfies those
two criteria.
in my experience it doesnt ;-)
Seriously. The problem occurs when one or more message loops
the thread runs fail to handle messages posted by PostThreadMessage.
i've experienced enough cases where the message doesnt even appear.
if you use PostMessage to a (hidden) window, any message loop that uses
DispatchMessage will handle things fine. In order to run into a problem
with PostMessage, the message loop would have to eat a message it knows
nothing about, which would be a rare bug.
my current implementation looks like this and im currently testing it
(seems to run):
______________________________________________________
// header
#pragma pack (push, 1)
struct SQTrdMsg {
QUAD qCommand;
QUAD qParam;
QUAD qAux;
void* pBody;
BYTE bFlags;
};
#pragma pack (pop)
//-----------------------------------------------------------------------------
// threads external handling function
typedef void (*QW_TRD_EXT_PROC) (void* pObj, UINT nCase, void*
ptParam);
//-----------------------------------------------------------------------------
// cases to handle
#define QTMEX_MESSAGE 0x000 // message was received, ptParam =
SQTrdMsg*
#define QTMEX_STARTING 0x001 // thread is started, ptParam =
CQThreadX*
#define QTMEX_STARTED 0x002 // thread was started, ptParam =
CQThreadX*
#define QTMEX_TERMINATING 0x003 // thread will terminate, ptParam =
CQThreadX*
#define QTMEX_TERMINATED 0x004 // thread terminated, ptParam =
CQThreadX*
//-----------------------------------------------------------------------------
// thread queue 'magic numbers'
#define MSGBUFFER_INITIAL_SIZE 100 // queues initial slots
SQTrdMsg-blocks
#define MSGBUFFER_DEFLATE_THRESHOLD 50 // if MaxSize - UsedSize is
smaller ...
#define MSGBUFFER_DEFLATE_SIZE 25 // ...deflate buffer to
UsedSize + this
#define MSGBUFFER_INFLATE_SIZE 25 // increase by this if buffer
too small
//-----------------------------------------------------------------------------
// thread message processing flags
#define QTMF_DELETEBODY 0x01
//-----------------------------------------------------------------------------
#define QTM_QUIT 0 // default quit message;
//-----------------------------------------------------------------------------
class CQThreadX : public CWinThread
{
// ctor & dtor see protected interface
//
------------------------------------------------------------------------
// public object interface
public:
// get exit code (qParam of QUIT-Message)
inline QUAD ExitCodeGet() const {return m_qExitCode;};
// get currently agreed exit command
inline QUAD ExitComandGet() const {return m_qExitCommand;};
// set command to exit the threads message loop
inline void ExitComandSet(QUAD qCmd) {m_qExitCommand = qCmd;};
// set message handling function called for each received message
inline void HandlerSet(QW_TRD_EXT_PROC pFn, void* pThis);
// add a new message to queue (windows replacement)
BOOL PostThreadMessage(UINT nMessage, WPARAM wParam, LPARAM lParam);
// add a new message to queue
bool PostThreadMessage(QUAD qMessage, QUAD qParam = 0, QUAD qAux = 0,
PCVOID pBody = NULL, BYTE bFlags = QTMF_DELETEBODY);
// add a new message to queue
bool PostThreadMessage(SQTrdMsg const* pMsg);
// ClassWizard generated virtual function overrides
//{{AFX_VIRTUAL(CQThreadX)
public:
//}}AFX_VIRTUAL
//
------------------------------------------------------------------------
// public class interface
// start a new thread
static CQThreadX* ThreadBegin();
//
------------------------------------------------------------------------
// protected interface
protected:
// msg received (called before dispatch, return false to stop
dispatching)
virtual bool OnMessage(SQTrdMsg& TM) {return true;};
// protected constructor used by dynamic creation
CQThreadX();
// protected destructor
virtual ~CQThreadX();
// Generated message map functions
//{{AFX_MSG(CQThreadX)
// NOTE - the ClassWizard will add and remove member functions here.
//}}AFX_MSG
DECLARE_MESSAGE_MAP()
//
------------------------------------------------------------------------
// private functions
private:
// deflate queues buffer
bool _MsgBufferDeflate();
// inflate queues buffer
bool _MsgBufferInflate();
// copy contents of old queue into new buffer
void _MsgBufferRelocate(SQTrdMsg* pNewBuffer);
// initialize message buffer and start functionality
bool _MsgBufferStart();
// free all memory used by message buffer, stop message buffering
bool _MsgBufferStop();
// unshift buffers oldest message from queue
bool _MsgBufferUnshift(SQTrdMsg* pTM);
// threads main message loop
QUAD _MsgLoop();
// threads AFX_THREADPROC
static UINT WINAPI _ThreadProc(LPVOID pParam);
DECLARE_DYNCREATE(CQThreadX)
//
------------------------------------------------------------------------
// members
UINT m_nQueueLast; // last used slot in queue
UINT m_nQueueSize; // overall used size of queue
UINT m_nQueueFirst; // first used slot in queue
UINT m_nQueueMax; // queues maximum index
SQTrdMsg* m_pQueueBuffer; // the allocated queue buffer
HANDLE m_hMessage; // event-handle: queue has
messages
QUAD m_qExitCode; // threads 64bit exit code
QUAD m_qExitCommand; // command to exit the thread
CRITICAL_SECTION m_critMessage; // buffer access
syncronissation
QW_TRD_EXT_PROC m_pMsgProc; // pointer to a message
handling fn
void* m_pMsgThis; // msg handling-fn's object-
pointer
};
___________________________________________________________
// implementation
IMPLEMENT_DYNCREATE(CQThreadX, CWinThread)
/
******************************************************************************
* CQThreadX public
interface *
******************************************************************************/
//-----------------------------------------------------------------------------
// add a new message to queue (windows replacement)
BOOL CQThreadX::PostThreadMessage(UINT nMessage, WPARAM wParam, LPARAM
lParam)
{
SQTrdMsg TM;
if (nMessage == WM_QUIT)
TM.qCommand = ExitComandGet();
else
TM.qCommand = nMessage;
TM.qParam = wParam;
TM.qAux = lParam;
TM.pBody = NULL;
TM.bFlags = 0;
if (PostThreadMessage(&TM))
return TRUE;
return FALSE;
}
//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThreadX::PostThreadMessage(QUAD qMsg, QUAD qParam, QUAD qAux,
PCVOID pBody, BYTE bFlags)
{
SQTrdMsg TM;
TM.qParam = qParam;
TM.qCommand = qMsg;
TM.qAux = qAux;
TM.pBody = const_cast<void*>(pBody);
TM.bFlags = bFlags;
return PostThreadMessage(&TM);
}
//-----------------------------------------------------------------------------
// add a new message to queue
bool CQThreadX::PostThreadMessage(SQTrdMsg const* pMsg)
{
if (pMsg == NULL || m_hMessage == 0)
return false;
EnterCriticalSection(&m_critMessage);
// if buffer is not big enough, we have to resize it!
if (m_nQueueSize > m_nQueueMax)
VERIFY(_MsgBufferInflate());
m_nQueueLast++;
m_nQueueSize++;
// start at beginning if next slot exceeds buffer
if (m_nQueueLast > m_nQueueMax)
m_nQueueLast = 0;
// copy message into current slot
*(m_pQueueBuffer + m_nQueueLast) = *pMsg;
LeaveCriticalSection(&m_critMessage);
//you've got message...
return (::SetEvent(m_hMessage) != FALSE);
}
//-----------------------------------------------------------------------------
// (static) start a new thread
CQThreadX* CQThreadX::ThreadBegin()
{
CQThreadX* pQT = new CQThreadX;
CWinThread* pWT = AfxBeginThread( (AFX_THREADPROC)
_ThreadProc, pQT, THREAD_PRIORITY_NORMAL, 0, NULL);
return pQT;
}
/
******************************************************************************
* CQThreadX protected
interface *
******************************************************************************/
//-----------------------------------------------------------------------------
// protected constructor used by dynamic creation
CQThreadX::CQThreadX()
{
InitializeCriticalSection(&m_critMessage);
m_qExitCode = 0;
m_qExitCommand = QTM_QUIT;
m_hMessage = 0;
m_pMsgProc = NULL;
m_pMsgThis = NULL;
_MsgBufferStart();
}
//-----------------------------------------------------------------------------
// protected destructor
CQThreadX::~CQThreadX()
{
DeleteCriticalSection(&m_critMessage);
}
BEGIN_MESSAGE_MAP(CQThreadX, CWinThread)
//{{AFX_MSG_MAP(CQThreadX)
// NOTE - the ClassWizard will add and remove mapping macros here.
//}}AFX_MSG_MAP
END_MESSAGE_MAP()
/
******************************************************************************
* CQThreadX private
functions *
******************************************************************************/
//-----------------------------------------------------------------------------
// deflate queues buffer
bool CQThreadX::_MsgBufferDeflate()
{
if (m_nQueueSize < MSGBUFFER_INITIAL_SIZE)
return true;
// allocate new buffer
UINT nStackNew = (m_nQueueSize + MSGBUFFER_DEFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);
if (pQueueNew == NULL)
return false;
_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew - 1;
return true;
}
//-----------------------------------------------------------------------------
// inflate queues buffer
bool CQThreadX::_MsgBufferInflate()
{
// allocate new buffer
UINT nStackNew = (m_nQueueMax + MSGBUFFER_INFLATE_SIZE);
UINT nBytesNew = sizeof(SQTrdMsg) * nStackNew;
SQTrdMsg* pQueueNew = (SQTrdMsg*) malloc(nBytesNew);
if (pQueueNew == NULL)
return false;
_MsgBufferRelocate(pQueueNew);
m_nQueueMax = nStackNew - 1;
return true;
}
//-----------------------------------------------------------------------------
// copy contents of old queue into new buffer
void CQThreadX::_MsgBufferRelocate(SQTrdMsg* pNewBuffer)
{
ASSERT(pNewBuffer != NULL);
if (m_nQueueSize != 0)
{
// copy contents of old buffer to new location
if (m_nQueueFirst < m_nQueueLast)
{
// currently used queue doesnt span over buffers end: normal copy
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst,
m_nQueueSize * sizeof(SQTrdMsg));
}
else
{
// currently used queue does span over buffers end: swap at copy
UINT nCut = m_nQueueMax + 1 - m_nQueueFirst;
memcpy(pNewBuffer, m_pQueueBuffer + m_nQueueFirst,
nCut * sizeof(SQTrdMsg));
memcpy(pNewBuffer + nCut, m_pQueueBuffer,
(m_nQueueLast + 1) * sizeof(SQTrdMsg));
}
}
m_nQueueFirst = 0;
m_nQueueLast = m_nQueueSize - 1;
free(m_pQueueBuffer);
m_pQueueBuffer = pNewBuffer;
}
//-----------------------------------------------------------------------------
// initialize message buffer and start functionality
bool CQThreadX::_MsgBufferStart()
{
if (m_hMessage != 0)
return true;
m_hMessage = ::CreateEvent(NULL, TRUE, FALSE, "QTreadMsgHnd");
m_nQueueLast = MSGBUFFER_INITIAL_SIZE - 1;
m_nQueueMax = m_nQueueLast;
m_nQueueFirst = 0;
m_nQueueSize = 0;
m_pQueueBuffer = (SQTrdMsg*) malloc(sizeof(SQTrdMsg)
* MSGBUFFER_INITIAL_SIZE);
return (m_hMessage != 0);
}
//-----------------------------------------------------------------------------
// free all memory used by message buffer, stop message buffering
bool CQThreadX::_MsgBufferStop()
{
if (m_hMessage == 0)
return true;
EnterCriticalSection(&m_critMessage);
bool bRet = (::CloseHandle(m_hMessage) != FALSE);
m_hMessage = 0;
SQTrdMsg TM;
// remove all message-heads from queue, optionally deleting the body
while (_MsgBufferUnshift(&TM))
{
if ((TM.bFlags & QTMF_DELETEBODY) != 0)
delete TM.pBody;
}
// free queues still allocated memory
free(m_pQueueBuffer);
LeaveCriticalSection(&m_critMessage);
return bRet;
}
//-----------------------------------------------------------------------------
// unshift buffers oldest message from queue
bool CQThreadX::_MsgBufferUnshift(SQTrdMsg* pTM)
{
if (pTM == NULL)
return false;
EnterCriticalSection(&m_critMessage);
if (m_nQueueSize == 0)
{
LeaveCriticalSection(&m_critMessage);
return false;
}
// copy message from first slot to given buffer
*pTM = *(m_pQueueBuffer + m_nQueueFirst);
m_nQueueFirst++;
// start at beginning if first slot exceeds buffer
if (m_nQueueFirst > m_nQueueMax)
m_nQueueFirst = 0;
m_nQueueSize--;
// if size of buffer is too big, we shrink it
if (m_nQueueSize + MSGBUFFER_DEFLATE_THRESHOLD <= m_nQueueMax)
VERIFY(_MsgBufferDeflate());
LeaveCriticalSection(&m_critMessage);
return true;
}
//-----------------------------------------------------------------------------
// threads main message loop
QUAD CQThreadX::_MsgLoop()
{
// tell handling function were about to start
if (m_pMsgProc != NULL)
m_pMsgProc(m_pMsgThis, QTMEX_STARTING, this);
_MsgBufferStart();
// tell handling function we're now running
if (m_pMsgProc != NULL)
m_pMsgProc(m_pMsgThis, QTMEX_STARTED, this);
SQTrdMsg TM;
// wait until someone posts a message
while (::WaitForSingleObject(m_hMessage, INFINITE) != WAIT_TIMEOUT)
{
// empty message queue
while (_MsgBufferUnshift(&TM))
{
// call virtual function in derived class
if (OnMessage(TM))
{
// dispatch message to handling fn
if (m_pMsgProc != NULL)
m_pMsgProc(m_pMsgThis, QTMEX_MESSAGE, &TM);
}
// delete body if deletion-flag is set
if ((TM.bFlags & QTMF_DELETEBODY) != 0)
delete TM.pBody;
if (TM.qCommand == m_qExitCommand)
{
m_qExitCode = TM.qParam;
goto CQThreadX_LoopEnd;
}
}
::ResetEvent(m_hMessage);
}
CQThreadX_LoopEnd:
// tell handling function we're about to end
if (m_pMsgProc != NULL)
m_pMsgProc(m_pMsgThis, QTMEX_TERMINATING, this);
_MsgBufferStop();
// tell handling function we're finished
if (m_pMsgProc != NULL)
m_pMsgProc(m_pMsgThis, QTMEX_TERMINATED, this);
return m_qExitCode;
}
//-----------------------------------------------------------------------------
// (static) threads AFX_THREADPROC
UINT CQThreadX::_ThreadProc(LPVOID pParam)
{
CQThreadX* pThis = static_cast<CQThreadX*>(pParam);
_ASSERTE(pThis != NULL);
return (UINT) pThis->_MsgLoop();
}