Sincronizzazione thread per più thread che accedono a un flusso di messaggi

1

Il mio progetto ha un singolo flusso di messaggi (che comunica con un dispositivo esterno) a cui si accede tramite più thread client. Il flusso di lavoro è:

  • Esiste un thread (il "thread di messaggistica") che legge il flusso di messaggi ed è in grado di inviare messaggi ricevuti ai client. Funziona attendendo una coda sicura per i thread che il flusso dei messaggi espone.
  • I messaggi possono essere inviati sullo stream da qualsiasi thread (lo stream è thread-safe e gestisce l'invio / ricezione simultanea)
  • Un thread client invia una richiesta attraverso il flusso dei messaggi e fornisce una funzione di callback.
  • Il thread client attende risposte. Potrebbero esserci più risposte. Il thread di messaggistica sa come abbinare un messaggio ricevuto a un thread client, in base alla richiesta inviata dal thread client. Il thread di messaggistica richiama la funzione di callback corretta per ciascun messaggio ricevuto.
  • Dopo aver inviato la richiesta, il thread client deve bloccarsi fino a quando tutte le risposte sono state elaborate. Questo è normalmente indicato dalla funzione di callback che restituisce uno stato particolare, ma deve anche essere possibile sbloccare un thread client e terminare l'interazione dal server di messaggistica che riceve un particolare messaggio che riconoscerà o dall'arresto dell'applicazione.
  • Di tanto in tanto il servizio di messaggistica potrebbe non essere disponibile (il thread di messaggistica lo saprà); quindi i thread client devono essere in grado di aspettare un certo tempo per vedere se il servizio di messaggistica è diventato disponibile.

Queste sono le mie esigenze ma non riesco a capire come codificare la logica per i thread client "in entrata" e in attesa; e in che modo i thread client devono comunicare con il thread di messaggistica. Quali primitive di sincronizzazione utilizzare e così via.

Sto codificando in C ++ con le librerie Poco, e posso usare tutte le solite primitive (mutex, evento, semaforo, variabile di condizione ecc.) e costrutti di livello superiore come una coda di notifica, un centro di notifica e un dispatcher di eventi .

    
posta M.M 20.08.2015 - 12:13
fonte

1 risposta

2

Ci sono diversi modi per farlo, ma se sei incline ad attaccare con POCO, ti consigliamo di consultare macchina.io ( OSP porzione) Implementazione WebEvent - è essenzialmente un framework di pubblicazione / sub-messaggistica. C'è molto di più di quello che ti serve ma è relativamente semplice e architettonicamente dovresti essere in grado di adattarlo rapidamente alle tue esigenze. L'ho usato in produzione per molti anni e funziona bene; verrà anche convertito in un modulo indipendente da OSP in Poco per una delle prossime versioni.

Il cliente può essere (1) un endpoint del web socket o (2) un osservatore in-process che può inviare (cioè pubblicare eventi) dati e / o iscriversi (cioè ricevere notifiche) a uno o più soggetti (argomenti). Probabilmente avrai bisogno di molti osservatori in-process e di un endpoint remoto.

Il framework funziona con la gestione di due thread:

  • Coda principale - responsabile per la spedizione degli eventi di sottoscrizione / annullamento sottoscrizione dei clienti.

  • Worker queue - responsabile per la distribuzione degli eventi di dati (messaggi).

Ogni coda viene gestita in una propria discussione e vi è uno schema di denominazione con notazione a punti per i nomi dei soggetti, vedere qui per i dettagli. Nota che la documentazione menziona solo i WebSocket ma la denominazione funziona esattamente allo stesso modo per gli osservatori in-process e potresti volere o avere bisogno di uno schema di denominazione diverso.

    
risposta data 26.08.2015 - 04:25
fonte

Leggi altre domande sui tag