In difficoltà con l'implementazione della sincronizzazione o della concorrenza

0

Nella mia app due componenti A e B, hanno i loro dati e stato in base ai dati e allo stato di un terzo componente, componente C:

  • Sia A che B possono modificare i dati del componente C.
  • Sia A che B ascoltano il cambiamento dei dati del componente C e reagiscono a rielaborando i dati e lo stato interni.
  • A e B vengono eseguiti ciascuno sul proprio thread.

Ciò di cui sto cercando di implementare lo scenario in cui entrambi i componenti A e B cercano di aggiornare C simultaneamente e come reagire alle modifiche mentre l'operazione è in corso:

  1. A inizia ad aggiornare i dati su C
  2. B inizia ad aggiornare i dati su C. C ha un blocco interno che impedisce l'aggiornamento simultaneo dei dati, quindi l'operazione deve attendere che A venga completato per primo.
  3. A termina l'aggiornamento dei dati su C. C invia l'evento di modifica dei dati a entrambi A e B.
  4. B potrebbe iniziare ad aggiornare i dati su C, ma a questo punto i dati in C sono in uno stato diverso da come era quando B ha iniziato l'operazione. L'operazione potrebbe non avere più senso ora.

Nel passaggio n. 3, dove B ha ricevuto l'evento da C, penso che B dovrebbe rivalutare se stesso, ha bisogno di ricalcolare i suoi dati e lo stato. Ma dovrebbe anche cancellare la sua operazione di modifica dei dati avviata al punto # 2.

La stessa situazione può succedere il contrario, il componente B può iniziare prima l'operazione e il componente A è il secondo.

Questa è la parte con cui sto lottando, come implementare tutto questo. Prima di parlare di codice, darò anche un esempio più concreto:

L'interfaccia utente mostra un elemento su uno schermo che l'utente può aggiornare. C'è un componente in esecuzione su un thread in background, che può aggiornare lo stesso elemento a causa della modifica della posizione del dispositivo. Sia l'interfaccia utente che i componenti di localizzazione accedono al repository degli articoli, per aggiornare l'articolo.

Sto cercando alcuni suggerimenti su come implementarlo.

Uso .NET C #, quindi la mia soluzione utilizza classi specifiche per .NET. Tuttavia, non volevo rendere questa domanda specifica a .NET, perché la stessa domanda potrebbe applicarsi anche a Java e ad altre lingue \ tecnologie.

Un'idea che ho è che sia l'interfaccia utente sia il monitoraggio della posizione mantengano ciascuno un CancellationTokenSource che possono cancellare quando ricevono l'evento da C:

CancellationTokenSource _cts;

void UpdateItem(Item item) {
   _cts = new CancellationTokenSource();

   try
   {       
      _itemRepo.UpdateItem(item, …, _cts.Token);
   }
   catch(OperationCancelled)
   {
      // re-evaluate state here
   }
}

void ItemRepo_DataChanged(object sender, EventArgs e)
{
   if(_cts != null) {
      _cts.Cancel();
   }
}

Vedi altri modi per implementarlo?

In alternativa, ci sono esempi in framework o strumenti o anche progetti OSS che potrei dare un'occhiata?

    
posta Daniel 03.06.2018 - 13:47
fonte

2 risposte

2

C has an internal lock which prevents updating data concurrently, so the operation needs to wait for A to complete first.

Questa è la radice del problema: il blocco interno per gli aggiornamenti e le letture rende solo l'atto di leggere e scrivere la memoria in cui il valore di C risiede sicuro. Non fa nulla per situazioni come la tua in cui un thread non dovrebbe agire sui dati che un altro sta per renderizzare stantio.

Il caso semplice è dove il calcolo del nuovo valore di C è veloce e puoi bloccare C per tutta la sua durata.

a():
    ...
    c.lock()
    current_c := c.get_value()
    new_c := quickly_calculate_new_c(current_c)
    // set_new_value() is assumed not to send notifications
    // if the new value isn't different than the old one.
    c.set_value(new_c)
    c.unlock()
    ...

B avrebbe lo stesso codice con il proprio calcolo per le nuove C. Le decisioni di A basate sul valore corrente saranno valide per tutto il tempo poiché la sua tenuta sul blocco impedisce ad altri thread di agire sui dati che stanno per essere modificati. Le decisioni di B ottengono la stessa protezione.

Una cosa importante da notare è che se non c'è nulla di sincrono sul modo in cui funzionano A e B, A non può prendere decisioni basate su ciò che B potrebbe fare in futuro. Se A richiede il blocco di C e lo ottiene, significa che lo stato di C è l'informazione più aggiornata disponibile e dovrebbe essere utilizzata per apportare modifiche. Tutto ciò che deve attendere il blocco nel frattempo è un'azione futura.

La tua domanda implica che il calcolo è un processo di lunga durata e dovrebbe essere interrotto se una modifica a C lo renderebbe inutile. Stando così le cose, il calcolo deve essere fatto in modo cooperativo, abortire quando farlo non ha più senso:

a_update_c():

    // Hold the initial value for later.
    c.lock()
    old_c := c.get_value()
    c.unlock()

    if no_reason_to_change_c(old_c)
        return(OK)

    // Do the calculation of C's new value a bit at a time
    while do_part_of_new_c_calculation(old_c)
        if notified_by(c)
            c.lock()
            changed_c := c.get_value()
            c.unlock()
            if calcluation_is_pointless(old_c, changed_c)
                abort_new_c_calculation()
                return(TRY_AGAIN)

    new_c := result_of_new_c_calculation()

    c.lock()

    // If our assumptions are still valid, update C.  Since
    // C is locked, nothing else can change the value or
    // make decisions based on it while our own decision
    // and update are underway.
    if c.get_value() != old_c_value
        c.unlock()
        return(TRY_AGAIN)
    c.set_value(new_c_value)

    c.unlock()
    return(OK)

a():
    ...
    while a_update_c() != TRY_AGAIN
        null  // Do nothing; just loop.
    ...

Esistono modi più eleganti (e più sicuri) per gestire il blocco e lo sblocco, ma indipendentemente dal fatto che siano disponibili o meno dipende dalla lingua.

Se il blocco che stai utilizzando è reentrant , puoi utilizzare lo stesso blocco e avere get_value () e set_value () acquisirlo in modo che transazioni semplici siano garantite senza richiedere al chiamante di circondarlo con un lock () / unlock ().

Il problema dell'interfaccia utente è un problema di esperienza utente, ovvero il modo in cui gestire i dati che l'utente sta interagendo con i cambiamenti durante l'interazione. Probabilmente è stato trattato su UX.SE .

    
risposta data 03.06.2018 - 16:46
fonte
1

Monitor

Dall'astratto, il blocco interno in C funzionerebbe. Di solito è il punto di partenza.

Ora, il problema è che entrambi i thread A e B arrivano al blocco per scrivere in C. A ottiene il blocco. B aspetta. A aggiorna lo stato di C. Ora B si sveglia, ma lo stato è cambiato, quello che stava per fare non è più valido.

B dovrebbe leggere lo stato all'interno del blocco . Perché? Perché se B legge lo stato al di fuori del blocco, potrebbe essere cambiato per quando arriva al lucchetto. Inoltre, se non si utilizza un blocco per leggere lo stato - e in base alla struttura di C - A o B potrebbe vedere uno stato parzialmente aggiornato. Non va mai bene.

Tuttavia, questo blocco esiste all'interno di C. E se devi leggere all'interno del blocco , devi passare una richiamata per calcolare lo stato aggiornato. E implementeresti anche la tua logica di riprovare.

Nota: in .NET, puoi fare lo stesso comportamento di cui sopra usando un set di SemaphoreSlim per far entrare solo un thread. Il vantaggio è che ascolterà CancelToken. Se preferisci che riprovi la logica.

Blocco cooperativo

Quindi, hai un blocco, due thread vogliono entrare, ma solo uno lo ottiene. L'altro sta aspettando, perdendo tempo.

E stanno entrambi richiamando i callback per calcolare l'aggiornamento che vogliono fare.

Ok, fantastico. Memorizza questi callback in una raccolta thread-safe. Ora, il thread che ottiene il blocco può eseguirli tutti, in ordine. E il thread che non ottiene il blocco, può andare via sapendo che il thread che ha ottenuto il blocco farà il lavoro.

Come puoi vedere, con questo metodo, l'aggiornamento verrà sempre eseguito con la versione aggiornata dello stato. Quindi, non c'è bisogno di cancellazioni. Se sei abile con la raccolta thread-safe, il thread può anche controllare se ha già effettuato una richiamata su di esso (e non fare nulla o sostituito, a seconda di cosa ti serve).

Nota: per quella raccolta thread-safe, puoi semplicemente usare un array. Dare ad ogni abbonato un indice sulla matrice.

Utilizzo di un blocco di lettura-scrittura

Diciamo che non ti piace l'idea di blocco cooperativo. E tu vuoi restare con la logica dei tentativi.

Ok, vediamo come un lock reader reader può aiutarti ... Con esso, puoi avere più thread che leggono lo stato simultaneamente, e poi aggiornano a un lock writer quando devono aggiornarsi.

E per la logica dei tentativi ... il thread A e il thread B stanno leggendo, sia il thread A che il thread B vogliono eseguire l'aggiornamento a una scrittura. Uno di loro avrà successo e l'altro fallirà. Diciamo che A ha avuto successo. B non è bloccato in attesa, invece ha ottenuto un'eccezione. B deve richiedere nuovamente un blocco di lettura, che farà in modo che B attenda che A completi la scrittura. Quando B ottiene il blocco del lettore, lo stato è cambiato, quindi deve nuovamente leggere.

.NET ReaderWriterLock ha un timeout per l'aggiornamento. Puoi impostare il timeout per l'aggiornamento a 0 senza alcuna perdita di funzionalità.

Aggiornamento atomico

Quando C cambia, notifica A e B. Nella notifica, può inviare un oggetto stato di valore che ha lo stato di copia C insieme a un oggetto versione. Quindi A e B possono lavorare su questo oggetto stato valore e restituirlo a C per richiedere un aggiornamento.

Quindi C può seguire un modello a doppio controllo sulla versione: se l'oggetto versione è lo stesso, prova a ottenere il blocco. All'interno del blocco, ricontrollare se l'oggetto versione è lo stesso. Se lo è, aggiorna, se non lo è, vai.

Inoltre, C può copiare lo stato ricevuto su una variabile locale e utilizzare operazioni interbloccate per aggiornare lo stato interno da quella copia in un'operazione atomica. In questo modo si evita l'uso di Monitor. Pertanto, i thread non devono aspettare.

Nota: Poiché vogliamo utilizzare le operazioni interbloccate sullo stato, non dovrebbe essere un tipo di valore. Come ottimizzazione, è possibile implementare un pool di oggetti per gli oggetti dello stato del valore.

Note:

  • Mantieni lo stato più recente conosciuto. Compreso lo stato che si invia se l'aggiornamento è riuscito. Il codice che calcola lo stato aggiornato deve leggere lo stato più recente conosciuto. Ciò consente il verificarsi di più aggiornamenti tra le esecuzioni.
  • È costoso calcolare lo stato aggiornato? Se lo è, nel processo di aggiornamento è necessario inserire i controlli per vedere se si è ottenuto uno stato più recente. No, CancellationToken non ti compra nulla in questo caso.
risposta data 03.06.2018 - 17:28
fonte

Leggi altre domande sui tag