10 Stimmen

Wie man eine beobachtbare Sequenz löscht

Ich habe eine sehr einfache IObservable<int> der alle 500ms als Impulsgeber fungiert:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

Und ich habe eine CancellationTokenSource (die verwendet wird, um andere Arbeiten, die gleichzeitig laufen, zu stornieren).

Wie kann ich die Storno-Token-Quelle verwenden, um meine beobachtbare Sequenz abzubrechen?

12voto

Natan Punkte 4526

Es handelt sich zwar um ein altes Thema, aber für die Zukunft finden Sie hier eine einfachere Methode.

Wenn Sie einen CancellationToken haben, arbeiten Sie wahrscheinlich bereits mit Aufgaben. Konvertieren Sie ihn also einfach in einen Task und lassen Sie das Framework die Bindung vornehmen:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

Dadurch wird ein interner Abonnent erstellt, der entsorgt wird, wenn die Aufgabe abgebrochen wird. Dies ist in den meisten Fällen ausreichend, da die meisten Observables nur Werte erzeugen, wenn es Abonnenten gibt.

Wenn Sie nun eine tatsächliche Observable haben, die aus irgendeinem Grund entsorgt werden muss (vielleicht eine Hot Observable, die nicht mehr wichtig ist, wenn eine übergeordnete Aufgabe abgebrochen wird), kann dies mit einer Fortsetzung erreicht werden:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});

8voto

Jim Wooley Punkte 9900

Wenn Sie GenerateWithTime verwenden (jetzt ersetzt durch Generate mit Übergabe einer Zeitspannen-Funküberladung), können Sie den zweiten Parameter wie folgt ersetzen, um den Zustand des Stornierungs-Tokens zu evakuieren:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

Wenn Ihr Ereignis, das das Setzen des Storno-Tokens bewirkt, selbst in eine Beobachtungsgröße umgewandelt werden kann, könnten Sie alternativ etwas wie das Folgende verwenden:

pulses.TakeUntil(CancelRequested);

Eine ausführlichere Erklärung habe ich unter http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable auch.

3voto

Theodor Zoulias Punkte 22972

Hier sind zwei praktische Operatoren zur Aufhebung beobachtbarer Sequenzen. Der Unterschied zwischen ihnen besteht darin, was im Falle einer Annullierung geschieht. Die TakeUntil bewirkt eine normale Beendigung der Sequenz ( OnCompleted ), während die WithCancellation führt zu einer außerordentlichen Beendigung ( OnError ).

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

Beispiel für die Verwendung:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

Anmerkung: Im Falle einer Stornierung melden sich die oben vorgestellten benutzerdefinierten Operatoren sofort von der zugrunde liegenden Beobachtungsgröße ab. Dies ist zu bedenken, wenn die Observable Seiteneffekte enthält. Das Einfügen der TakeUntil(cts.Token) vor dem Operator, der die Nebeneffekte ausführt, wird die Fertigstellung der gesamten Beobachtungsgröße aufgeschoben, bis die Nebeneffekte abgeschlossen sind ( ordnungsgemäße Beendigung ). Wird sie nach den Seiteneffekten eingefügt, erfolgt die Annullierung sofort, was dazu führen kann, dass laufender Code unbeobachtet weiterläuft, sozusagen "fire-and-forget".

2voto

StanislawSwierc Punkte 2441

Sie können Ihre IObservable Abonnement mit CancellationTokenSource mit folgendem Ausschnitt

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);

0voto

corvuscorax Punkte 5700

Sie erhalten eine IDisposable Instanz vom Abonnement zurück. Aufruf von Dispose() dazu.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X