L'IObserverT .NET era destinato alla sottoscrizione di più oggetti osservabili?

9

Ci sono IObservable e IObserver interfacce in .NET (anche qui e here ). È interessante notare che l'implementazione concreta di IObserver non contiene un riferimento diretto a IObservable. Non sa a chi è abbonato. Può solo invocare l'unsubscriber. "Si prega di tirare il perno per annullare l'iscrizione."

modifica: il unsubscriber implementa IDisposable . Penso che questo schema sia stato utilizzato per prevenire il problema del listener scaduto .

Però due cose non mi sono del tutto chiare.

  1. La classe Unsubscriber interna fornisce il comportamento subscribe-and-forget? Chi (e quando esattamente) chiama IDisposable.Dispose() sul Unsubscriber? Garbage Collector (GC) non è deterministico.
    [Disclaimer: nel complesso, ho trascorso più tempo con C e C ++ rispetto a C #.]
  2. Cosa dovrebbe succedere se voglio iscrivere un osservatore K ad un osservabile L1 e l'osservatore è già iscritto ad un altro L2 osservabile?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Quando ho eseguito questo codice di test contro l'esempio di MSDN, l'osservatore è rimasto iscritto su L1. Questo sarebbe peculiare nello sviluppo reale. Potenzialmente, ci sono 3 strade per migliorare questo:

    • Se l'osservatore ha già un'istanza di annullamento dell'iscrizione (vale a dire è già iscritto), quindi annulla la sottoscrizione dal provider originale prima di abbonarsi a una nuova. Questo approccio nasconde il fatto che non è più sottoscritto al fornitore originale, che potrebbe diventare una sorpresa in seguito.
    • Se l'osservatore ha già un'istanza di unsubscriber, allora viene generata un'eccezione. Un codice chiamante ben educato deve annullare l'iscrizione esplicitamente all'osservatore.
    • Observer si iscrive a più provider. Questa è l'opzione più intrigante, ma può essere implementata con IObservable e IObserver? Vediamo. È possibile per l'osservatore tenere un elenco di oggetti non iscritti: uno per ciascuna fonte. Sfortunatamente, IObserver.OnComplete() non fornisce un riferimento al fornitore che l'ha inviato. Pertanto, l'implementazione IObserver con più provider non sarebbe in grado di determinare da quale disiscriversi.
  3. L'IObserver di .NET era destinato alla sottoscrizione a più IObservables?
    La definizione da manuale del modello di osservatore richiede che un osservatore debba essere in grado di abbonarsi a più fornitori? O è opzionale e dipende dall'implementazione?

posta Nick Alexeev 02.07.2014 - 18:49
fonte

4 risposte

5

Le due interfacce sono in realtà parte di Reactive Extensions (Rx in breve), dovresti usa quella libreria praticamente ogni volta che vuoi usarli.

Le interfacce sono tecnicamente in mscrolib, non in nessuno degli assembly Rx. Penso che questo faciliti l'interoperabilità: in questo modo, librerie come TPL Dataflow possono fornire members che funzionano con quelle interfacce , senza fare effettivamente riferimento a Rx.

Se utilizzi Subject di Rx come implementazione di IObservable , Subscribe sarà restituisce un IDisposable che può essere utilizzato per annullare la sottoscrizione:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
    
risposta data 06.07.2014 - 11:34
fonte
5

Solo per chiarire alcune cose che sono ben documentate nel Rx Design Guidelines e infine sul mio sito web IntroToRx.com :

  • Non ti affidi al GC per ripulire le tue iscrizioni. Coperto in dettaglio qui
  • Non esiste un metodo Unsubscribe . Ti iscrivi ad una sequenza osservabile e ti viene dato un abbonamento . Puoi quindi disporre di tale abbonamento indicando che non desideri più richiamare i tuoi callback.
  • Una sequenza osservabile non può essere completata più di una volta (vedere la sezione 4 delle Linee guida sulla progettazione Rx).
  • Esistono numerosi modi per consumare più sequenze osservabili. C'è anche una grande quantità di informazioni su questo argomento su Reactivex.io e ancora su IntroToRx.

Per essere specifici e rispondere direttamente alla domanda originale, il tuo utilizzo è tornato in primo piano. Non spingere molte sequenze osservabili in un singolo osservatore. Si compongono sequenze osservabili in un'unica sequenza osservabile. Quindi ti iscrivi a quella singola sequenza.

Invece di

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Che è solo uno pseudo codice e non funzionerebbe con l'implementazione .NET di Rx, dovresti fare quanto segue:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Ora questo non corrisponde esattamente alla domanda iniziale, ma non so cosa avrebbe dovuto fare K.Unsubscribe() (disiscriversi da tutti, l'ultimo o il primo abbonamento?!)

    
risposta data 19.05.2015 - 17:05
fonte
3

Hai ragione. L'esempio funziona male per più IObservables.

Suppongo che OnComplete () non fornisca un riferimento perché non vuole che l'IObservable debba tenerlo in giro. Se stavo scrivendo che probabilmente supporterò più abbonamenti facendo in modo che Subscribe sottoscriva un identificatore come secondo parametro, che viene restituito alla chiamata OnComplete (). Quindi potresti dire

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

Allo stato attuale, sembra che IObserver .NET non sia adatto per più osservatori. Ma suppongo che il tuo oggetto principale (LocationReporter nell'esempio) possa avere

public Dictionary<String,IObserver> Observers;

e questo ti consentirebbe di supportare

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

pure.

Suppongo che Microsoft potrebbe sostenere che quindi non è necessario per loro supportare direttamente più IObservables nelle interfacce.

    
risposta data 03.07.2014 - 21:16
fonte
0

So che questo è modo in ritardo per la festa, ma ...

Le interfacce I Observable<T> e IObserver<T> sono non parte di Rx ... sono tipi principali ... ma Rx ne fa un uso esteso.

Sei libero di avere tanti (o pochi) osservatori che vuoi. Se prevedi più osservatori, è responsabilità dell'osservatore indirizzare OnNext() chiamate agli osservatori appropriati per ogni evento osservato. L'osservabile potrebbe richiedere un elenco o un dizionario come suggerito.

Ci sono buoni casi per consentire solo uno - e buoni casi per consentire molti. Ad esempio, in un'implementazione CQRS / ES, è possibile imporre un singolo gestore di comandi per tipo comando su un bus di comando, mentre è possibile notificare più trasformazioni di lettura per un determinato evento digita nel negozio degli eventi.

Come indicato in altre risposte, non c'è Unsubscribe . Smaltire la cosa che ti viene data quando Subscribe generalmente fa il lavoro sporco. L'osservatore, o un suo agente, è responsabile della detenzione sul token fino a quando non desidera più ricevere ulteriori notifiche . (qestion 1)

Quindi, nel tuo esempio:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... sarebbe più simile a:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... dove K sentirà 1003 e 1004 ma non 1005.

Per me, questo sembra ancora divertente perché, nominalmente, gli abbonamenti sono cose di lunga durata ... spesso per la durata del programma. Non sono dissimili da questo rispetto ai normali eventi .Net.

In molti esempi che ho visto, il Dispose del token funziona per rimuovere l'osservatore dalla lista degli osservatori osservabili. Preferisco che il token non porti così tanta conoscenza in giro ... e quindi ho generalizzato i miei token di abbonamento per chiamare semplicemente un lambda passato (con informazioni identificative catturate al momento dell'iscrizione:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... e l'osservabile può installare il comportamento di annullamento dell'iscrizione durante la sottoscrizione:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Se il tuo osservatore sta intercettando eventi da più osservabili, dovresti assicurarti che ci sia qualche tipo di informazione di correlazione negli eventi stessi ... come gli eventi .Net fanno con sender . Dipende da te se questo è importante o meno. Non è cotto, come hai correttamente ragionato. (domanda 3)

    
risposta data 12.01.2016 - 14:54
fonte

Leggi altre domande sui tag