Algoritmo di batching filettato

4

Ok, sto lavorando alla memorizzazione nella cache basata su DynamoDb. Abbiamo un'API in cui ogni chiamata ci costa denaro reale e le informazioni recuperate sono "fresche" per 2-3 settimane, quindi ha senso memorizzarlo nella cache.    Non c'è alcun problema con la chiamata di dynamoDbClient.get(apiUrl) e il recupero della risposta JSON memorizzata. Hanno lotti: 25 articoli in lotti per PUT e 100 articoli in lotti per GET. Quindi significa che se abbiamo bisogno di ottenere una risposta JSON per 100 voci, facciamo una chiamata invece di 100.

La domanda che ho è come organizzarlo nel modo migliore per il threading.

Ecco un'idea generale e sono aperto ai suggerimenti.

Supponendo di avere una chiamata batch con 100 voci, possiamo usare BlockingQueue<String> per memorizzare le chiavi che possiamo usare in seguito. Useremmo il metodo blockingQueue.put() , per fare in modo che altri thread aspettino se 150 thread sono arrivati in un metodo con un lotto di soli 100 disponibili.

Quindi il thread che entra nel metodo, ottiene il suo posto nella coda di batch, ha bisogno di un meccanismo di "blocco" per aspettare che arrivi la risposta e "svegliarlo" affinché la sua risposta sia disponibile.

pseudoCodeMethod(String resourceUrl) {
  blockingQueue.put(resourceUrl); // sleep if no place
  LockableResponse lockableResponse = getLockableResponse();
  lockableResponse.setKey(resourceUrl);
  lockableResponse.getSemaphore().acquire();

  String jsonApiResponse = lockableResponse.getJsonApiResponse();
  lockableResponse.clean();

  return jsonApiResponse;
}

LockableResponse{
  String resourceUrl;
  String jsonApiResponse;
  Semaphore semaphore = new Semaphore(0);

  public void clean(){
     resourceUrl = null;
     jsonApiResponse = null;         
  } 
}

Il mio pensiero è che dovremmo associare un resourceUrl per ogni risposta dinamodb con una struttura dati che ha semaforo da attendere per la risposta API che arriva per ~ 100 elementi in una chiamata batch.

Un thread separato esegue la chiamata e quindi itera attraverso la risposta, assegna jsonApiResponse a% correttoresourceUrl [ concurrentMap per quello ?? ] e quindi chiama lockableResponse.release() per riattivare il thread, quindi il thread richiede resourceUrl ed esiste il metodo.

Il metodo clean() potrebbe essere aggiunto per consentire il riutilizzo della stessa struttura. Sto pensando a una serie di risorse bloccabili in base alla capacità del lotto. Quindi potrebbero essere cancellati e riutilizzati per altre chiamate GET batch.

Pensieri? Suggerimenti?

    
posta Flamaker2018 05.02.2018 - 15:49
fonte

2 risposte

2

Ecco un approccio reattivo al tuo problema. RxJava ti consente di gestire le risorse, in particolare i thread, in uno stile simile a quello utilizzato nei flussi Java.

La coda di richieste per GET è un SerializedSubject in cui vengono inviate le richieste. SerializedSubject è thread-safe e consente di effettuare richieste da qualsiasi thread.

SerializedSubject<Pair<Observable<JsonResponse>, String> requestGetQueue =
  PublishSubject.<Pair<Observable<JsonResponse>, String>create().toSerial();

Una volta dichiarato l'oggetto, imposta l'elaborazione delle richieste in coda. L'operatore observeOn() indica alla catena di osservatori di eseguire l'elaborazione su un particolare programma di pianificazione, che quindi seleziona un thread dal suo pool per eseguire tutte le operazioni. RxJava prende il thread-hopping per passare dal thread chiamante al thread che gestisce le richieste.

requestGetQueue
  .observeOn( Schedulers.io() )
  .buffer( 100 )  // batch size
  .subscribe( requestList -> processRequestList( requestList ),
    error -> log.error( error ) );

L'operatore buffer() raggruppa un insieme di richieste in un elenco. La dimensione del batch è 100, per la pubblicazione originale. L'operatore buffer() può prendere un parametro aggiuntivo per impostare un timeout, in modo che alla fine venga gestito un gruppo solo di richieste e nulla resti bloccato. È una decisione aziendale se desideri gestire meno di 100 richieste in qualsiasi momento.

Observable<JsonResponse> getApiValue( String url ) {
  BehaviorSubject<JsonResponse> responseFromApi = BehaviorSubject.create();
  requestQueue.onNext( new Pair<>(responseFromApi, url) );
  return responseFromApi;
}

La richiesta API per i client è molto semplice. Ciò che il chiamante torna è l'equivalente di un futuro. Il client effettua la richiesta e stabilisce una richiamata utilizzando il passo subscribe() per gestire il JsonResponse che alla fine verrà eseguito. È richiesto anche un gestore degli errori, poiché ... beh, gli errori si verificano. L'operatore observeOn() viene utilizzato per spostare la gestione della risposta su un altro thread. Di più su questo in un po '.

getApiValue( apiUrlString )
  .observeOn( clientScheduler )
  .subscribe( jsonResponse -> { ... },
    error -> { ... } );

Gestire la richiesta è semplicemente il raggruppamento dell'elenco di URL, in attesa che l'elenco delle risposte ritorni e accoppi le risposte alle richieste. Il codice sottostante presuppone che tutte le risposte tornino in ordine. La risposta viene emessa al client utilizzando onNext() seguito da onCompleted() .

void processRequestList( List<Pair<Observable<JsonResponse>,String>> requestList ) {
  List<String> uriList = new ArrayList<>();
  for ( int s = 0; s < requestList.size(); s++ ) {
    uriList.add( requestList.getSecond() );
  }
  List<JsonResponse> results = sendBatchRequest( uriList );
  for ( int i = 0; i < requestList.size(); i++ ) {
    requestList.get(i).getFirst().onNext( results.get(i) );
    requestList.get(i).getFirst().onCompleted();
  }
}

Ho menzionato in precedenza che il client deve utilizzare uno scheduler per spostare l'elaborazione sul proprio thread. Il modo in cui lo fai dipende da come sono impostati i thread client. È possibile creare lo scheduler del client da un servizio executor:

clientScheduler = Schedulers.from( executorService );

e l'utilizzo di observeOn( clientScheduler ) farà spostare le operazioni successive sul thread del client.

Riepilogo

RxJava e piattaforme reattive simili gestiscono quasi tutti i dettagli di thread, lock, semafori, mutex, code di blocco, timer, risposte, ecc. Devi ancora capire quali sono le cose necessarie per la sicurezza dei thread, e quando l'elaborazione viene eseguita su thread particolari, ma quasi tutti gli elementi difficili e difficili vengono tenuti dietro le quinte.

    
risposta data 08.02.2018 - 15:50
fonte
2

Per ricapitolare: il problema che vuoi risolvere è simile a una situazione di "navetta". Hai richieste in arrivo e desideri raggrupparle in gruppi di una determinata dimensione e inviarle insieme.

Ci ho pensato e ho creato un piccolo prototipo e puoi ottenere questo comportamento con un paio di semplici costrutti di concorrenza in Java: a CountDownLatch e a ConcurrentQueue . L'idea di base è di prendere tutte le richieste in coda e creare un oggetto per contenere la richiesta ed è la risposta corrispondente con un CountDownLatch che bloccherà il recupero del risultato fino a quando non viene recuperato. Quando la profondità della coda raggiunge la dimensione del batch, si estraggono molte richieste e si invia il batch. Quindi imposta i risultati e libera il latch consentendo al thread chiamante di recuperare il risultato. Segue un esempio di codice. Ho effettuato solo test minimi e non è necessariamente la soluzione migliore, ma dovrebbe darti una panoramica dell'approccio.

Prima alcuni 'primitivi':

Richiesta

import java.util.concurrent.CountDownLatch;

class Request<Q,R>
{
  private final Q value;
  private final CountDownLatch latch = new CountDownLatch(1);
  private volatile R result;

  Request(Q value)
  {
    this.value = value;
  }

  public Q value()
  {
    return value;
  }

  void setResult(R result)
  {
    this.result = result;
    latch.countDown();
  }

  public R getResult() throws InterruptedException
  {
    latch.await();
    return result;
  }
}

Processore

public interface Processor<Q, R>
{
  void execute(Collection<Request<Q,R>> requests);
}

Quanto sopra è responsabile dell'emissione della richiesta batch e quindi dell'impostazione delle risposte per ogni richiesta corrispondente. Puoi costruire cose intorno a questo per soddisfare le tue esigenze.

E poi il sollevamento pesante:

RequestBatcher

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class RequestBatcher<Q, R>
{
  private final int limit;
  private final BlockingQueue<Request<Q,R>> queue;
  private final Processor<Q,R> processor;

  RequestBatcher(Processor<Q,R> processor, int limit)
  {
    this.limit = limit;
    this.processor = processor;
    queue = new ArrayBlockingQueue<>(limit * 2);
  }

  public Future<R> process(Q request) throws InterruptedException
  {
    Request<Q,R> holder = new Request<>(request); 
    BatchedCallable callable = new BatchedCallable(holder);

    queue.put(holder);

    if (queue.size() >= limit) batchRequests();

    return callable.future();
  }

  public R await(Q request) throws InterruptedException, ExecutionException
  {
    Future<R> response = process(request);

    return response.get();
  }

  private class BatchedCallable implements Callable<R>
  {
    final Request<Q,R> request;

    BatchedCallable(Request<Q,R> request)
    {
      this.request = request;
    }

    @Override
    public R call() throws Exception
    {
      return request.getResult();
    }

    public Future<R> future()
    {
      FutureTask<R> future = new FutureTask<R>(this);
      future.run();

      return future;
    }
  }

  private void batchRequests() throws InterruptedException
  {
    while(queue.size() >= limit) {
      ArrayList<Request<Q,R>> requests = new ArrayList<>(limit);

      for (int i =0; i < limit; i++) {
        requests.add(queue.take());
      }

      processor.execute(requests);
    }
  }
}

Questo implementa ciò che penso tu abbia detto che vuoi fare nella tua domanda. Ho aggiunto un blocco await() e il metodo process() non bloccante. Il primo è buono se vuoi che funzioni come una chiamata sincrona per richiesta. Quest'ultimo ti consentirà di utilizzare executors o simili per rendere le cose più parallele.

Ci sono un paio di problemi con quanto sopra che penso valga la pena di menzionare. Un problema è che nulla verrà eseguito fino a raggiungere il limite di batch. Se non si dispone di un flusso costante di richieste in arrivo, si avranno fondamentalmente un mucchio di thread appesi. Se si utilizza la versione di blocco, ciò significa che sono necessari i thread medi per fare una singola richiesta. Immagino che tu non abbia 100 thread che fanno continuamente richieste allo stesso tempo, quindi questo potrebbe essere problematico. Una risposta sarebbe avere un singolo thread per eseguire il batching e utilizzare un altro CountDownLatch che attende. Quando si preme il numero di lotto, si conta alla rovescia e va. Ciò ti consente di impostare un timeout in await() che puoi utilizzare per far funzionare gruppi non completi dopo aver atteso oltre un limite di tempo.

Da un lato, eviterei anche i tentativi di riutilizzare le strutture dati. Questo potrebbe sembrare che migliorerà le prestazioni, ma quello che farà spesso è far sì che gli oggetti vivano abbastanza a lungo da sopravvivere dagli spazi eden o giovani e nei luoghi in cui risiedono gli oggetti vecchi. Il costo della raccolta di questi è molto più alto rispetto agli oggetti di breve durata. Probabilmente stai meglio semplicemente creando nuovi oggetti secondo necessità e lasciandoli raccoglierli.

La prima cosa che vorrei dire è che dovresti familiarizzare con Esecutore quadro. Penso che lo schema di base di ciò che stai proponendo sia facilmente implementato usando un executor pool di thread fissi o una delle altre implementazioni ExecutorService incorporate.

    
risposta data 05.02.2018 - 22:32
fonte

Leggi altre domande sui tag