39 Stimmen

Implementierung von IObservable<T> von Grund auf

Die Reactive Extensions bieten viele Hilfsmethoden, um bestehende Ereignisse und asynchrone Operationen in Observables zu verwandeln, aber wie würde man ein IObservable<T> von Grund auf implementieren?

IEnumerable hat das schöne yield-Schlüsselwort, mit dem es sehr einfach zu implementieren ist.

Wie lässt sich IObservable<T> richtig implementieren?

Muss ich mir Sorgen um die Fadensicherheit machen?

Ich weiß, es gibt Unterstützung für immer zurück auf einen bestimmten Synchronisationskontext aufgerufen, aber ist dies etwas, das ich als IObservable<T> Autor brauchen, um zu kümmern, oder dies irgendwie eingebaut?

aktualisieren:

Hier ist meine C#-Version der F#-Lösung von Brian

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

bearbeiten: Keine ObjectDisposedException auslösen, wenn Dispose zweimal aufgerufen wird

11voto

Colonel Panic Punkte 125419

El offizielle Dokumentation missbilligt, dass Benutzer IObservable selbst implementieren. Stattdessen wird von den Benutzern erwartet, dass sie die Fabrikmethode Observable.Create

Wenn möglich, implementieren Sie neue Operatoren, indem Sie bestehende Operatoren zusammensetzen. Ansonsten implementieren Sie benutzerdefinierte Operatoren mit Observable.Create

Observable.Create ist ein trivialer Wrapper um die interne Klasse von Reactive AnonymousObservable :

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Ich weiß nicht, warum sie ihre Umsetzung nicht öffentlich gemacht haben, aber was soll's.

9voto

Brian Punkte 115257

Ehrlich gesagt, bin ich mir nicht sicher, wie "richtig" das alles ist, aber nach meinen bisherigen Erfahrungen fühlt es sich ziemlich gut an. Es ist F#-Code, aber hoffentlich bekommen Sie ein Gefühl für den Geschmack. Es können Sie "neue bis" eine Quelle Objekt, das Sie dann aufrufen können Next/Completed/Fehler auf, und es verwaltet Abonnements und versucht, Assert, wenn die Quelle oder Clients schlechte Dinge tun.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Ich bin an jeder Meinung darüber interessiert, was hier gut oder schlecht ist; ich hatte noch keine Gelegenheit, mir all das neue Rx-Zeug aus den Entwicklungslabors anzuschauen...

Meine eigenen Erfahrungen legen das nahe:

  • Wer Observablen abonniert hat, sollte niemals aus den Abonnements aussteigen. Es gibt nichts Vernünftiges, was eine Observable tun kann, wenn ein Abonnent wirft. (Das ist ähnlich wie bei Ereignissen.) Höchstwahrscheinlich wird die Exception einfach zu einem Top-Level-Catch-All-Handler aufsteigen oder die App zum Absturz bringen.
  • Die Quellen sollten wahrscheinlich "logisch single threaded" sein. Ich denke, es könnte schwieriger sein, Clients zu schreiben, die auf gleichzeitige OnNext-Aufrufe reagieren können; selbst wenn jeder einzelne Aufruf von einem anderen Thread kommt, ist es hilfreich, gleichzeitige Aufrufe zu vermeiden.
  • Es ist definitiv nützlich, eine Basis-/Helferklasse zu haben, die einige "Verträge" durchsetzt.

Ich bin sehr neugierig, ob jemand konkretere Ratschläge in dieser Richtung geben kann.

7voto

lightw8 Punkte 3224

Ja, das yield-Schlüsselwort ist reizvoll; vielleicht wird es etwas Ähnliches für IObservable(OfT) geben? [Bearbeiten: In Eric Meijer's PDC '09 Gespräch er sagt "yes, watch this space" zu einem deklarativen Ertrag für die Erzeugung von Observablen].

Wenn Sie etwas Ähnliches suchen (anstatt selbst zu rollen), schauen Sie sich Folgendes an der Boden der " (noch nicht) 101 Rx-Proben Wiki", wo das Team vorschlägt, die Klasse Subject(T) als "Backend" für die Implementierung eines IObservable(OfT) zu verwenden. Hier ist ihr Beispiel:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}

2voto

Benjol Punkte 60397
  1. Öffnen Sie Reflector und sehen Sie es sich an.

  2. Sehen Sie sich einige C9-Videos an - este eine zeigt, wie man den Select-'Kombinator' 'ableiten' kann

  3. Das Geheimnis besteht darin, AnonymousObservable-, AnonymousObserver- und AnonymousDisposable-Klassen zu erstellen (die nur eine Umgehung für die Tatsache darstellen, dass man keine Schnittstellen instanziieren kann). Sie enthalten keine Implementierung, da Sie diese mit Actions und Funcs übergeben.

Zum Beispiel:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Den Rest überlasse ich Ihnen... es ist eine sehr gute Übung, um zu verstehen.

Es gibt einen netten kleinen Thread, der wächst aquí mit entsprechenden Fragen.

2voto

Adiel Yaacov Punkte 1361

Nur eine Bemerkung zu dieser Umsetzung:

nach der Einführung von Concurrent Collections in .net fw 4 ist es wahrscheinlich besser, ConcurrentDictioary anstelle eines einfachen Wörterbuchs zu verwenden.

spart die Handhabung von Sperren für die Sammlung.

adi.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X