3 Stimmen

Wie implementiere ich eine pausierbare BlockingCollection

Ich schreibe einen WCF-Dienst, der Benachrichtigungen von mehreren Modulen (Datenbank, anderen Diensten...) empfängt und sie einer blockierenden Sammlung hinzufügt, um auf einem Verbraucherthread verarbeitet zu werden, der die relevanten Daten an die Clients veröffentlicht.

Ein Client kann die auf dem Server gespeicherten vollständigen Daten anfordern, und während dieser Operation möchte ich keine neuen Benachrichtigungen akzeptieren. Grundsätzlich möchte ich die blockierende Sammlung (oder den Verbraucherthread) aussetzen und die Benachrichtigungsaufnahme und -verarbeitung nach Abschluss der Anfrage des Clients fortsetzen.

Was ist ein guter Weg, dieses Verhalten zu implementieren?

6voto

Chris Taylor Punkte 50950

Wenn ich dich richtig verstanden habe, möchtest du verhindern, dass der Verbraucherthread Daten aus der BlockingCollection konsumiert, während eine andere Abfrageoperation stattfindet, aber während dieser Zeit können die Produzenten weiterhin Daten in die Sammlung einfügen.

Wenn das korrekt ist, denke ich, dass der beste Weg wäre, ein ManualResetEvent zu haben, das normalerweise signalisiert wird und die Verbraucherthreads nicht blockiert, und wenn du die Verbraucher pausieren möchtest, kannst du das Ereignis zurücksetzen, was dazu führt, dass jeder Verbraucher auf das Signal wartet, bevor es signalisiert wird.

Aktualisierung: Hier ist eine schnelle Konsolenanwendung, die das oben Beschriebene zeigt. Dies ist nur eine schnelle Demo, aber zeigt einen Produzententhread und zwei Verbraucherthreads. Der Zustand der Verbraucher kann mit der Leertaste auf der Tastatur zwischen Laufend und Angehalten umgeschaltet werden.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;

namespace ProducerConsumerDemo
{
  class Program
  {
    static BlockingCollection _queue = new BlockingCollection();
    static ManualResetEvent _pauseConsumers = new ManualResetEvent(true);
    static bool _paused = false;
    static int _itemsEnqueued = 0;
    static int _itemsDequeued = 0;

    static void Main(string[] args)
    {
      Thread producerThread = new Thread(Producer);
      Thread consumerThread1 = new Thread(Consumer);
      Thread consumerThread2 = new Thread(Consumer);
      producerThread.Start();
      consumerThread1.Start();
      consumerThread2.Start();

      while (true)
      {
        WriteAt(0,0,"Zustand: " + (string)(_paused ? "Angehalten" : "Laufend"));
        WriteAt(0,1,"Elemente in der Warteschlange: " + _queue.Count);
        WriteAt(0, 2, "Insgesamt hinzugefügt: " + _itemsEnqueued);
        WriteAt(0, 3, "Insgesamt entnommen: " + _itemsDequeued);

        Thread.Sleep(100);
        if (Console.KeyAvailable)
        {
          if (Console.ReadKey().Key == ConsoleKey.Spacebar)
          {
            if (_paused)
            {
              _paused = false;
              _pauseConsumers.Set();
            }
            else
            {
              _paused = true;
              _pauseConsumers.Reset();
            }
          }
        }
      }
    }

    static void WriteAt(int x, int y, string format, params object[] args)
    {
      Console.SetCursorPosition(x, y);
      Console.Write("                                         ");
      Console.SetCursorPosition(x, y);
      Console.Write(format, args);
    }

    static void Consumer()
    {
      while (true)
      {
        if (_paused)
        {
          // Wenn wir angehalten sind, warte auf das Signal, um anzuzeigen, dass wir fortfahren können
          _pauseConsumers.WaitOne();
        }

        int value;
        if (_queue.TryTake(out value))
        {
          Interlocked.Increment(ref _itemsDequeued);
          // Etwas mit den Daten machen
        }
        Thread.Sleep(500);
      }
    }

    static void Producer()
    {
      Random rnd = new Random();
      while (true)
      {
        if (_queue.TryAdd(rnd.Next(100)))
        {
          Interlocked.Increment(ref _itemsEnqueued);
        }
        Thread.Sleep(500);
      }
    }
  }
}

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X