2 Stimmen

ReleaseSemaphore gibt die Semaphore nicht frei

(Kurz gesagt: WaitForSingleObject von main() hängt sich im untenstehenden Programm auf).

Ich versuche, ein Codestück zu schreiben, das Threads verteilt und wartet, bis sie beendet sind, bevor es weitergeht. Anstatt die Threads jedes Mal neu zu erstellen, was kostspielig ist, versetze ich sie in den Ruhezustand. Der Hauptthread erstellt X Threads im Zustand CREATE_SUSPENDED.

Die Synchronisierung erfolgt über eine Semaphore mit X als MaximumCount. Der Zähler der Semaphore wird auf Null zurückgesetzt und die Threads werden abgearbeitet. Die drei Threads führen eine dumme Schleife aus und rufen ReleaseSemaphore auf, bevor sie schlafen gehen. Dann verwendet der Hauptthread WaitForSingleObject X-mal, um sicher zu sein, daß jeder Thread seine Aufgabe erledigt hat und schläft. Dann macht er eine Schleife und das Ganze noch einmal.

Von Zeit zu Zeit wird das Programm nicht beendet. Wenn ich das Programm beobachte, kann ich sehen, dass sich WaitForSingleObject aufhängt. Das bedeutet, dass die ReleaseSemaphore eines Threads nicht funktioniert hat. Es wird nichts gedruckt, also ist anscheinend nichts schiefgegangen.

Vielleicht sollten zwei Threads nicht genau zur gleichen Zeit ReleaseSemaphore aufrufen, aber das würde den Zweck von Semaphoren zunichte machen...

Ich verstehe es einfach nicht...

Andere Lösungen für Synchronisierungsfäden werden dankbar angenommen!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}

5voto

Jerry Coffin Punkte 452852

Anstatt eine Semaphore (zumindest direkt) zu verwenden oder main explizit einen Thread aufwecken zu lassen, um etwas zu erledigen, habe ich immer eine thread-sichere Warteschlange verwendet. Wenn main möchte, dass ein Worker-Thread etwas tut, schiebt er eine Beschreibung der zu erledigenden Aufgabe in die Warteschlange. Die Worker-Threads erledigen jeweils nur eine Aufgabe, versuchen dann, eine andere Aufgabe aus der Warteschlange zu holen, und werden so lange angehalten, bis sich eine Aufgabe in der Warteschlange befindet, die sie erledigen können:

Der Code für die Warteschlange sieht wie folgt aus:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

Und ein grobes Äquivalent Ihres Codes in den Threads, um ihn zu verwenden, sieht etwa so aus. Ich habe nicht genau herausgefunden, was Ihre Thread-Funktion getan hat, aber es war irgendetwas mit der Summierung von Quadratwurzeln, und anscheinend sind Sie im Moment mehr an der Thread-Synchronisation interessiert als an dem, was die Threads tatsächlich tun.

Edit: (aufgrund eines Kommentars): Wenn Sie brauchen main() zu warten, bis einige Aufgaben beendet sind, weitere Arbeit zu erledigen und dann weitere Aufgaben zuzuweisen, ist es im Allgemeinen am besten, dies zu handhaben, indem man ein Ereignis (z. B.) in jede Aufgabe einfügt und die Ereignisse von der Thread-Funktion setzen lässt. Überarbeiteter Code, um das zu tun würde wie folgt aussehen (beachten Sie, dass die Warteschlange Code nicht betroffen ist):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}

3voto

Hans Passant Punkte 894572

Ich verstehe den Code nicht, aber die Threading-Synchronisation ist definitiv schlecht. Sie gehen davon aus, dass die Threads SuspendThread() in einer bestimmten Reihenfolge aufrufen. Ein erfolgreicher WaitForSingleObject()-Aufruf sagt Ihnen nicht die Thread ReleaseSemaphore() aufgerufen. Sie rufen also ReleaseThread() auf einem Thread auf, der nicht angehalten wurde. Dies führt schnell zu einer Blockierung des Programms.

Eine weitere schlechte Annahme ist, dass ein Thread bereits SuspendThread aufgerufen hat, nachdem der WFSO zurückgekehrt ist. Normalerweise ja, aber nicht immer. Der Thread könnte direkt nach dem RS-Aufruf pre-empted werden. Sie werden ReleaseThread() erneut auf einem Thread aufrufen, der nicht suspendiert wurde. Das dauert normalerweise einen Tag oder so, um Ihr Programm zu blockieren.

Und ich denke, es ist ein ReleaseSemaphore-Anruf zu viel. Versuchen Sie, es zu entwirren, kein Zweifel.

Sie können das Threading nicht mit Suspend/ReleaseThread() kontrollieren, versuchen Sie es nicht.

3voto

stmax Punkte 6476

Das Problem tritt im folgenden Fall auf:

nimmt der Hauptthread die Arbeitsthreads wieder auf:

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

erledigen die Arbeits-Threads ihre Arbeit und geben die Semaphore frei:

  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)

wartet der Hauptthread auf alle Worker-Threads und setzt die Semaphore zurück:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);

der Hauptthread geht in die nächste Runde und versucht, die Worker-Threads wieder aufzunehmen (beachten Sie, dass die Worker-Threads selbst noch nicht suspendiert sind! hier beginnt das Problem... Sie versuchen, Threads wieder aufzunehmen, die noch nicht unbedingt suspendiert sind):

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

schließlich setzen sich die Worker-Threads selbst aus (obwohl sie bereits die nächste Runde beginnen sollten):

  SuspendThread(ids[(int) lpParameter]);

und der Hauptthread wartet ewig, da alle Worker jetzt ausgesetzt sind:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);

Hier ist ein Link, der zeigt, wie man Erzeuger/Verbraucher-Probleme richtig löst:

http://en.wikipedia.org/wiki/Producer-consumer_problem

Ich denke auch, dass kritische Abschnitte sind viel schneller als Semaphoren und Mutexe. Sie sind in den meisten Fällen auch einfacher zu verstehen (imo).

0voto

Dingo Punkte 3255

Das Problem ist, dass Sie öfter warten als dass Sie Signale geben.

Le site for (int j=0 ; j<TRY ; j++) Schleife wartet achtmal auf die Semaphore, während die vier Threads jeweils nur einmal signalisieren und die Schleife selbst einmal. Beim ersten Durchlauf der Schleife ist dies kein Problem, da die Semaphore anfangs viermal gezählt wird. Beim zweiten und jedem weiteren Mal warten Sie auf zu viele Signale. Dies wird dadurch abgemildert, dass Sie bei den ersten vier Warteschleifen die Zeit begrenzen und im Fehlerfall nicht erneut versuchen. Manchmal klappt es also, und manchmal bleibt die Wartezeit stehen.

Ich denke, die folgenden (nicht getesteten) Änderungen werden helfen.

Initialisieren Sie die Semaphore auf Nullzählung:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

Beseitigen Sie das Warten in der Schleife zur Wiederaufnahme des Fadens (d. h. entfernen Sie Folgendes):

   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)  
      printf("Timed out !!!\n");  

Entfernen Sie das Fremdsignal am Ende der Testschleife (d. h. entfernen Sie Folgendes):

ReleaseSemaphore(semaphore,numCPU,NULL);

0voto

Gabriel Punkte 2527

Hier ist eine praktische Lösung.

Ich wollte, dass mein Hauptprogramm Threads verwendet (und dann mehr als einen Kern verwendet), um Aufträge zu mampfen und zu warten, bis alle Threads abgeschlossen sind, bevor ich fortfahre und andere Dinge tue. Ich wollte nicht, dass die Threads sterben und neue erstellt werden, weil das zu langsam ist. In meiner Frage habe ich versucht, dies zu erreichen, indem ich die Threads angehalten habe, was natürlich erschien. Aber wie nobugz schon sagte: "Du kannst Threads mit Suspend/ReleaseThread() kontrollieren".

Die Lösung beinhaltet Semaphoren wie die, die ich zur Steuerung der Threads verwendet habe. Eigentlich wird eine weitere Semaphore verwendet, um den Hauptthread zu steuern. Jetzt habe ich eine Semaphore pro Thread zur Steuerung der Threads und eine Semaphore zur Steuerung des Hauptthreads.

Hier ist die Lösung:

#include <windows.h>
#include <stdio.h>
#include <math.h>
#include <process.h>

#define TRY  500000
#define LOOP 100

HANDLE *ids;
HANDLE *semaphores;
HANDLE allThreadsSemaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{   
    float x = 1.0f;         
    while(1)
    {   
        WaitForSingleObject(semaphores[(int)lpParameter],INFINITE);
        for (int i=1 ; i<LOOP ; i++)
            x = sqrt((float)i*x+rand());
        ReleaseSemaphore(allThreadsSemaphore,1,NULL);
    }
    return (DWORD)(int)x;
}

int main()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    int numCPU = sysinfo.dwNumberOfProcessors;

    ids = new HANDLE[numCPU];
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++)
    {
        ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL);
        // Threads blocked until main releases them one by one
        semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL);
    }
    // Blocks main until threads finish
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

    for (int j=0 ; j<TRY ; j++)
    {
        for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs
            ReleaseSemaphore(semaphores[i],1,NULL);
        for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish
            WaitForSingleObject(allThreadsSemaphore,INFINITE);
    }
    for (int j=0 ; j<numCPU ; j++)
        CloseHandle(semaphores[j]);
    CloseHandle(allThreadsSemaphore);
    printf("Done\n");
    getc(stdin);
}

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X