Hier sind zwei praktische Operatoren zur Aufhebung beobachtbarer Sequenzen. Der Unterschied zwischen ihnen besteht darin, was im Falle einer Annullierung geschieht. Die TakeUntil
bewirkt eine normale Beendigung der Sequenz ( OnCompleted
), während die WithCancellation
führt zu einer außerordentlichen Beendigung ( OnError
).
/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(observer =>
cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
Beispiel für die Verwendung:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
.Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
.WithCancellation(cts.Token);
Anmerkung: Im Falle einer Stornierung melden sich die oben vorgestellten benutzerdefinierten Operatoren sofort von der zugrunde liegenden Beobachtungsgröße ab. Dies ist zu bedenken, wenn die Observable Seiteneffekte enthält. Das Einfügen der TakeUntil(cts.Token)
vor dem Operator, der die Nebeneffekte ausführt, wird die Fertigstellung der gesamten Beobachtungsgröße aufgeschoben, bis die Nebeneffekte abgeschlossen sind ( ordnungsgemäße Beendigung ). Wird sie nach den Seiteneffekten eingefügt, erfolgt die Annullierung sofort, was dazu führen kann, dass laufender Code unbeobachtet weiterläuft, sozusagen "fire-and-forget".