Ho un mazzo di chiavi ( clientKey
) e valori ( processBytes
) che voglio inviare alla nostra coda di messaggistica comprimendoli in un array di byte. Creerò un array di byte di tutte le chiavi e i valori che dovrebbero sempre essere inferiori a 50 K e quindi inviarli alla nostra coda di messaggistica.
Per ogni partizione, ho un sacco di dataHolders
quindi sto iterando quelli e poi inviandolo alla mia coda di messaggistica: -
private void validateAndSend(final DataPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
// sending data via async policy but it can be send with other two sync queue policy as well.
final Packet packet = new Packet(partition, new QPolicyAsync());
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Packet
class : questo class pack tutte le chiavi e i valori in un array di byte e richiama l'implementazione corrispondente passata nel costruttore per inviare i dati alla coda.
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte partition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private final QueuePolicy policy;
private int pendingItems = 0;
public Packet(final DataPartition partition, final QueuePolicy policy) {
this.partition = (byte) partition.getPartition();
this.policy = policy;
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(partition)
.put(replicated);
}
// sending here by calling policy implementation
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
// sending data via particular policy
policy.sendToQueue(address, buffer.array());
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
Ora posso inviare i dati alla mia coda di messaggi in tre modi diversi, quindi ho creato un'interfaccia e poi ho implementato tre diverse implementazioni:
QueuePolicy
interface :
public interface QueuePolicy {
public boolean sendToQueue(final long address, final byte[] encodedRecords);
}
QPolicyAsync
class :
public class QPolicyAsync implements QueuePolicy {
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueAsync(address, encodedRecords);
}
}
QPolicySync
class :
public class QPolicySync implements QueuePolicy {
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords);
}
}
QPolicySyncWithSocket
class :
public class QPolicySyncWithSocket implements QueuePolicy {
private final Socket socket;
public QPolicySyncWithSocket(Socket socket) {
this.socket = socket;
}
@Override
public boolean sendToQueue(long address, byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, Optional.of(socket));
}
}
SendRecord
class : questa è la classe effettiva che invia i dati alla mia coda di messaggi. Ha tre differenti implementazioni (numerate con 1, 2, 3 come commenti) e ognuna di queste implementazioni viene chiamata da sopra% implementazioni diQueuePolicy
:
public class SendRecord {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder {
private static final SendRecord INSTANCE = new SendRecord();
}
public static SendRecord getInstance() {
return Holder.INSTANCE;
}
private SendRecord() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetry();
}
}, 0, 1, TimeUnit.SECONDS);
}
// this will retry to send data again if acknowledgment is not received
// but only for async cases. For sync we don't retry at all
private void handleRetry() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages) {
if (message.hasExpired()) {
if (message.shouldRetry()) {
message.markResent();
doSendAsync(message, Optional.<Socket>absent());
} else {
cache.invalidate(message.getAddress());
}
}
}
}
// #1 sends data asynchronously
public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m, Optional.<Socket>absent());
}
// place where we send data on a socket
private boolean doSendAsync(final PendingMessage message, final Optional<Socket> socket) {
Optional<Socket> actualSocket = socket;
if (!actualSocket.isPresent()) {
SocketState liveSocket = SocketPoolManager.getInstance().getSocket();
actualSocket = Optional.of(liveSocket.getSocket());
}
ZMsg msg = new ZMsg();
msg.add(message.getEncodedRecords());
try {
return msg.send(actualSocket.get());
} finally {
msg.destroy();
}
}
// #2 sends data synchronously without taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
return sendToQueueSync(address, encodedRecords, Optional.<Socket>absent());
}
// #3 sends data synchronously but by taking socket as a parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords,
final Optional<Socket> socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
} finally {
cache.invalidate(address);
}
return false;
}
// called by ResponsePoller thread to tell us that messaging queue
// has received data
public void handleAckReceived(final long address) {
PendingMessage message = cache.getIfPresent(address);
if (message != null) {
message.ackReceived();
cache.invalidate(address);
}
}
}
L'idea è molto semplice: sto inviando i dati alla mia coda di messaggi tramite una di queste tre implementazioni QueuePolicy
. Dipende da come i clienti vogliono inviare i dati. A partire da ora sto passando l'implementazione di QueuePolicy
nel costruttore Packet
e poi mando i dati tramite quel criterio. Ogni implementazione QueuePolicy
chiama il metodo corrispondente in SendRecord
class.
- La mia classe
Packet
deve sapere su come i dati vengono inviati? Penso chePacket
class sia solo un contenitore, e questo è tutto. Penso che non dovremmo aspettarci che sappia come trasmettere se stesso. Un trasmettitore lo fa. - Inoltre, ogni implementazione
QueuePolicy
chiama il metodo corrispondente diSendRecord
class. È davvero necessario o esiste un modo migliore? Non possiamo eliminare la classeSendRecord
e averli implementati in ognuna di queste tre implementazioniQueuePolicy
?
Anche nel codice seguente, come posso sapere se i dati sono stati inviati con successo o no? La mia classe Packet
richiama l'implementazione dei criteri di coda per inviare i dati e ciascuna implementazione restituisce un valore booleano ma il mio addAndSendJunked
, sendData
e close
non restituisce nulla poiché sono nulli e per sync
caso ho bisogno di sapere se i dati sono stati inviati o meno, quindi ho bisogno di restituire boolean pure da quei metodi in Packet
class.
// sending data via sync policy now
final Packet packet = new Packet(partition, new QPolicySync());
packet.addAndSendJunked("some_key".getBytes(StandardCharsets.UTF_8), StringUtils.EMPTY.getBytes(StandardCharsets.UTF_8));
// in this case, it will send data from the close method call as threshold limit is not reached (since we only have one element to send)
// so after calling close, it will call "sendData()" method immediately.
packet.close();
Credo che con il mio design, potrei infrangere il principio di singola responsabilità o non seguire correttamente lo standard oops, quindi volevo vedere se possiamo progettare questo in modo efficiente e se c'è un modo migliore? Qual è il modo migliore con cui possiamo inviare key (s) e value (s) comprimendoli in un array di byte (seguendo il limite di 50K) tramite la modalità sync o async?