Quale costrutto si usa per garantire che 100 attività siano eseguite in parallelo

5

Stavo provando a creare un test di integrazione per il mio servizio in cui 100 client si connettono, eseguono l'accesso, inviano richieste e registrano tutte le risposte per un certo periodo di tempo configurabile.

Sono stato creato una classe per il client usando socket asincroni e funziona perfettamente. Li ho avviati tutti utilizzando Task e Task.Factory, inviato il login e inviato ricevuto ogni volta che ho ricevuto i dati, fino al tempo scaduto e poi ho chiamato shutdown su di essi.

Sembra che non siano mai stati realmente eseguiti in parallelo. A volte potrei ottenere una corsa in una volta, a volte un po 'di più. Presumo che l'utilità di pianificazione li stia eseguendo quando sembra in forma piuttosto che tutti in una volta.

Ora capisco che non posso veramente avere 100 thread in esecuzione simultanea, ma voglio garantire che tutti i 100 siano avviati e che il sistema operativo stia cambiando il contesto e tenti di eseguirli tutti.

Alla fine, voglio simulare un gran numero di client connessi al mio servizio ricevendo tutti un flusso di dati.

Quale costrutto devo usare se Task non funziona?

Tentativo corrente:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegrationTests
{
    class Program
    {
        static void Main(string[] args)
        {
            string       server            = ConfigurationManager.AppSettings["server"];
            int          port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            int          numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            TimeSpan     clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            TimeSpan     timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            TimeSpan     reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            List<string> clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, to carry out test operations, each on their own threads
                Task[] tasks = new Task[numClients];
                for(int count = 0; count < numClients; ++count)
                {
                    var index = count;
                    tasks[count] = Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                // Reuse client Ids, if there are more clients then clientIds.
                                // Keep in mind that tasks are not necessarily started in the order they were created in this loop.
                                // We may see client id 1 be assigned client id 2 if another client was started before it, but we
                                // are using all clientIds
                                string clientId = null;
                                if (numClients < clientIds.Count)
                                {
                                    clientId = clientIds[index];
                                }
                                else
                                {
                                    clientId = clientIds[index % clientIds.Count];
                                }

                                // Create the actual client
                                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                                client.Startup();

                                // Will make an sync request issue a recv.
                                // Everytime we get a reponse, it will be logged and another recv will be posted.
                                // This will continue until shutdown is called
                                client.MakeRequest(symbol);

                                System.Threading.Thread.Sleep(clientLifetime);

                                client.Shutdown();
                            }
                            catch(Exception e)
                            {
                                // SNIP - Log it
                            }
                        });
                }
                Task.WaitAll(tasks);
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }
    }
}
    
posta Christopher Pisz 15.12.2017 - 21:36
fonte

2 risposte

6

Attività e discussioni esistono per scopi diversi. Le attività sono pensate per essere brevi cose che devono essere eseguite in background. I thread rappresentano una risorsa operativa per l'esecuzione simultanea.

Internamente, TaskManager utilizza un pool di thread in modo che possa riutilizzare i thread per elaborare più attività. I thread sono costosi da configurare e abbattere in modo che non funzionino correttamente allo scopo di creare Task. Sebbene tu possa influenzare il numero di thread disponibili per il task manager, è comunque responsabile di distribuire il lavoro ai thread.

Garanzia del numero X di client simultanei

L'unico modo per garantirlo è utilizzare Thread anziché Task . Se dovessi ristrutturare un po 'il tuo codice potresti gestire i tuoi client simultanei in questo modo:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;

namespace IntegrationTests
{
    private static string server;
    private static int port;
    private static TimeSpan clientLifetime;
    private static TimeSpan timeout;
    private static TimeSpan reconnectInterval;
    private static List<string> clientIds;
    private static Barrier barrier;

    class Program
    {
        static void Main(string[] args)
        {
            int          numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            server            = ConfigurationManager.AppSettings["server"];
            port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;
            barrier           = new Barrier(numClients + 1);

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, to carry out test operations, each on their own threads
                Thread[] threads= new Thread[numClients];
                for(int count = 0; count < numClients; ++count)
                {
                    var index = count;
                    threads[count] = new Thread();
                    threads[count].Name = $"Client {count}"; // for debugging
                    threads[count].Start(RunClient);
                }

                // We loose the convenience of awaiting all tasks,
                // but use a thread barrier to block this thread until all the others are done.
                barrier.SignalAndWait();
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }

        private void RunClient()
        {
            try
            {
                // Reuse client Ids, if there are more clients then clientIds.
                // Keep in mind that tasks are not necessarily started in the order they were created in this loop.
                // We may see client id 1 be assigned client id 2 if another client was started before it, but we
                // are using all clientIds
                string clientId = null;
                if (numClients < clientIds.Count)
                {
                    clientId = clientIds[index];
                }
                else
                {
                    clientId = clientIds[index % clientIds.Count];
                }

                // Create the actual client
                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                client.Startup();

                // Will make an sync request issue a recv.
                // Everytime we get a reponse, it will be logged and another recv will be posted.
                // This will continue until shutdown is called
                client.MakeRequest(symbol);

                System.Threading.Thread.Sleep(clientLifetime);

                client.Shutdown();
            }
            catch(Exception e)
            {
                // SNIP - Log it
            }
            finally
            {
                barrier.SignalAndWait();
            }
        }
    }
}
    
risposta data 15.12.2017 - 23:22
fonte
1

Non penso che ci sia un problema con il test.

Ho utilizzato un codice simile per il test di carico (di base) e ho visto oltre 100 attività simultanee.

Direi che c'è un problema nel modo in cui stai loggando. Stai semplicemente misurando il numero di connessioni simultanee che il tuo server può gestire?

Ad esempio, il codice seguente conterà fino a 1000.

Tuttavia, notare la differenza se sostituiamo Task.Delay con Thread.Sleep. Questo interrompe l'app perché più di una attività viene eseguita sullo stesso thread.

Ora, se anche cambia l'attività. Aggiungi a:

tasks.Add(Task.Factory.StartNew(async () => Work(),TaskCreationOptions.LongRunning));

Il codice funziona di nuovo, poiché è in grado di creare le nuove attività su nuovi thread

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Generic;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        volatile int count = 0;
        [TestMethod]
        public async Task TestMethod1()
        {
            var tasks = new List<Task>();
            for(int i = 0;i<1000;i++)
            {
                tasks.Add(Work());
            }
            await Task.WhenAll(tasks.ToArray());
            Debug.WriteLine("finished");
        }

        async Task Work()
        {
            count++;
            Debug.WriteLine(count);
            await Task.Delay(10000);
            Debug.WriteLine(count);
            count--;
        }
    }
}
    
risposta data 16.12.2017 - 11:30
fonte

Leggi altre domande sui tag