Pattern produttore-consumatore parzialmente parallelo con stato interno

6

Ho bisogno di implementare uno schema produttore-consumatore per leggere, elaborare e salvare valori elettrici. Devo implementarlo in C # .NET 4.6.1.

Cerco di descriverlo in modo dettagliato, in modo che non ci siano speranze di equivoco sull'obiettivo che voglio raggiungere. Ho un'idea su come farlo, ma potrebbe non essere il modo migliore.

Mi piacerebbe sapere cosa pensi della mia idea e come suggeriresti di risolverla.

La situazione

I valori vengono raccolti da un altro programma da diversi data logger. Questi valori sono memorizzati in file per ciascun data logger. Il software che sto scrivendo ha un file di configurazione, che specifica la directory dei file di input, dove generare i risultati, quale formato viene utilizzato per l'input, quale formato viene utilizzato per l'output, ecc. È importante che per ogni data logger esiste una definizione su come elaborare ogni valore (elettrico).

Il numero di file, le loro dimensioni e il numero di data logger possono variare notevolmente. Assumerei circa 1-200 data logger, possibilmente di più.

Gli obiettivi

Voglio avere l'elaborazione parallela per ogni file del data logger con I / O simultaneo. Ciò significa che mentre il programma legge nuovi file, il contenuto dei file precedentemente letti viene elaborato e i risultati dei calcoli precedenti vengono scritti in file. L'elaborazione dovrebbe avvenire in parallelo tra i data logger.

È importante che l'elaborazione dei valori non possa essere parallelizzata all'interno. I valori di un data logger vengono elaborati in sequenza (cronologicamente). Per ottimizzare il throughput e limitare l'utilizzo della memoria, è necessario eseguire il buffering tra I / O ed elaborazione, eventualmente anche tra le fasi di elaborazione.

Un piccolo schizzo

+----------+               +----------------+               +-----------+ 
|Read files|---->Queue---->|Process values  |---->Queue---->|Write files|
+----------+         |     |of data logger 1|               +-----------+ 
   (Task)            |     +----------------+                  (Task) 
                     |     +----------------+
                     |---->|Process values  |
                     |     |of data logger 2|
                     |     +----------------+
                     ...       (n Tasks) 

L'idea

La mia idea è di usare il flusso di dati Task Parallel Library (TPL). Che potrebbe assomigliare a questo.

+--------------+     +--------------+     +----------------+               +-----------+ 
|TransformBlock|     |  BufferBlock |     | TransformBlock |               |ActionBlock|  
|  Read files  |---->|     Queue    |---->| Process values |---->Queue---->|Write files|
+--------------+     +--------------+     |of data logger 1|               +-----------+ 
   (1 Block)                        |     +----------------+                 (1 Block) 
                                    |     +----------------+
                 conditional linking|     | TransformBlock |
                                    |---->| Process values |
                                    |     |of data logger 2|
                                    |     +----------------+
                                    ...       (n Blocks) 

Dato che i dati letti non sono pensati per ogni TransformBlock io userei collegamento condizionale . Ogni TransformBlock che elabora i valori verrebbe generato in un ciclo a seconda di quanti data logger abbiamo, insieme a un'istanza della classe che contiene le funzioni di elaborazione.

È importante che ci siano diversi oggetti da elaborare, perché i calcoli hanno uno stato. Un esempio semplificato per uno stato potrebbe essere l'ultimo valore calcolato, se desidero sommare tutti i valori in entrata. Un altro potrebbe essere l'ultimo timestamp, quindi posso verificare che il nuovo valore sia cronologicamente corretto. Lo stato ovviamente esisterebbe per oggetto, quindi per blocco e data logger. Non dovrebbero essere necessarie serrature.

Il maxDegreeOfParallelism sarà sempre impostato su 1, perché l'elaborazione entro TransformBlocks deve essere sequenziale.

Il collegamento condizionale potrebbe avere un impatto sulle prestazioni, perché deve sempre verificare dove si trova il file. Se ci fosse un modo per scegliere direttamente un blocco a cui inviare un messaggio, questo potrebbe risparmiare un po 'di tempo. Ho scelto di non utilizzare un blocco per ogni data logger per leggere i file, perché più attività per I / O potrebbero ostacolare le prestazioni.

Cosa ne pensi della mia idea? Come si potrebbe migliorare? Prenderesti un approccio completamente diverso?

Devo essere onesto, ho già postato una domanda molto simile su Stack Overflow , ma sembra che non l'abbia spiega molto bene il mio problema.

    
posta John 24.03.2016 - 20:16
fonte

1 risposta

2

Ho pensato che questo e un modello di consumer-producer con Tasks and BlockingCollections dovrebbe essere la soluzione più semplice ed efficace.

+----------+                          +----------------+                           +-----------+ 
|   Task   |                          |      Task      |                           |   Task    |
|Read files|-----BlockingCollection-->|Process values  |----BlockingCollection---->|Write files|
+----------+   |                      |of data logger 1|  |                        +-----------+ 
               |                      +----------------+  |             
               |                      +----------------+  |
               |                      |      Task      |  |
               |-BlockingCollection-->|Process values  |--|
               |                      |of data logger 2|  |
               |                      +----------------+  |
                ...                       (n Tasks)        ...

Le BlockingCollections vengono create per prime e passate nel costruttore dei file di lettura dell'attività. Il BlockingCollection necessario per un task di elaborazione dati viene anche passato all'oggetto che rappresenta l'unità di elaborazione per un data logger. Ogni Task per l'elaborazione dei dati esegue una funzione dell'oggetto corrispondente, che contiene anche lo stato interno.

L'attività di lettura dei file è in grado di distinguere tra i diversi file del data logger e può passare i dati al BlockingCollection corretto. A seconda della strategia utilizzata, sa già dove passare i dati opposti a TPL Dataflow con il collegamento condizionale.

    
risposta data 25.03.2016 - 15:11
fonte

Leggi altre domande sui tag