aliakseis (aliakseis) wrote,
aliakseis
aliakseis

Opus #5

// Queue in which threads block inside Wait() until either their stop event
// is signaled or more than QUEUE_SIZE later callers have entered Wait().
//
// The object owns four Win32 kernel handles created in the constructor and
// closed in the destructor. It is therefore non-copyable: a copy would share
// the same handles and close them a second time on destruction.
class CThreadFifoQueue
{
    // Mutex taken by a thread entering Wait(); it is held for the whole
    // notification session that the entering thread runs.
    HANDLE m_guard;
    // Two manual-reset events, initially non-signaled, used alternately
    // (indexed by generation) to wake the threads that are already waiting.
    HANDLE m_notify[2];
    // Auto-reset event, initially non-signaled; set by the last waiter that
    // finishes handling a notification.
    HANDLE m_notificationFinished;
    
    // Number of threads currently subscribed for notifications.
    volatile long m_waitCount;
    // Number of subscribed threads that still have to handle the
    // notification in progress (0 when no notification is in progress).
    volatile long m_lockCount;
    // Index into m_notify of the event used by the most recent notification;
    // flipped before every notification.
    volatile bool m_fEvenGeneration;
    
    // Incremented once per Wait() call; waiters compare it with the value
    // they saw on entry to learn how many calls arrived after them.
    volatile long m_ticket;
    
    // Copying is disabled (declared private, intentionally not defined).
    CThreadFifoQueue(const CThreadFifoQueue&);
    CThreadFifoQueue& operator=(const CThreadFifoQueue&);
    
public:
    // A waiter leaves the queue once more than QUEUE_SIZE tickets have been
    // issued since its own.
    enum { QUEUE_SIZE = 10 };
    
    // Creates the unnamed mutex and events with default security attributes.
    CThreadFifoQueue()
        : m_guard(CreateMutex(NULL, FALSE, NULL))
        , m_notificationFinished(CreateEvent(NULL, FALSE, FALSE, NULL))
        , m_waitCount(0)
        , m_lockCount(0)
        , m_fEvenGeneration(false)
        , m_ticket(0)
    {
        m_notify[0] = CreateEvent(NULL, TRUE, FALSE, NULL);
        m_notify[1] = CreateEvent(NULL, TRUE, FALSE, NULL);
    }
    
    // Closes every handle created by the constructor.
    ~CThreadFifoQueue()
    {
        CloseHandle(m_guard);
        CloseHandle(m_notify[0]);
        CloseHandle(m_notify[1]);
        CloseHandle(m_notificationFinished);
    }
    
    // Blocks the calling thread in the queue; hStopEvent is an event handle
    // that, once signaled, makes the thread leave the queue.
    void Wait(HANDLE hStopEvent);
};

// Enters the calling thread into the queue and blocks it until it has to
// leave.
//
// On entry the thread takes a ticket and, if other threads are already
// waiting, wakes all of them and waits until every one has handled the
// notification. It then subscribes itself and waits. The thread leaves when
// either
//   - hStopEvent becomes signaled, or
//   - a notification shows that more than QUEUE_SIZE tickets have been
//     issued since its own.
// While blocked, the thread keeps retrieving and dispatching its window
// messages.
//
// hStopEvent: waitable handle; when it is signaled the thread un-subscribes
//             and returns.
void CThreadFifoQueue::Wait(HANDLE hStopEvent)
{
    // Notifying begins
    // The mutex is held until this thread has subscribed itself below, so
    // only one thread at a time runs a notification session.
    VERIFY(WAIT_OBJECT_0 == WaitForSingleObject (m_guard, INFINITE)); // acquire
    
    // Remember the pre-increment value: m_ticket - ticket is 1 right now and
    // grows by one for every later Wait() call.
    long ticket = m_ticket++;
    
    // No notification may be in progress while the mutex is ours.
    ASSERT(0 == m_lockCount);
    if (m_waitCount != 0)
    {
        // Every currently subscribed thread has to handle this notification.
        m_lockCount = m_waitCount;
        // Switch to the other event; waiters are blocked on this index.
        m_fEvenGeneration = !m_fEvenGeneration;
        
        // Fire notification and let other threads handle it
        // Note that mutex guards notification session
        VERIFY(SetEvent(m_notify[m_fEvenGeneration]));
        
        // wait till notification handled and acquire
        // (the last waiter to finish sets m_notificationFinished)
        VERIFY(WAIT_OBJECT_0 == WaitForSingleObject(m_notificationFinished, INFINITE)); 
    }
    
    m_waitCount++; // "subscribe" for notification session
    // Start from the generation just used; the loop flips it before waiting,
    // so the first wait is on the event the next notifier will set.
    bool fEvenGeneration = m_fEvenGeneration;
    
    VERIFY(ReleaseMutex(m_guard));
    
    bool bStop = false;
    
    // Waiting begins
    do
    {		
        // Wait on the event opposite to the one handled (or observed) last.
        fEvenGeneration = !fEvenGeneration;
        
        HANDLE events[2] = { m_notify[fEvenGeneration], hStopEvent };
        DWORD dwRes;
        // WAIT_OBJECT_0 + 2 (== number of handles) means input is available
        // in the thread's message queue rather than a handle being signaled.
        while (WAIT_OBJECT_0 + 2 == (dwRes = MsgWaitForMultipleObjects(2, events, FALSE, INFINITE, QS_ALLINPUT)))
        {	// Handle messages if any
            MSG msg;
            while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
            {
                // Give the MFC thread object, if there is one, the first
                // chance to process the message.
                CWinThread* pThread = AfxGetThread();
                if (pThread == NULL || !pThread->PreTranslateMessage(&msg))
                {
                    TranslateMessage(&msg);
                    DispatchMessage(&msg);
                }
            }
        }
        
        if (WAIT_OBJECT_0 + 1 == dwRes)
        {	// Handle owner shutdown
            bStop = true;
            // Either a notification arrives first (index 0) and is handled
            // below, or the mutex is obtained (index 1), in which case the
            // thread can un-subscribe under the mutex.
            HANDLE objects[2] = { m_notify[fEvenGeneration], m_guard };
            
            if (WAIT_OBJECT_0 + 1 == WaitForMultipleObjects(2, objects, FALSE, INFINITE))
            {	// m_guard
                ASSERT(0 == m_lockCount);
                m_waitCount--;	// "un-subscribe" from notification session, already synchronized by mutex
                VERIFY(ReleaseMutex(m_guard));
                // In a do-while, continue jumps to the loop condition;
                // bStop is already true, so the loop terminates.
                continue; // exit out of outer loop
            }
        }
        
        // A notification is being handled from here on. Debug check that the
        // mutex cannot be obtained, i.e. some other thread holds it.
        // NOTE(review): relies on ASSERT not evaluating its argument in
        // release builds (MFC behaviour) - confirm for the build in use.
        ASSERT(WAIT_TIMEOUT == WaitForSingleObject(m_guard, 0)); 
        
        // More than QUEUE_SIZE tickets were issued after ours: leave.
        if (m_ticket - ticket > QUEUE_SIZE)
        {
            // Checked on every notification, so the limit is passed by
            // exactly one.
            ASSERT(m_ticket - ticket == QUEUE_SIZE + 1);
            bStop = true;
        }
        
        // This path does not hold the mutex, hence the interlocked updates
        // of the shared counters.
        if (bStop)
            InterlockedDecrement((long*)&m_waitCount); // "un-subscribe" from notification session
        
        // The last waiter to acknowledge resets the manual-reset event and
        // releases the notifying thread.
        if (0 == InterlockedDecrement((long*)&m_lockCount))
        {
            VERIFY(ResetEvent(m_notify[fEvenGeneration]));
            VERIFY(SetEvent(m_notificationFinished));
        }
    }
    while (!bStop);
}
Subscribe
  • Post a new comment

    Error

    default userpic
    When you submit the form an invisible reCAPTCHA check will be performed.
    You must follow the Privacy Policy and Google Terms of use.
  • 1 comment