Wenn ich dich richtig verstanden habe, möchtest du verhindern, dass der Verbraucherthread Daten aus der BlockingCollection
konsumiert, während eine andere Abfrageoperation stattfindet, aber während dieser Zeit können die Produzenten weiterhin Daten in die Sammlung einfügen.
Wenn das korrekt ist, denke ich, dass der beste Weg wäre, ein ManualResetEvent
zu haben, das normalerweise signalisiert wird und die Verbraucherthreads nicht blockiert, und wenn du die Verbraucher pausieren möchtest, kannst du das Ereignis zurücksetzen, was dazu führt, dass jeder Verbraucher auf das Signal wartet, bevor es signalisiert wird.
Aktualisierung: Hier ist eine schnelle Konsolenanwendung, die das oben Beschriebene zeigt. Dies ist nur eine schnelle Demo, aber zeigt einen Produzententhread und zwei Verbraucherthreads. Der Zustand der Verbraucher kann mit der Leertaste
auf der Tastatur zwischen Laufend
und Angehalten
umgeschaltet werden.
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
namespace ProducerConsumerDemo
{
class Program
{
static BlockingCollection _queue = new BlockingCollection();
static ManualResetEvent _pauseConsumers = new ManualResetEvent(true);
static bool _paused = false;
static int _itemsEnqueued = 0;
static int _itemsDequeued = 0;
static void Main(string[] args)
{
Thread producerThread = new Thread(Producer);
Thread consumerThread1 = new Thread(Consumer);
Thread consumerThread2 = new Thread(Consumer);
producerThread.Start();
consumerThread1.Start();
consumerThread2.Start();
while (true)
{
WriteAt(0,0,"Zustand: " + (string)(_paused ? "Angehalten" : "Laufend"));
WriteAt(0,1,"Elemente in der Warteschlange: " + _queue.Count);
WriteAt(0, 2, "Insgesamt hinzugefügt: " + _itemsEnqueued);
WriteAt(0, 3, "Insgesamt entnommen: " + _itemsDequeued);
Thread.Sleep(100);
if (Console.KeyAvailable)
{
if (Console.ReadKey().Key == ConsoleKey.Spacebar)
{
if (_paused)
{
_paused = false;
_pauseConsumers.Set();
}
else
{
_paused = true;
_pauseConsumers.Reset();
}
}
}
}
}
static void WriteAt(int x, int y, string format, params object[] args)
{
Console.SetCursorPosition(x, y);
Console.Write(" ");
Console.SetCursorPosition(x, y);
Console.Write(format, args);
}
static void Consumer()
{
while (true)
{
if (_paused)
{
// Wenn wir angehalten sind, warte auf das Signal, um anzuzeigen, dass wir fortfahren können
_pauseConsumers.WaitOne();
}
int value;
if (_queue.TryTake(out value))
{
Interlocked.Increment(ref _itemsDequeued);
// Etwas mit den Daten machen
}
Thread.Sleep(500);
}
}
static void Producer()
{
Random rnd = new Random();
while (true)
{
if (_queue.TryAdd(rnd.Next(100)))
{
Interlocked.Increment(ref _itemsEnqueued);
}
Thread.Sleep(500);
}
}
}
}