Die Reactive Extensions bieten viele Hilfsmethoden, um bestehende Ereignisse und asynchrone Operationen in Observables zu verwandeln, aber wie würde man ein IObservable<T> von Grund auf implementieren?
IEnumerable hat das schöne yield-Schlüsselwort, mit dem es sehr einfach zu implementieren ist.
Wie lässt sich IObservable<T> richtig implementieren?
Muss ich mir Sorgen um die Fadensicherheit machen?
Ich weiß, es gibt Unterstützung für immer zurück auf einen bestimmten Synchronisationskontext aufgerufen, aber ist dies etwas, das ich als IObservable<T> Autor brauchen, um zu kümmern, oder dies irgendwie eingebaut?
aktualisieren:
Hier ist meine C#-Version der F#-Lösung von Brian
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
bearbeiten: Keine ObjectDisposedException auslösen, wenn Dispose zweimal aufgerufen wird