CQRS + Sorgente di eventi - gruppi di eventi e propagazione di eventi

-1

Questa è la mia attuale configurazione del caso d'uso CQRS:

  1. Il DTO del comando viene ricevuto nel gestore del livello applicazione dove mappiamo il DTO del comando agli oggetti del dominio appropriati, se necessario, reidratiamo la radice aggregata dal repository e chiamiamo qualche metodo su AR

    class CommandHandler{
        handle(Command command){
            Dom1 dom1 = new Dom1(command.d1);
            Dom2 dom2 = new Dom2(command.d2);
            AggregateRoot ar = repo.rehydrate(command.arId);
            ar.doSmth(dom1, dom2, command.int1);
        }
    }
    
  2. All'interno del metodo doSmth in AR, non ho immediatamente un evento da applicare, ma devo passare alcuni argomenti all'entità che fa parte di questo AR. Quindi all'interno del metodo doSmth ho

    class AggregateRoot{
        void doSmth(Dom1 dom1, Dom2 dom2, Integer int1){
            subEnt = new OrherSubEntity();
            subEnt.doSmth(int1);
        }
    }
    
  3. Ora ho un'entità secondaria che viene attivata e genera un DomainEvent. Questo evento di dominio deve essere parte del flusso di eventi che viene salvato in EventStore e allo stesso tempo viene propagato a qualsiasi listener in questa radice aggregata.

    class SubEntity{
        void doSmth(Integer int1){
            //validate int1
            apply(new SubEntityEvent(int1));
        }
    
         void when(SubEntityEvent event){
            //just modify local fields
            //used also in event sourcing rehydration
         }
    }
    
  4. Poiché la seconda entità secondaria che sta ascoltando SubEntityEvent è ora attivata, esegue l'operazione intensa della CPU e genera SomeEnterprise DomainCent. Questo evento dovrebbe anche far parte del flusso di eventi che viene salvato in EventStore. Viene anche propagato ad altre sub-entità nella stessa AR ma poiché nessun altro lo ascolta, non abbiamo ulteriori elaborazioni.

    class OtherSubEntity{
        void listen(SubEntityEvent event){
            data = SomeCPUIntenseCalc();
            apply(SomeCPUIntenseCalcHappened(data));
        }
    
        void when(SomeCPUIntenseCalcHappened event){
            //just modify local fields
            //used also in event sourcing rehydration
        }
    }
    

Domande:

  1. Poiché AggreagateRoot è responsabile del salvataggio di tutti gli eventi nell'archivio eventi, come può sapere dell'esistenza di tutti questi eventi secondari verificatisi nella struttura ad albero.
  2. In questa transazione abbiamo generato 2 eventi. Dovremmo archiviarli all'interno dell'evento come una serie di eventi, o dovremmo memorizzarli semplicemente un evento dopo l'altro. Affinché AR sia in uno stato coerente, entrambi gli eventi devono essere applicati in un'unica transazione.
  3. In OtherSubEntity abbiamo il metodo di ascolto che ascolta SubEntityEvent. Dal momento che sulla reidratazione dall'archivio degli eventi chiameremo SubEntity.when (evento SubEntityEvent), come impedire che OtherSubEntity.listen venga chiamato contemporaneamente?
  4. Poiché la radice aggregata deve inviare 1 messaggio risultante lungo la pipeline (nel mio caso il messaggio AMQP), suppongo che questo messaggio sarà in realtà la proiezione di questi eventi a 2 domini - fondamentalmente un DTO del modello letto. Dove dovrei creare questo modello? Qualcosa di simile come modello letto solo con la propagazione?
posta bojanv55 01.02.2017 - 22:21
fonte

1 risposta

0

Since AggreagateRoot is responsible for saving all of the events to event store, how can it know for existence of all of this sub events that happened in it's tree.

Li chiede. Concettualmente, la radice aggregata è responsabile di tutte le modifiche di stato nell'aggregato, ma può delegare tale lavoro a entità subordinate, che renderanno disponibili gli eventi alla radice, che a loro volta potranno renderle disponibili per l'archiviazione.

In this transaction we generated 2 events. Should we store them inside event store as array of events, or we should store them simply one event after another. In order for AR to be in consistent state, both events must be applied in single transaction.

Logicamente, sono una storia lineare. In pratica, le tue soluzioni di persistenza che trattano l'elenco di "nuovi" eventi da salvare come singolo commit. Durante la riproduzione, gli eventi vengono in genere gestiti uno alla volta.

Dato che l'AR viene caricato direttamente da una cronologia registrata nel libro del record, ha tutti gli eventi (o più specificamente, qualsiasi salvataggio effettuato da uno stato che non include tutti gli eventi dovrebbe essere rifiutato), i fallimenti transitori che mantengono invariato non sono importanti, poiché alla fine del commit l'invariante è stato ristabilito.

Detto questo, la cronologia degli eventi è stata scritta da una vecchia versione dell'aggregato, ma il comando viene elaborato dall'implementazione corrente, che potrebbe non avere la stessa comprensione di "l'invariante".

A parte questo, in molti casi è possibile cambiare la rappresentazione dei tuoi eventi in modo tale che l'invariante venga stabilito dopo ogni evento.

In OtherSubEntity we have listen method that is listening to SubEntityEvent. Since on re-hydration from event store we will call SubEntity.when(SubEntityEvent event), how to prevent OtherSubEntity.listen to be called at the same time?

Ci sono un paio di possibilità - dal momento che ri-hyrdration è semplicemente un replay di stato, è possibile posticipare l'ascolto durante la riproduzione. Puoi anche lavorare per assicurarti che la risposta all'evento sia idempotente (l'elaborazione dell'evento due volte ha lo stesso effetto, quindi non importa che tu "senta" l'evento prima della fine della tua riproduzione.

Since aggregate root needs to send 1 resulting message down the pipeline

Questo vincolo richiede più contesto. A quale messaggio stai pensando? perché l'aggregato ha bisogno di inviarlo?

ATTENZIONE : avendo risposto, ti suggerisco di abbandonare questo approccio. CQRS + ES funziona molto meglio con un'architettura a cipolla, con il modello al centro. Unire l'aggregato nel tuo modello alla tua app e la tua persistenza rende l'intero esercizio molto più difficile di quanto non sia necessario.

class CommandHandler{
    // the model interface "injected" into your command handler
    Model.API theModel;

    void handle(Command command){
        History priorHistory = repo.getHistoryById(command.arId);
        History updatedHistory = theModel.simulate(priorHistory, command);
        repo.compareAndSwap(priorHistory, updateHistory);
    }
}
    
risposta data 02.02.2017 - 01:07
fonte

Leggi altre domande sui tag