Vorrei se potessi spiegarmi in modo semplice come funziona il pasticcio del disgregatore. Questo concetto è stato elusivo per me come lo so.
Forse con il tuo aiuto potrei comprenderlo.
Vorrei se potessi spiegarmi in modo semplice come funziona il pasticcio del disgregatore. Questo concetto è stato elusivo per me come lo so.
Forse con il tuo aiuto potrei comprenderlo.
L' articolo di Fowler fornisce un buon primer e questa spiegazione:
At a crude level you can think of a Disruptor as a multicast graph of queues where producers put objects on it that are sent to all the consumers for parallel consumption through separate downstream queues. When you look inside you see that this network of queues is really a single data structure - a ring buffer.
Each producer and consumer has a sequence counter to indicate which slot in the buffer it's currently working on. Each producer/consumer writes its own sequence counter but can read the others' sequence counters. This way the producer can read the consumers' counters to ensure the slot it wants to write in is available without any locks on the counters. Similarly a consumer can ensure it only processes messages once another consumer is done with it by watching the counters.
Un approccio più convenzionale potrebbe utilizzare una coda Producer e una coda Consumer, ognuna che utilizza i blocchi come meccanismi di concorrenza. In pratica, ciò che accade con le code di produzione e di consumo è che le code sono completamente vuote o completamente piene la maggior parte del tempo, il che causa conflitti di blocco e cicli di clock sprecati. Il disgregatore allevia questo, in parte, facendo in modo che tutti i produttori e i consumatori utilizzino lo stesso meccanismo di coda, coordinandosi tra loro osservando i contatori di sequenza anziché utilizzando meccanismi di blocco.
Da questo articolo su CoralQueue :
The disruptor pattern is a batching queue backed up by a circular array (i.e. the ring buffer) filled with pre-allocated transfer objects which uses memory-barriers to synchronize producers and consumers through sequences.
Quindi produttori e consumatori non si scontrano l'un l'altro all'interno dell'array circolare controllando le sequenze corrispondenti . E per comunicare le loro sequenze avanti e indietro tra loro usano barriere di memoria invece di serrature. Questo è il modo più veloce e privo di blocco che possono comunicare.
Fortunatamente non è necessario scendere ai dettagli interni del modello di disturbo per utilizzarlo. Oltre all'implementazione LMAX c'è CoralQueue sviluppato da Coral Blocks, con il quale sono affiliato. Alcune persone trovano più semplice comprendere un concetto leggendo il codice, quindi di seguito è riportato un semplice esempio di un singolo produttore che invia messaggi a un singolo consumatore. Puoi anche controllare questa domanda per un esempio di demultiplexer (un produttore per molti consumatori).
package com.coralblocks.coralqueue.sample.queue;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;
public class Basics {
public static void main(String[] args) {
final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
@Override
public StringBuilder newInstance() {
return new StringBuilder(1024);
}
});
Thread producer = new Thread(new Runnable() {
private final StringBuilder getStringBuilder() {
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null) {
// queue can be full if the size of the queue
// is small and/or the consumer is too slow
// busy spin (you can also use a wait strategy instead)
}
return sb;
}
@Override
public void run() {
StringBuilder sb;
while(true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to send a message to
// the other thread you can just do:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hello!");
queue.flush();
// you can also send in batches to increase throughput:
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi!");
sb = getStringBuilder();
sb.setLength(0);
sb.append("Hi again!");
queue.flush(); // dispatch the two messages above...
}
}
}, "Producer");
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
while (true) { // the main loop of the thread
// (...) do whatever you have to do here...
// and whenever you want to check if the producer
// has sent a message you just do:
long avail;
while((avail = queue.availableToPoll()) == 0) {
// queue can be empty!
// busy spin (you can also use a wait strategy instead)
}
for(int i = 0; i < avail; i++) {
StringBuilder sb = queue.poll();
// (...) do whatever you want to do with the data
// just don't call toString() to create garbage...
// copy byte-by-byte instead...
}
queue.donePolling();
}
}
}, "Consumer");
consumer.start();
producer.start();
}
}
Disclaimer: sono uno degli sviluppatori di CoralQueue.
Leggi altre domande sui tag java design-patterns real-time multithreading