Invia i record alla coda di messaggi utilizzando uno dei due criteri

0

Ho un mazzo di chiavi ( clientKey ) e valori ( processBytes ) che voglio inviare alla nostra coda di messaggistica comprimendoli in un array di byte. Creerò un array di byte di tutte le chiavi e i valori che dovrebbero sempre essere inferiori a 50 K e quindi inviarli alla nostra coda di messaggistica.

Per ogni partizione, ho un sacco di dataHolders quindi sto iterando quelli e poi inviandolo alla mia coda di messaggistica: -

private void validateAndSend(final DataPartition partition) {
  final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

  // sending data via async policy but it can be send with other two sync queue policy as well.
  final Packet packet = new Packet(partition, new QPolicyAsync());

  DataHolder dataHolder;
  while ((dataHolder = dataHolders.poll()) != null) {
    packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
        dataHolder.getProcessBytes());
  }
  packet.close();
}

Packet class : questo class pack tutte le chiavi e i valori in un array di byte e richiama l'implementazione corrispondente passata nel costruttore per inviare i dati alla coda.

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte partition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private final QueuePolicy policy;
  private int pendingItems = 0;

  public Packet(final DataPartition partition, final QueuePolicy policy) {
    this.partition = (byte) partition.getPartition();
    this.policy = policy;
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
        .put(replicated);
  }

  // sending here by calling policy implementation
  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    // sending data via particular policy
    policy.sendToQueue(address, buffer.array());
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Ora posso inviare i dati alla mia coda di messaggi in tre modi diversi, quindi ho creato un'interfaccia e poi ho implementato tre diverse implementazioni:

QueuePolicy interface :

public interface QueuePolicy {
    public boolean sendToQueue(final long address, final byte[] encodedRecords);
}

QPolicyAsync class :

public class QPolicyAsync implements QueuePolicy {

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
  }
}

QPolicySync class :

public class QPolicySync implements QueuePolicy {

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
  }
}

QPolicySyncWithSocket class :

public class QPolicySyncWithSocket implements QueuePolicy {
  private final Socket socket;

  public QPolicySyncWithSocket(Socket socket) {
    this.socket = socket;
  }

  @Override
  public boolean sendToQueue(long address, byte[] encodedRecords) {
    return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
  }
}

SendRecord class : questa è la classe effettiva che invia i dati alla mia coda di messaggi. Ha tre differenti implementazioni (numerate con 1, 2, 3 come commenti) e ognuna di queste implementazioni viene chiamata da sopra% implementazioni diQueuePolicy:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  // this will retry to send data again if acknowledgment is not received
  // but only for async cases. For sync we don't retry at all
  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message, Optional.<Socket>absent());
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // #1 sends data asynchronously
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m, Optional.<Socket>absent());
  }

  // place where we send data on a socket
  private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
    Optional<Socket> actualSocket = socket;
    if (!actualSocket.isPresent()) {
      SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
      actualSocket = Optional.of(liveSocket.getSocket());
    }

    ZMsg msg = new ZMsg();
    msg.add(message.getEncodedRecords());
    try {
      return msg.send(actualSocket.get());
    } finally {
      msg.destroy();
    }
  }

  // #2 sends data synchronously without taking socket as a parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
  }

  // #3 sends data synchronously but by taking socket as a parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
      final Optional<Socket> socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
    } finally {
      cache.invalidate(address);
    }
    return false;
  }

  // called by ResponsePoller thread to tell us that messaging queue
  // has received data
  public void handleAckReceived(final long address) {
    PendingMessage message = cache.getIfPresent(address);
    if (message != null) {
      message.ackReceived();
      cache.invalidate(address);
    }
  }
}

L'idea è molto semplice: sto inviando i dati alla mia coda di messaggi tramite una di queste tre implementazioni QueuePolicy . Dipende da come i clienti vogliono inviare i dati. A partire da ora sto passando l'implementazione di QueuePolicy nel costruttore Packet e poi mando i dati tramite quel criterio. Ogni implementazione QueuePolicy chiama il metodo corrispondente in SendRecord class.

  • La mia classe Packet deve sapere su come i dati vengono inviati? Penso che Packet class sia solo un contenitore, e questo è tutto. Penso che non dovremmo aspettarci che sappia come trasmettere se stesso. Un trasmettitore lo fa.
  • Inoltre, ogni implementazione QueuePolicy chiama il metodo corrispondente di SendRecord class. È davvero necessario o esiste un modo migliore? Non possiamo eliminare la classe SendRecord e averli implementati in ognuna di queste tre implementazioni QueuePolicy ?

Anche nel codice seguente, come posso sapere se i dati sono stati inviati con successo o no? La mia classe Packet richiama l'implementazione dei criteri di coda per inviare i dati e ciascuna implementazione restituisce un valore booleano ma il mio addAndSendJunked , sendData e close non restituisce nulla poiché sono nulli e per sync caso ho bisogno di sapere se i dati sono stati inviati o meno, quindi ho bisogno di restituire boolean pure da quei metodi in Packet class.

  // sending data via sync policy now
  final Packet packet = new Packet(partition, new QPolicySync());
  packet.addAndSendJunked("some_key".getBytes(StandardCharsets.UTF_8), StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8));
  // in this case, it will send data from the close method call as threshold limit is not reached (since we only have one element to send)
  // so after calling close, it will call "sendData()" method immediately.
  packet.close();

Credo che con il mio design, potrei infrangere il principio di singola responsabilità o non seguire correttamente lo standard oops, quindi volevo vedere se possiamo progettare questo in modo efficiente e se c'è un modo migliore? Qual è il modo migliore con cui possiamo inviare key (s) e value (s) comprimendoli in un array di byte (seguendo il limite di 50K) tramite la modalità sync o async?

    
posta user1950349 06.03.2018 - 02:23
fonte

0 risposte