3 Stimmen

Reaktiver Rahmen (RX) und asynchroner Umgang mit Ereignissen

Ich spiele also nur mit RX herum und lerne es. Ich habe angefangen, mit Events zu spielen, und wollte wissen, wie man Events abonniert und die Ergebnisse asynchron in Batches verarbeitet. Erlauben Sie mir, das mit Code zu erklären:

Einfache Klasse, die Ereignisse auslöst:

public class EventRaisingClass
{
   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...
}

public class SomeEventArgs : EventArgs
{
    public SomeEventArgs(int data)
    {
        this.SomeArg = data;
    }

    public int SomeArg { get; private set; }
}

Dann mein Main:

public static void Main(string[] args)
{
    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...

}

Wie Sie in meinen Kommentaren sehen können, wenn Sie nur subscribe wie das aufrufen, geschieht der gesamte Abonnement-Code synchron. Gibt es eine Möglichkeit, out of the box mit RX zu haben, die Subscribe auf verschiedenen Threads aufgerufen werden, wenn es eine neue Charge von Ereignissen zu arbeiten?

2voto

Stephen Cleary Punkte 402664

Ich glaube, Sie suchen nach SubscribeOn o ObserveOn und übergibt eine IScheduler . Es sind mehrere Scheduler eingebaut unter System.Concurrency Einige von ihnen verwenden den aktuellen Thread, andere wiederum verwenden spezielle Threads.

Dieses Video finden Sie weitere Informationen über das Konzept des Schedulers.

Das Rx-Team hat außerdem kürzlich eine Praktische Übungen Dokument, das im Moment einer Anleitung am nächsten kommt.

2voto

Sergey Aldoukhov Punkte 21696
bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn ist die Angabe des Zeitplans, auf dem die so genannten " Abonnement-Nebenwirkungen " geschehen. Zum Beispiel kann Ihr Observable jedes Mal eine Datei öffnen, wenn sich jemand anmeldet.

ObserveOn legt den Zeitplan fest, nach dem der Aufruf des Beobachters jedes Mal erfolgt, wenn ein neuer Wert vorhanden ist. In der Praxis wird es häufiger verwendet als SubscribeOn.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X