Ho un sacco di chiavi e valori che voglio inviare alla nostra coda di messaggistica comprimendoli in un array di byte. Creerò un array di byte di tutte le chiavi e i valori che dovrebbero sempre essere inferiori a 50 K e quindi inviarli alla nostra coda di messaggistica.
Classe pacchetto :
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private int pendingItems = 0;
public Packet(final RecordPartition recordPartition) {
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
.put(replicated);
}
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
Di seguito è riportato il modo in cui sto inviando i dati. A partire da ora il mio design consente solo di inviare dati in modo asincrono chiamando il metodo sender.sendToQueueAsync
nel metodo sendData()
.
private void validateAndSend(final RecordPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
final Packet packet = new Packet(partition);
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Ora ho bisogno di estendere la mia progettazione in modo da poter inviare i dati in tre modi diversi. Spetta all'utente decidere in che modo inviare i dati a "sincronizzazione" o "asincrona".
- Ho bisogno di inviare dati in modo asincrono chiamando il metodo
sender.sendToQueueAsync
. - oppure devo inviare i dati in modo sincrono chiamando il metodo
sender.sendToQueueSync
. - oppure devo inviare i dati in modo sincrono ma su un determinato socket chiamando il metodo
sender.sendToQueueSync
. In questo caso ho bisogno di passare la variabilesocket
in qualche modo in modo chesendData
conosca questa variabile.
Classe SendRecord :
public class SendRecord {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder {
private static final SendRecord INSTANCE = new SendRecord();
}
public static SendRecord getInstance() {
return Holder.INSTANCE;
}
private SendRecord() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetry();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void handleRetry() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages) {
if (message.hasExpired()) {
if (message.shouldRetry()) {
message.markResent();
doSendAsync(message);
} else {
cache.invalidate(message.getAddress());
}
}
}
}
// called by multiple threads concurrently
public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
// called by above method and also by handleRetry method
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
// called by send method below
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(socket);
} finally {
msg.destroy();
}
}
// called by multiple threads to send data synchronously without passing socket
public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
// called by a threads to send data synchronously but with socket as the parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage record = cache.getIfPresent(address);
if (record != null) {
record.ackReceived();
cache.invalidate(address);
}
}
}
I chiamanti chiameranno solo uno dei seguenti tre metodi:
- sendToQueueAsync passando due parametri
- sendToQueueSync passando due parametri
- sendToQueueSync passando tre parametri
Come dovrei progettare la mia classe Packet
e SendRecord
in modo che io possa dire alla classe Packet
che questi dati devono essere inviati in uno dei tre modi sopra elencati alla mia coda di messaggi. Spetta all'utente decidere in che modo desidera inviare i dati alla coda dei messaggi. A partire da ora il modo in cui è strutturata la mia classe Packet
, può inviare dati solo in un modo.