Salvataggio di eventi ad alta frequenza in un database con limiti di connessione limitato

13

Abbiamo una situazione in cui devo affrontare un massiccio afflusso di eventi in arrivo sul nostro server, a circa 1000 eventi al secondo, in media (il picco potrebbe essere ~ 2000).

Il problema

Il nostro sistema è ospitato su Heroku e utilizza un Heroku Postgres DB , che consente un massimo di 500 connessioni DB. Utilizziamo il pool di connessioni per connettersi dal server al DB.

Gli eventi arrivano più velocemente di quanto il pool di connessioni DB possa gestire

Il problema che abbiamo è che gli eventi arrivano più velocemente di quanto il pool di connessioni possa gestire. Nel momento in cui una connessione ha completato il roundtrip della rete dal server al DB, in modo che possa essere rilasciato nuovamente al pool, più di n di eventi aggiuntivi entrano.

Alla fine gli eventi si accumulano, in attesa di essere salvati e poiché non ci sono connessioni disponibili nel pool, scadono e l'intero sistema non viene reso operativo.

Abbiamo risolto l'emergenza emettendo gli eventi ad alta frequenza offensivi a un ritmo più lento da parte dei clienti, ma vogliamo ancora sapere come gestire questi scenari nell'evento in cui dobbiamo gestire gli eventi ad alta frequenza.

Vincoli

Altri client potrebbero voler leggere eventi contemporaneamente

Altri client richiedono continuamente di leggere tutti gli eventi con una chiave particolare, anche se non sono ancora stati salvati nel DB.

Un cliente può interrogare GET api/v1/events?clientId=1 e ottenere tutti gli eventi inviati dal client 1, anche se quegli eventi non sono ancora stati salvati nel DB.

Ci sono esempi di "aula" su come affrontarlo?

Possibili soluzioni

Accoda gli eventi sul nostro server

Potremmo accodare gli eventi sul server (con la coda con una concorrenza massima di 400, quindi il pool di connessioni non si esaurisce).

Questa è una pessima idea perché:

  • Mangerà la memoria del server disponibile. Gli eventi accodati accatastati consumeranno enormi quantità di RAM.
  • I nostri server riavviano una volta ogni 24 ore . Questo è un limite rigido imposto da Heroku. Il server può riavviarsi mentre gli eventi vengono messi in coda causandoci la perdita degli eventi accodati.
  • Introduce lo stato sul server, compromettendo così la scalabilità. Se abbiamo una configurazione multi-server e un client vuole leggere tutti gli eventi accodati + salvati, non sapremo su quale server vivono gli eventi collegati.

Utilizza una coda messaggi separata

Suppongo che potremmo usare una coda di messaggi, (come RabbitMQ ?), dove pompiamo i messaggi in essa e dall'altra fine c'è un altro server che si occupa solo di salvare gli eventi sul DB.

Non sono sicuro che le code di messaggi consentano di eseguire query sugli eventi in coda (che non erano ancora stati salvati), quindi se un altro cliente desidera leggere i messaggi di un altro client, posso semplicemente ottenere i messaggi salvati dal DB e dai messaggi in sospeso dalla coda e concatenarli insieme in modo da poterli inviare nuovamente al client di richiesta di lettura.

Utilizza più database, ciascuno dei quali salva una parte dei messaggi con un server DB-coordinator centrale per gestirli

Un'altra soluzione che abbiamo è quella di utilizzare più database, con un "coordinatore di DB / bilanciamento del carico" centrale. Dopo aver ricevuto un evento questo coordinatore sceglierebbe uno dei database in cui scrivere il messaggio. Ciò dovrebbe consentirci di utilizzare più database Heroku, aumentando così il limite di connessione a 500 x numero di database.

Su una query di lettura, questo coordinatore potrebbe rilasciare% query diSELECT a ciascun database, unire tutti i risultati e inviarli al client che ha richiesto la lettura.

Questa è una pessima idea perché:

  • Questa idea suona come ... ahem .. over-engineering? Sarebbe anche un incubo da gestire (backup ecc.). È complicato da costruire e mantenere e, a meno che non sia assolutamente necessario, sembra una KISS violazione.
  • Sacrifica Consistenza . Fare transazioni su più DB è un no-go se andiamo con questa idea.
posta Nik Kyriakides 22.09.2018 - 07:34
fonte

7 risposte

9

Flusso di input

Non è chiaro se i 1000 eventi / secondo rappresentano picchi o se si tratta di un carico continuo:

  • se è un picco, è possibile utilizzare una coda di messaggi come buffer per distribuire il carico sul server DB più a lungo;
  • se è carico costante, la coda dei messaggi da sola non è sufficiente, perché il server DB non sarà mai in grado di recuperare. Quindi dovresti pensare a un database distribuito.

Soluzione proposta

Intuitivamente, in entrambi i casi, sceglierei un flusso di eventi basato su Kafka :

  • Tutti gli eventi vengono sistematicamente pubblicati su un topic di kafka
  • Un consumatore si iscrivere agli eventi e li memorizza nel database.
  • Un processore di query gestirà le richieste dei client e interroga il DB.

Questo è altamente scalabile a tutti i livelli:

  • Se il server DB è il collo di bottiglia, basta aggiungere più utenti. Ognuno potrebbe sottoscrivere l'argomento e scrivere su un altro server DB. Tuttavia, se la distribuzione avviene in modo casuale tra i server DB, il processore di query non sarà in grado di prevedere il server DB da prendere e di dover interrogare diversi server DB. Ciò potrebbe portare a un nuovo collo di bottiglia sul lato della query.
  • Lo schema di distribuzione del DB potrebbe quindi essere anticipato organizzando il flusso di eventi in diversi argomenti (ad esempio, utilizzando gruppi di chiavi o proprietà, per partizionare il DB secondo una logica prevedibile).
  • Se un server di messaggi non è sufficiente per gestire una marea crescente di eventi di input, puoi aggiungere kafka partitions per distribuire gli argomenti di kafka su diversi server fisici.

Offrire eventi non ancora scritti nel DB ai clienti

Vuoi che i tuoi clienti siano in grado di accedere anche alle informazioni ancora nella pipe e non ancora scritte nel DB. Questo è un po 'più delicato.

Opzione 1: utilizzo di una cache per integrare le query db

Non l'ho analizzato in profondità, ma la prima idea che mi viene in mente sarebbe quella di rendere il / i processore / i di query un / i consumatore / i degli argomenti di kafka, ma in un diverso kafka consumer group . Il processore della richiesta riceverà quindi tutti i messaggi che lo scrittore DB riceverà, ma indipendentemente. Potrebbe quindi tenerli in una cache locale. Le query sarebbero poi eseguite su DB + cache (+ eliminazione dei duplicati).

Il design sarebbe quindi:

Lascalabilitàdiquestolivellodiquerypotrebbeessereottenutaaggiungendopiùprocessoridiquery(ciascunonelpropriogruppodiconsumatori).

Opzione2:progettaunadoppiaAPI

UnapprocciomiglioreIMHOsarebbequellodioffrireunadoppiaAPI(utilizzareilmeccanismodelgrupposeparatodiconsumatori):

  • unaAPIdiqueryperaccedereaglieventinelDBe/ofareanalisi
  • un'APIdistreamingcheinoltraimessaggidirettamentedall'argomento

Ilvantaggioèchelascicheilclientedecidacosaèinteressante.CiòpotrebbeevitarediuniresistematicamenteidatiDBcondatiappenaincassati,quandoilclienteèinteressatosoloainuovieventiinarrivo.Seladelicataunionetraeventifreschiearchiviatièdavveronecessaria,allorailclientedovrebbeorganizzarla.

Varianti

Hopropostokafkaperchéèprogettatoper volumi molto alti con messaggi persistenti per poter riavviare i server se necessario.

Potresti costruire un'architettura simile con RabbitMQ. Tuttavia, se hai bisogno di code persistenti, potrebbe ridurre le prestazioni . Inoltre, per quanto ne so, l'unico modo per ottenere il consumo parallelo degli stessi messaggi da parte di diversi lettori (ad esempio writer + cache) con RabbitMQ è clona le code . Quindi una scalabilità più elevata potrebbe arrivare a un prezzo più alto.

    
risposta data 22.09.2018 - 20:16
fonte
11

La mia ipotesi è che devi esplorare con maggiore attenzione un approccio che hai respinto

  • Accodare gli eventi sul nostro server

Il mio suggerimento sarebbe di iniziare a leggere i vari articoli pubblicati sull'architettura LMAX . Sono riusciti a fare lavori di batching ad alto volume per il loro caso d'uso, e potrebbe essere possibile rendere i tuoi trade off più simili ai loro.

Inoltre, potresti voler vedere se riesci a togliere le letture - idealmente ti piacerebbe essere in grado di ridimensionarle indipendentemente dalle scritture. Ciò potrebbe significare guardare in CQRS (command query responsibility segregation).

The server can restart while events are enqueued causing us to lose the enqueued events.

In un sistema distribuito, penso che tu possa essere abbastanza sicuro che i messaggi si perderanno. Potresti essere in grado di mitigare un po 'l'impatto di ciò considerando le barriere della sequenza (ad esempio, assicurandoti che la scrittura sull'archivio durevole avvenga prima che l'evento sia condiviso al di fuori del sistema).

  • Utilizza più database, ciascuno dei quali salva una parte dei messaggi con un server DB-coordinator centrale per gestirli

Forse - Sarebbe più probabile che guardi i confini della tua attività per vedere se ci sono luoghi naturali in cui tagliare i dati.

There are cases where losing data is an acceptable tradeoff?

Bene, suppongo che ci potrebbe essere, ma non è dove stavo andando. Il punto è che il design dovrebbe aver incorporato in esso la robustezza necessaria per progredire di fronte alla perdita di messaggi.

Ciò che spesso appare è un modello basato su pull con notifiche. Il provider scrive i messaggi in un negozio durevole ordinato. Il consumatore attira i messaggi dal negozio, monitorando il proprio high water mark. Le notifiche push vengono utilizzate come dispositivi che riducono la latenza, ma se la notifica viene persa, il messaggio viene comunque recuperato (eventualmente) perché il consumatore sta eseguendo una pianificazione regolare (con la differenza che se la notifica viene ricevuta, il pull si verifica prima) ).

Vedi Messaggistica affidabile senza transazioni distribuite, di Udi Dahan (già citato da Andy ) e Polyglot Data di Greg Young.

    
risposta data 22.09.2018 - 14:21
fonte
6

Se comprendo correttamente il flusso corrente è:

  1. Ricevi ed eventi (presumo tramite HTTP?)
  2. Richiedi una connessione dal pool.
  3. Inserisci l'evento nel DB
  4. Rilascia la connessione al pool.

Se è così, penso che la prima modifica al design sarebbe quella di smettere di avere il codice di gestione uniforme per restituire le connessioni al pool su ogni evento. Creare invece un pool di thread / processi di inserimento che sia 1 a 1 con il numero di connessioni DB. Questi avranno ciascuno una connessione DB dedicata.

Usando un qualche tipo di coda concorrente, si hanno questi thread che estraggono i messaggi dalla coda concorrente e li inseriscono. In teoria, non hanno mai bisogno di restituire la connessione al pool o di richiederne uno nuovo, ma potrebbe essere necessario creare una gestione nel caso in cui la connessione vada male. Potrebbe essere più semplice uccidere il thread / processo e avviarne uno nuovo.

Questo dovrebbe eliminare efficacemente il sovraccarico del pool di connessioni. Ovviamente, sarà necessario essere in grado di effettuare push di almeno 1000 eventi / connessioni al secondo su ciascuna connessione. Potresti provare diversi numeri di connessioni poiché avere 500 connessioni che funzionano sulle stesse tabelle potrebbe creare una contesa sul DB, ma questa è una domanda completamente diversa. Un'altra cosa da considerare è l'uso di inserimenti batch, cioè ogni thread tira un numero di messaggi e li inserisce tutti in una volta. Inoltre, evita che più connessioni tentino di aggiornare le stesse righe.

    
risposta data 24.09.2018 - 16:39
fonte
5

Ipotesi

Suppongo che il carico che descrivi sia costante, poiché è lo scenario più difficile da risolvere.

Inoltre, assumerò che tu abbia un modo per eseguire carichi di lavoro con trigger e di lunga durata al di fuori del processo della tua applicazione web.

Soluzione

Supponendo di aver identificato correttamente il collo di bottiglia - latenza tra il processo e il database Postgres - questo è il problema principale da risolvere. La soluzione deve tenere conto del vincolo di coerenza con altri clienti che desiderano leggere gli eventi non appena possibile dopo la loro ricezione.

Per risolvere il problema della latenza, è necessario lavorare in un modo che riduca al minimo la quantità di latenza sostenuta per evento da memorizzare. Questa è la cosa chiave che devi realizzare se non sei disposto o in grado di cambiare l'hardware . Dato che sei su servizi PaaS e non hai alcun controllo su hardware o rete, l'unico modo per ridurre la latenza per evento sarà con una sorta di scrittura in batch di eventi.

Sarà necessario memorizzare una coda di eventi localmente che viene scaricata e scritta periodicamente sul tuo db, una volta raggiunta una determinata dimensione o dopo un intervallo di tempo trascorso. Un processo dovrà monitorare questa coda per attivare lo svuotamento del negozio. Dovrebbero esserci molti esempi su come gestire una coda concorrente che viene periodicamente svuotata nella lingua scelta: Qui è un esempio in C # , dal popolare sinking periodico della libreria di registrazione di Serilog.

Questa risposta SO descrive il modo più rapido per svuotare i dati in Postgres - anche se richiederebbe al tuo batch di memorizzare la coda disco, e probabilmente c'è un problema da risolvere lì quando il tuo disco scompare dopo il riavvio di Heroku.

Constraint

Un'altra risposta ha già menzionato CQRS e questo è l'approccio corretto per risolvere il vincolo. Vuoi idratare i modelli letti mentre viene elaborato ogni evento: un schema Mediatore può aiutare a incapsulare un evento e a distribuirlo a più gestori in corso. Pertanto, un gestore può aggiungere l'evento al modello di lettura che è in memoria che i client possono interrogare e un altro gestore può essere responsabile della messa in coda dell'evento per la sua eventuale scrittura in batch.

Il vantaggio principale di CQRS è il disaccoppiamento dei tuoi modelli di lettura e scrittura concettuali - che è un modo elegante per dire di scrivere in un modello e di leggere un altro modello completamente diverso. Per ottenere vantaggi di scalabilità da CQRS, in genere, si desidera garantire che ciascun modello sia archiviato separatamente in modo ottimale per i propri modelli di utilizzo. In questo caso possiamo utilizzare un modello di lettura aggregato, ad esempio una cache Redis o semplicemente in-memory, per garantire che le nostre letture siano rapide e coerenti, mentre continuiamo a utilizzare il nostro database transazionale per scrivere i nostri dati.

    
risposta data 24.09.2018 - 12:17
fonte
3

Events come in faster than the DB connection pool can handle

Questo è un problema se ogni processo ha bisogno di una connessione al database. Il sistema dovrebbe essere progettato in modo da avere un pool di lavoratori in cui ogni lavoratore ha bisogno di una sola connessione al database e ogni lavoratore può elaborare più eventi.

La coda messaggi può essere utilizzata con tale progettazione, è necessario che i produttori di messaggi trasmettano gli eventi alla coda dei messaggi e che i lavoratori (consumatori) elaborino i messaggi dalla coda.

Other clients might want to read events concurrently

Questo vincolo è possibile solo se gli eventi sono memorizzati nel database senza alcuna elaborazione (eventi non elaborati). Se gli eventi vengono elaborati prima di essere archiviati nel database, l'unico modo per ottenere gli eventi proviene dal database.

Se i client vogliono solo eseguire query sugli eventi non elaborati, suggerirei di utilizzare il motore di ricerca come Elastic Search. Puoi anche ottenere l'API di query / ricerca gratuitamente.

Dato che sembra che interrogare gli eventi prima che siano salvati nel database è importante per te, una soluzione semplice come la ricerca elastica dovrebbe funzionare. In pratica, si limitano a memorizzare tutti gli eventi al suo interno e non si duplicano gli stessi dati copiandoli nel database.

Scalare la ricerca elastica è facile, ma anche con la configurazione di base è abbastanza performante.

Quando hai bisogno di elaborazione, il tuo processo può ottenere gli eventi da ES, elaborarli e memorizzarli nel database. Non so quale sia il livello di prestazioni richiesto da questa elaborazione, ma sarebbe completamente separato dall'interrogare gli eventi di ES. Non dovresti avere comunque un problema di connessione, dato che puoi avere un numero fisso di worker e ognuno con una connessione al database.

    
risposta data 24.09.2018 - 11:53
fonte
2

1k o 2k eventi (5 KB) al secondo non è tanto per un database se ha uno schema e un motore di archiviazione appropriati. Come suggerito da @eddyce, un master con uno o più slave può separare le query di lettura dall'impegno delle scritture. L'utilizzo di un numero inferiore di connessioni DB offre un throughput complessivo migliore.

Other clients might want to read events concurrently

Per queste richieste, dovrebbero anche leggere dal master db come ci sarebbe il ritardo di replica per gli slave di lettura.

Ho usato (Percona) MySQL con il motore TokuDB per scritture a volume molto alto. C'è anche il motore MyRocks basato su LSMtrees che è buono per i carichi di scrittura. Per entrambi questi motori e probabilmente anche PostgreSQL ci sono impostazioni per l'isolamento della transazione e il comportamento di sincronizzazione del commit che può aumentare drasticamente la capacità di scrittura. In passato abbiamo accettato fino a 1s di dati persi che sono stati segnalati al client db come commesso. In altri casi c'erano SSD con batteria tampone per evitare perdite.

Amazon RDS Aurora nel gusto MySQL ha una velocità di scrittura 6 volte superiore con replica a costo zero (simile agli slave che condividono un filesystem con il master). L'aroma di Aurora PostgreSQL ha anche un diverso meccanismo di replica avanzato.

    
risposta data 08.10.2018 - 01:22
fonte
1

Mi piacerebbe rilasciare heroku tutti insieme, vale a dire, farei cadere un approccio centralizzato: più scritture che fanno il picco della connessione massima al pool è uno dei motivi principali per cui i cluster db sono stati inventati, principalmente perché non lo fai caricare la scrittura db (s) con richieste di lettura che possono essere eseguite da altri db nel cluster, proverei con una topologia master-slave, inoltre - come qualcuno ha già menzionato, avere la propria installazione di db renderebbe possibile sintonizzare l'intero sistema per assicurarsi che il tempo di propagazione delle query sia gestito correttamente.

Buona fortuna

    
risposta data 25.09.2018 - 00:45
fonte

Leggi altre domande sui tag