Registrazione in modo asincrono - come dovrebbe essere fatto?

8

In molti dei servizi su cui lavoro ci sono molte registrazioni in corso. I servizi sono servizi WCF (principalmente) che utilizzano la classe .NET EventLogger.

Sono in procinto di migliorare le prestazioni di questi servizi, e ho pensato che la registrazione in modo asincrono avrebbe giovato alle prestazioni.

Non sono a conoscenza di cosa succede quando più thread richiedono il log, e se crea un collo di bottiglia, ma anche se non lo fa, continuo a pensare che non dovrebbe interferire con il processo in corso.

I miei pensieri sono che dovrei invocare lo stesso metodo di registrazione che chiamo ora, ma farlo utilizzando un nuovo thread, continuando con il processo effettivo.

Alcune domande al riguardo:

Va bene?

Ci sono aspetti negativi?

Dovrebbe essere fatto in modo diverso?

Forse è così veloce che non vale nemmeno la pena?

    
posta Mithir 20.05.2012 - 09:29
fonte

4 risposte

11

Il thread separato per l'operazione I \ O suona ragionevole.

Ad esempio, non sarebbe opportuno registrare i pulsanti che l'utente ha premuto nello stesso thread dell'interfaccia utente. Tale interfaccia utente si bloccherà casualmente e avrà performance percepita lenta

La soluzione è disaccoppiare l'evento dalla sua elaborazione.

Ecco un sacco di informazioni su Producer-Consumer Problem e Event Queue dal mondo dello sviluppo di giochi

Spesso esiste un codice come

///Never do this!!!
public void WriteLog_Like_Bastard(string msg)
{
    lock (_lockBecauseILoveThreadContention)
    {
        File.WriteAllText("c:\superApp.log", msg);
    }
}

Questo approccio porterà alla contesa del filo. Tutti i thread di elaborazione combatteranno per essere in grado di ottenere il blocco e scrivere sullo stesso file contemporaneamente.

Alcuni potrebbero provare a rimuovere i lucchetti.

public void Log_Like_Dumbass(string msg)
{
      try 
      {  File.Append("c:\superApp.log", msg); }
        catch (Exception ex) 
        {
            MessageBox.Show("Log file may be locked by other process...")
        }
      }    
}

Non è possibile prevedere il risultato se 2 thread inseriranno il metodo nello stesso momento.

Quindi alla fine gli sviluppatori disabiliteranno la registrazione ...

È possibile correggere?

Sì.

Diciamo che abbiamo un'interfaccia:

 public interface ILogger
 {
    void Debug(string message);
    // ... etc
    void Fatal(string message);
 }

Invece di aspettare il blocco e eseguire l'operazione di blocco dei file ogni volta che viene chiamato ILogger , aggiungeremo LogMessage alla coda Penging Messages e torneremo a più importanti cose:

public class AsyncLogger : ILogger
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly Type _loggerFor;
    private readonly IThreadAdapter _threadAdapter;

    public AsyncLogger(BlockingCollection<LogMessage> pendingMessages, Type loggerFor, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _loggerFor = loggerFor;
        _threadAdapter = threadAdapter;
    }

    public void Debug(string message)
    {
        Push(LoggingLevel.Debug, message);
    }

    public void Fatal(string message)
    {
        Push(LoggingLevel.Fatal, message);
    }

    private void Push(LoggingLevel importance, string message)
    {
        // since we do not know when our log entry will be written to disk, remember current time
        var timestamp = DateTime.Now;
        var threadId = _threadAdapter.GetCurrentThreadId();

        // adds message to the queue in lock-free manner and immediately returns control to caller
        _pendingMessages.Add(LogMessage.Create(timestamp, importance, message, _loggerFor, threadId));
    }
}

Abbiamo terminato con questo semplice Logger asincrono .

Il passo successivo è elaborare i messaggi in arrivo.

Per semplicità, inizia il nuovo Discussione e attendi fino all'esaurimento dell'applicazione o Logger asincrono aggiungerà un nuovo messaggio alla coda in attesa .

public class LoggingQueueDispatcher : IQueueDispatcher
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IEnumerable<ILogListener> _listeners;
    private readonly IThreadAdapter _threadAdapter;
    private readonly ILogger _logger;
    private Thread _dispatcherThread;

    public LoggingQueueDispatcher(BlockingCollection<LogMessage> pendingMessages, IEnumerable<ILogListener> listeners, IThreadAdapter threadAdapter, ILogger logger)
    {
        _pendingMessages = pendingMessages;
        _listeners = listeners;
        _threadAdapter = threadAdapter;
        _logger = logger;
    }

    public void Start()
    {
        //  Here I use 'new' operator, only to simplify example. Should be using interface  '_threadAdapter.CreateBackgroundThread' to allow unit testing
        Thread thread = new Thread(MessageLoop);
        thread.Name = "LoggingQueueDispatcher Thread";
        thread.IsBackground = true;

        thread.Start();
        _logger.Debug("Asked to start log message Dispatcher ");

        _dispatcherThread = thread;
    }

    public bool WaitForCompletion(TimeSpan timeout)
    {
        return _dispatcherThread.Join(timeout);
    }

    private void MessageLoop()
    {
        _logger.Debug("Entering dispatcher message loop...");
        var cancellationToken = new CancellationTokenSource();
        LogMessage message;

        while (_pendingMessages.TryTake(out message, Timeout.Infinite, cancellationToken.Token))
        {
            // !!!!! Now it is safe to use File.AppendAllText("c:\my.log") without ever using lock or forcing important threads to wait.
            // this is example, do not use in production
            foreach (var listener in _listeners)
            {
                listener.Log(message);
            }
        }

    }
}

Sto passando a una catena di ascoltatori personalizzati. Probabilmente potresti solo inviare il framework di registrazione delle chiamate ( log4net , ecc ...)

Ecco il resto del codice:

public enum LoggingLevel
{
    Debug,
    // ... etc
    Fatal,
}


public class LogMessage
{
    public DateTime Timestamp { get; private set; }
    public LoggingLevel Importance { get; private set; }
    public string Message { get; private set; }
    public Type Source { get; private set; }
    public int ThreadId { get; private set; }

    private LogMessage(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        Timestamp = timestamp;
        Message = message;
        Source = source;
        ThreadId = threadId;
        Importance = importance;
    }

    public static LogMessage Create(DateTime timestamp, LoggingLevel importance, string message, Type source, int threadId)
    {
        return  new LogMessage(timestamp, importance, message, source, threadId);
    }

    public override string ToString()
    {
        return string.Format("{0}  [TID:{4}] {1:h:mm:ss} ({2})\t{3}", Importance, Timestamp, Source, Message, ThreadId);
    }
}

public class LoggerFactory : ILoggerFactory
{
    private readonly BlockingCollection<LogMessage> _pendingMessages;
    private readonly IThreadAdapter _threadAdapter;

    private readonly ConcurrentDictionary<Type, ILogger> _loggersCache = new ConcurrentDictionary<Type, ILogger>();


    public LoggerFactory(BlockingCollection<LogMessage> pendingMessages, IThreadAdapter threadAdapter)
    {
        _pendingMessages = pendingMessages;
        _threadAdapter = threadAdapter;
    }

    public ILogger For(Type loggerFor)
    {
        return _loggersCache.GetOrAdd(loggerFor, new AsyncLogger(_pendingMessages, loggerFor, _threadAdapter));
    }
}

public class ThreadAdapter : IThreadAdapter
{
    public int GetCurrentThreadId()
    {
        return Thread.CurrentThread.ManagedThreadId;
    }
}

public class ConsoleLogListener : ILogListener
{
    public void Log(LogMessage message)
    {
        Console.WriteLine(message.ToString());
        Debug.WriteLine(message.ToString());
    }
}

public class SimpleTextFileLogger : ILogListener
{
    private readonly IFileSystem _fileSystem;
    private readonly string _userRoamingPath;
    private readonly string _logFileName;
    private FileStream _fileStream;

    public SimpleTextFileLogger(IFileSystem fileSystem, string userRoamingPath, string logFileName)
    {
        _fileSystem = fileSystem;
        _userRoamingPath = userRoamingPath;
        _logFileName = logFileName;
    }

    public void Start()
    {
        _fileStream = new FileStream(_fileSystem.Path.Combine(_userRoamingPath, _logFileName), FileMode.Append);
    }

    public void Stop()
    {
        if (_fileStream != null)
        {
            _fileStream.Dispose();
        }
    }

    public void Log(LogMessage message)
    {
        var bytes = Encoding.UTF8.GetBytes(message.ToString() + Environment.NewLine);
        _fileStream.Write(bytes, 0, bytes.Length);
    }
}

public interface ILoggerFactory
{
    ILogger For(Type loggerFor);
}

public interface ILogListener
{
    void Log(LogMessage message);
}

public interface IThreadAdapter
{
    int GetCurrentThreadId();
}

public interface IQueueDispatcher
{
    void Start();
}

Punto di ingresso:

public static class Program
{
    public static void Main()
    {
        Debug.WriteLine("[Program] Entering Main ...");

        var pendingLogQueue = new BlockingCollection<LogMessage>();


        var threadAdapter = new ThreadAdapter();
        var loggerFactory = new LoggerFactory(pendingLogQueue, threadAdapter);


        var fileSystem = new FileSystem();
        var userRoamingPath = GetUserDataDirectory(fileSystem);

        var simpleTextFileLogger = new SimpleTextFileLogger(fileSystem, userRoamingPath, "log.txt");
        simpleTextFileLogger.Start();
        ILogListener consoleListener = new ConsoleLogListener();
        ILogListener[] listeners = new [] { simpleTextFileLogger , consoleListener};

        var loggingQueueDispatcher = new LoggingQueueDispatcher(pendingLogQueue, listeners, threadAdapter, loggerFactory.For(typeof(LoggingQueueDispatcher)));
        loggingQueueDispatcher.Start();

        var logger = loggerFactory.For(typeof(Console));

        string line;
        while ((line = Console.ReadLine()) != "exit")
        {
            logger.Debug("you have entered: " + line);
        }

        logger.Fatal("Exiting...");

        Debug.WriteLine("[Program] pending LogQueue will be stopped now...");
        pendingLogQueue.CompleteAdding();
        var logQueueCompleted = loggingQueueDispatcher.WaitForCompletion(TimeSpan.FromSeconds(5));

        simpleTextFileLogger.Stop();
        Debug.WriteLine("[Program] Exiting... logQueueCompleted: " + logQueueCompleted);

    }



    private static string GetUserDataDirectory(FileSystem fileSystem)
    {
        var roamingDirectory = Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData);
        var userDataDirectory = fileSystem.Path.Combine(roamingDirectory, "Async Logging Sample");
        if (!fileSystem.Directory.Exists(userDataDirectory))
            fileSystem.Directory.CreateDirectory(userDataDirectory);
        return userDataDirectory;
    }
}
    
risposta data 25.09.2015 - 23:40
fonte
1

I fattori chiave da considerare sono la necessità di affidabilità nei file di log e la necessità di prestazioni. Fare riferimento ai lati negativi. Penso che questa sia una grande strategia per le situazioni ad alte prestazioni.

Va bene - sì

Ci sono degli svantaggi - sì - a seconda della criticità della registrazione e dell'implementazione potrebbe verificarsi una delle seguenti condizioni: log scritti fuori sequenza, azioni del thread di log non completate prima che le azioni dell'evento siano completate. (Immagina uno scenario in cui ti registri "iniziando a connetterti a DB" e poi blocchi il server, l'evento di log potrebbe non essere mai scritto anche se l'evento si è verificato (!))

Dovrebbe essere fatto in un modo diverso - potresti voler guardare al modello Disruptor dato che è quasi l'ideale per questo scenario

Forse è così veloce che non ne vale nemmeno la pena - non sono d'accordo. Se la tua è una logica di "applicazione", e l'unica cosa che fai è scrivere i registri dell'attività - allora sarai di latitudine inferiore di ordini di grandezza scaricando la registrazione. Se tuttavia si fa affidamento su una chiamata SQL DB 5sec da restituire prima di registrare le istruzioni 1-2, i vantaggi sono misti.

    
risposta data 22.05.2012 - 00:38
fonte
1

Penso che la registrazione sia generalmente un'operazione sincrona dalla sua natura. Vuoi registrare le cose se accadono o se non dipendono dalla tua logica, quindi per poter registrare qualcosa, questa cosa deve essere valutata per prima.

Detto questo, è possibile migliorare le prestazioni dell'applicazione registrando nella cache i registri e quindi creando un thread e salvandoli su file quando si ha un'operazione con CPU.

Devi identificare i tuoi checkpoint in modo intelligente in modo da non perdere le tue importanti informazioni di registrazione durante quel periodo di cache.

Se si desidera ottenere un aumento delle prestazioni nei thread, è necessario bilanciare le operazioni di I / O e le operazioni della CPU.

Se crei 10 thread che fanno tutti IO, non otterrai un aumento delle prestazioni.

    
risposta data 22.05.2012 - 03:00
fonte
0

La registrazione asincrona è l'unico modo per andare se hai bisogno di bassa latenza nei thread di registrazione. Il modo in cui questo viene fatto per le massime prestazioni è attraverso il modello di disturbo per la comunicazione di thread senza vincoli e senza spazzatura. Ora, se si desidera consentire a più thread di accedere contemporaneamente allo stesso file, è necessario sincronizzare le chiamate di registro e pagare il prezzo nel conflitto di blocco OPPURE utilizzare un multiplexor senza blocco. Ad esempio, CoralQueue fornisce una semplice coda di multiplexing come descritto di seguito:

Puoidareun'occhiataa CoralLog che utilizza queste strategie per la registrazione asincrona.

Disclaimer: sono uno degli sviluppatori di CoralQueue e CoralLog.

    
risposta data 14.08.2014 - 22:47
fonte

Leggi altre domande sui tag