Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
415 views
in Technique[技术] by (71.8m points)

c++ - ReleaseSemaphore does not release the semaphore

(In short: main()'s WaitForSingleObject hangs in the program below).

I'm trying to write a piece of code that dispatches threads and waits for them to finish before it resumes. Instead of creating the threads every time, which is costly, I put them to sleep. The main thread creates X threads in CREATE_SUSPENDED state.

The synch is done with a semaphore with X as MaximumCount. The semaphore's counter is put down to zero and the threads are dispatched. The threds perform some silly loop and call ReleaseSemaphore before they go to sleep. Then the main thread uses WaitForSingleObject X times to be sure every thread finished its job and is sleeping. Then it loops and does it all again.

From time to time the program does not exit. When I beak the program I can see that WaitForSingleObject hangs. This means that a thread's ReleaseSemaphore did not work. Nothing is printf'ed so supposedly nothing went wrong.

Maybe two threads shouldn't call ReleaseSemaphore at the exact same time, but that would nullify the purpose of semaphores...

I just don't grok it...

Other solutions to synch threads are gratefully accepted!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!
");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done
");
 getc(stdin);
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Instead of using a semaphore (at least directly) or having main explicitly wake up a thread to get some work done, I've always used a thread-safe queue. When main wants a worker thread to do something, it pushes a description of the job to be done onto the queue. The worker threads each just do a job, then try to pop another job from the queue, and end up suspended until there's a job in the queue for them to do:

The code for the queue looks like this:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

And a rough equivalent of your code in the threads to use it looks something like this. I didn't sort out exactly what your thread function was doing, but it was something with summing square roots, and apparently you're more interested in the thread synch than what the threads actually do, for the moment.

Edit: (based on comment): If you need main() to wait for some tasks to finish, do some more work, then assign more tasks, it's generally best to handle that by putting an event (for example) into each task, and have your thread function set the events. Revised code to do that would look like this (note that the queue code isn't affected):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "
";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...