Ho un'implementazione Producer / Consumer in cui il numero di consumatori è configurabile (questa è una forma di limitazione configurabile). Il produttore e il consumatore sono partiti in questo modo:
var cts = new CancellationTokenSource();
var processCancelToken = cts.Token;
Task.Run(() => Parallel.Invoke(new ParallelOptions() { CancellationToken = processCancelToken }
, producer
, consumer
)
, processCancelToken);
L'azione del produttore è abbastanza semplice e popola un BlockingCollection con oggetti derivati da System.Threading.Tasks.Task (sì è possibile!). Ecco un esempio semplificato:
var pollingInterval = 30000;
var producer = new Action(() =>
{
Random rnd = new Random(DateTime.Now.Second);
while (!processCancelToken.IsCancellationRequested)
{
for (int ii = 0; ii < 10; ii++)
{
var r = rnd.Next(2, 15);
_mainQueue.Add(new Task(() =>
{
//this is a dummy task for illustrative purposes
Console.WriteLine(" Queued task starting, set to sleep {0} seconds, ID: {1}", r, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(r*1000);
}));
Console.WriteLine(" Producer has added task to queue");
}
System.Threading.Thread.Sleep(pollingInterval);
}
Console.WriteLine("Exiting producer");
}
Per questo esempio crea un'attività anonima che dorme per un numero casuale di secondi tra 2 e 15. Nel codice reale questo produttore esegue il polling del database, estrae le entità dati che rappresentano gli elementi di lavoro, quindi le trasforma in attività eseguibili che vengono aggiunte alla collezione.
Ho quindi un'attività consumer che utilizza Parallel.For()
per avviare istanze n di un'azione anonima che quindi rimuove dalla raccolta una attività, quindi avvia e attende l'attività, quindi ripete:
var numberConsumerThreads = 3;
var consumer = new Action(() =>
{
Parallel.For(0, numberConsumerThreads, (x) =>
{
//this action should continue to dequeue work items until it is cancelled
while (!processCancelToken.IsCancellationRequested)
{
var dequeuedTask = _mainQueue.Take(processCancelToken);
Console.WriteLine(" Consumer #{0} has taken task from the queue", Thread.CurrentThread.ManagedThreadId);
dequeuedTask.Start();
while (!processCancelToken.IsCancellationRequested)
{
if (dequeuedTask.Wait(500))
break;
Console.WriteLine(" Consumer #{0} task wait elapsed", Thread.CurrentThread.ManagedThreadId);
}
}
Console.WriteLine("Exiting consumer #{0}", Thread.CurrentThread.ManagedThreadId);
});
}
La domanda: è un modo efficace per avviare e gestire un numero arbitrario di consumatori? O c'è un modo più efficiente di usare PLINQ dall'azione principale del consumatore che continua ad eseguire attività in coda, blocchi mentre non ce n'è, e può ancora essere cancellato usando processCancelToken
?
(Nota: processCancelToken
viene gestito separatamente per cancellare i token contenuti nelle attività in coda - sono cancellabili in modo indipendente.Tutto questo viene eseguito all'interno di un servizio Windows e processCancelToken
viene utilizzato per cancellare tutto se il servizio viene interrotto). / p>