Invia record utilizzando la modalità asincrona o di sincronizzazione

0

Ho un sacco di chiavi e valori che voglio inviare alla nostra coda di messaggistica comprimendoli in un array di byte. Creerò un array di byte di tutte le chiavi e i valori che dovrebbero sempre essere inferiori a 50 K e quindi inviarli alla nostra coda di messaggistica.

Classe pacchetto :

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Di seguito è riportato il modo in cui sto inviando i dati. A partire da ora il mio design consente solo di inviare dati in modo asincrono chiamando il metodo sender.sendToQueueAsync nel metodo sendData() .

  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    final Packet packet = new Packet(partition);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
    packet.close();
  }

Ora ho bisogno di estendere la mia progettazione in modo da poter inviare i dati in tre modi diversi. Spetta all'utente decidere in che modo inviare i dati a "sincronizzazione" o "asincrona".

  • Ho bisogno di inviare dati in modo asincrono chiamando il metodo sender.sendToQueueAsync .
  • oppure devo inviare i dati in modo sincrono chiamando il metodo sender.sendToQueueSync .
  • oppure devo inviare i dati in modo sincrono ma su un determinato socket chiamando il metodo sender.sendToQueueSync . In questo caso ho bisogno di passare la variabile socket in qualche modo in modo che sendData conosca questa variabile.

Classe SendRecord :

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a threads to send data synchronously but with socket as the parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

I chiamanti chiameranno solo uno dei seguenti tre metodi:

  • sendToQueueAsync passando due parametri
  • sendToQueueSync passando due parametri
  • sendToQueueSync passando tre parametri

Come dovrei progettare la mia classe Packet e SendRecord in modo che io possa dire alla classe Packet che questi dati devono essere inviati in uno dei tre modi sopra elencati alla mia coda di messaggi. Spetta all'utente decidere in che modo desidera inviare i dati alla coda dei messaggi. A partire da ora il modo in cui è strutturata la mia classe Packet , può inviare dati solo in un modo.

    
posta user1950349 11.01.2018 - 23:35
fonte

3 risposte

1

Il tuo problema principale sono le chiamate SendRecord.getInstance()... in sendData() . La mia raccomandazione è quella di estrarlo come una politica. Segue uno schizzo di un approccio al cambiamento minimo.

Inizia creando un'interfaccia QueuePolicy

public interface QueuePolicy {
    public boolean sendToQueue(final long address, final byte[] encodedRecords);
}

Passa al Packet costruttore public Packet(final RecordPartition recordPartition, final QueuePolicy qPolicy) e archivia in una variabile di istanza, cioè this.qPolicy = qPolicy; .

Sostituisci la chiamata SendRecord.getInstance()... in sendData() con

qPolicy.sendToQueue(address, buffer.array());

Ora, in validateAndSend(...) , costruisci un oggetto che implementa l'interfaccia e passalo al costruttore Packet . Se il tuo cliente non ha accesso a validateAndSend(...) , dovrai aggiungere un parametro QueuePolicy a validateAndSend e chiedere al cliente di passarlo.

Un esempio di classe che implementa QueuePolicy è sotto. Le altre due varianti dovrebbero seguire lo stesso schema (ma senza l'istanza Socket );

public class QPolicyAsyncWithSocket implements QueuePolicy {
    private final Socket socket;

    public QPolicyAsyncWithSocket (Socket socket) {
        this.socket = socket;
    }

    public boolean sendToQueue(final long address, final byte[] encodedRecords) {
        return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, socket);
    }
}
    
risposta data 16.01.2018 - 11:27
fonte
1

Considera l'utilizzo di FutureTask per semplificare l'interfaccia. Un'istanza FutureTask non è altro che un'attività che una volta eseguita restituirà un risultato. Anche le classi Executor funzionano bene con loro, perché sono solo un'estensione di una classe Runnable .

La tua classe SendRecord può quindi essere semplificata nel metodo sendToQueue accettando due parametri e restituendo un FutureTask . Devi quindi aggiungere solo Executor per gestire l'esecuzione di queste attività nel modo che preferisci. Puoi anche concatenare le attività future insieme se il risultato di uno dovesse essere richiesto da un'attività futura successiva, se lo desideri.

L'attività eseguita da FutureTask sarebbe quindi responsabile di:

  1. Esecuzione del trasferimento del pacchetto
  2. Attendi la risposta fino al timeout.
  3. Se la risposta non viene ricevuta o se la risposta è fallita:
    • Se il numero di tentativi è zero, imposta il risultato su errore.
    • Altrimenti, conta il numero di tentativi di decremento e torna al passaggio 1.
  4. Altrimenti risposta ricevuta e successo:
    • Imposta il risultato su successo.

Per eseguire semplicemente una chiamata sincrona, devi solo chiamare get() per forzare l'attività a risolversi in caso di successo o fallimento, sebbene idealmente dovresti evitare di forzare la risoluzione dell'attività il più a lungo possibile, poiché ti consente lavorare in parallelo il più a lungo possibile.

In questo modo, hai la massima flessibilità e nessuno dei grossi mix di implementazioni sincrone e asincrone, pur mantenendo il pieno controllo su quando queste attività si risolvono. Anche l'invio effettivo del pacchetto è interamente conservato nel suo mondo, rendendo le regolazioni molto più semplici. Sarebbe facile vedere come potresti adattare un'istanza FutureTask a concatenare anche con altre chiamate.

public class PacketChain extends FutureTask<PacketResponse> {
    private FutureTask<PacketResponse> task;
    private PacketChain next = null;

    public PacketChain(FutureTask<PacketResponse> task) {
        super(task, null);
        this.task = task;
    }

    public PacketChain add(FutureTask<PacketResponse> nextFutureTask) {
        next = new PacketChain(nextFutureTask);
        return next;
    }

    @Override
    public void run() {
        try {
            PacketResponse response = task.get();

            set(response);

            if(next != null && ResponseStatus.OK.equals(response.getStatus())) {
                // Launch successive packet in same thread (synchronously) 
                set(next.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            // This will ensure interruption or other exceptions will bubble up
            setException(e);
        }
    }
}

Per usarlo, devi semplicemente fare quanto segue per aggiungere attività successive al primo pacchetto:

PacketChain task = new PacketChain(firstPacketTask);
task.add(secondPacketTask)
    .add(thirdPacketTask);

Forse la tua classe SendRecord potrebbe quindi concentrarsi sulla suddivisione delle informazioni in pezzi e restituire un singolo PacketChain che rappresenta il cumulo di tutte le chiamate necessarie per inviare le informazioni richieste.

    
risposta data 16.01.2018 - 12:28
fonte
-1

Da quando lo hai chiesto qui e non su stackexchange. Forse sei andato in un design sbagliato, se gli utenti dovessero decidere se il tuo programma invia sync o async, sei sicuro? Decisioni del genere non dovrebbero essere fatte dagli utenti, dovrebbero essere idioti dimostrati di funzionare in tutte le situazioni. il tuo programma dovrebbe scoprire se stesso cosa è meglio, riducendo il numero di opzioni dell'interfaccia utente.

    
risposta data 18.01.2018 - 08:28
fonte