HavenDV / H.Pipes

A simple, easy to use, strongly-typed, async wrapper around .NET named pipes.
MIT License
242 stars 26 forks source link

Memory Leak in H.Pipes Client’s WriteAsync Method #64

Closed mathiaspoedtsaxians closed 2 months ago

mathiaspoedtsaxians commented 2 months ago

We are using H.Pipes to connect the logging of various programs running on a single server to a single display with multiple tabs. This allows us to quickly monitor the status of all programs at a glance.

However, we’ve noticed that all programs are experiencing a slow but steady increase in memory usage. We’ve traced this issue back to the WriteAsync method of the H.Pipes client.

Here are the Server and Client wrappers that utilize all the H.Pipes logic:

public class VerlaPipesLogClient : IDisposable
{
    private string _pipeName = "";
    private readonly ILogger<VerlaPipesLogClient> _logger;
    private PipeClient<LogMessage> _client;

    public string ServiceId { get; private set; }

    public PipeClient<LogMessage> Client => _client;

    /// <summary>
    /// 
    /// </summary>
    /// <param name="logger"></param>
    /// <param name="formatter"></param>
    /// <param name="pipeName">Name of the logging pipe, Server with the same pipename will receive the logs</param>
    /// <param name="serviceId">Used to identify the application that has sent the log</param>
    /// <exception cref="ArgumentNullException"></exception>
    public VerlaPipesLogClient(ILogger<VerlaPipesLogClient> logger, IFormatter formatter, string pipeName, string serviceId)
    {
        if (_client != null && _client.IsConnected && !string.IsNullOrWhiteSpace(_pipeName))
            return;

        if (string.IsNullOrWhiteSpace(pipeName))
            throw new ArgumentNullException(nameof(pipeName));

        if (string.IsNullOrWhiteSpace(serviceId))
            throw new ArgumentNullException(nameof(serviceId));

        _logger = logger;
        ServiceId = serviceId;

        _pipeName = pipeName.Trim();
        _client = new PipeClient<LogMessage>(_pipeName, formatter: formatter);
        _client.Connected += async (o, args) => OnServerConnected(args);
        _client.Disconnected += (o, args) => OnServerDisconnected(args);
        _client.MessageReceived += (sender, args) => OnMessageReceived(args.Message);
        _client.ExceptionOccurred += (o, args) => OnExceptionOccurred(args.Exception);

        _client.ConnectAsync();
        try
        {
            _client.WriteAsync(new LogMessage
            {
                ServiceId = serviceId,
                MessageTitle = "Connection",
                MessageContent = $"Connected to pipe {pipeName}",
                Timestamp = DateTime.Now,
                LogLevel = (int)LogLevel.Debug
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while writing to PipeServer");
        }
    }

    private void OnServerDisconnected(ConnectionEventArgs<LogMessage> args)
    {
        _logger.LogInformation($"Server {args.Connection} disconnected");
    }

    private void OnServerConnected(ConnectionEventArgs<LogMessage> args)
    {
        _logger.LogInformation($"Server {args.Connection} connected");
    }

    private void OnMessageReceived(LogMessage message)
    {
        _logger.LogInformation($"Text from server: {message.MessageContent}");
    }

    private void OnExceptionOccurred(Exception exception)
    {
        _logger.LogInformation($"Exception occured in pipe: {exception}");
    }

    public void Dispose()
    {
        if (_client != null)
            _client.DisposeAsync().GetAwaiter().GetResult();
    }
}

public class VerlaPipesLogServer : IDisposable
{
    private string _pipeName = "";
    private readonly ILogger<VerlaPipesLogServer> _logger;
    private PipeServer<LogMessage> _server;

    // Define a delegate for handling received messages
    public delegate void MessageReceivedHandler(LogMessage message);
    // An event that clients can subscribe to
    public event MessageReceivedHandler? OnMessageReceivedEvent;

    public VerlaPipesLogServer(ILogger<VerlaPipesLogServer> logger, IFormatter formatter, string pipeName)
    {
        this._logger = logger;

        if (string.IsNullOrWhiteSpace(pipeName))
            throw new ArgumentNullException(nameof(pipeName));

        _pipeName = pipeName.Trim();
        _server = new PipeServer<LogMessage>(_pipeName, formatter: formatter);
        _server.ClientConnected += async (o, args) => OnClientConnected(args);
        _server.ClientDisconnected += (o, args) => OnClientDisconnected(args);
        _server.MessageReceived += (sender, args) => OnMessageReceived(args.Message);
        _server.ExceptionOccurred += (o, args) => OnExceptionOccurred(args.Exception);

        _server.StartAsync();
    }

    private async void OnClientConnected(ConnectionEventArgs<LogMessage> args)
    {
        _logger.LogInformation($"Connected to client");
    }

    private void OnClientDisconnected(ConnectionEventArgs<LogMessage> args)
    {
        _logger.LogInformation($"Client disconnected");
    }

    private void OnMessageReceived(LogMessage message)
    {
        // Invoke the event, passing the message to any subscribed handlers
        OnMessageReceivedEvent?.Invoke(message);
    }

    private void OnExceptionOccurred(Exception ex)
    {
        _logger.LogError($"Exception occured in pipe: {ex}");
    }

    public void Dispose()
    {
        DisposeAsync().GetAwaiter().GetResult();
    }

    public async Task DisposeAsync()
    {
        if (_server != null)
            await _server.DisposeAsync();
    }
}

In our logger, we process the log, send it to H.Pipes, and return the log string for further use:

private string ProcessLogMessage(LogMessage logMessage)
{

    // construct logstring
    string content = $"[{logMessage.ServiceId}] - {logMessage.MessageTitle} - {logMessage.MessageContent}";

    try
    {
        //Write logMessage to the logClient pipeline
        _logClient.Client.WriteAsync(logMessage);
    }
    catch (Exception ex)
    {
        Warn("LogDisplay endpoint not found", ex);
    }

    return content;
}

To investigate this issue, we created a test program that logs about 3000 lines every second. We confirmed that these logs arrive on the server side. However, the memory usage of the program keeps increasing, even when we reduce the workload.

Here’s a snapshot of the memory usage with the WriteAsync method: image

When we remove the line _logClient.Client.WriteAsync(logMessage); in our logger, the memory usage looks like this: image

We add the Client to service collection like this:

var pipeName = configuration.GetRequiredSection(options.LoggingConfigurationSectionName).GetRequiredSection("PipeName").Value;
var applicationName = configuration.GetRequiredSection(options.LoggingConfigurationSectionName).GetRequiredSection("ApplicationName").Value;

services.AddSingleton(serviceProvider =>
{
    var logger = serviceProvider.GetRequiredService<ILogger<VerlaPipesLogClient>>();
    return new VerlaPipesLogClient(logger, pipeName, applicationName);
});

No other changes were made. This suggests that the WriteAsync method in the H.Pipes client is causing the memory leak. We would appreciate any insights or suggestions on how to resolve this issue.

I hope this helps! If you need further assistance, feel free to ask.

Steps to reproduce the bug

  1. Open Client connection
  2. Open Server connection
  3. Keep writing things from client to server without closing the connection.

example:

public void Process()
{
    for(int i = 0; i < 5000; i++)
        _logger.Log("test", "this is a test log");
}

Expected behavior

Memory will keep rising

Screenshots

No response

NuGet package version

2.0.59

Platform

No response

IDE

Visual Studio 2022

Additional context

No response

HavenDV commented 2 months ago

What is formatter do you use?

mathiaspoedtsaxians commented 2 months ago

I have tested with both a CustomMessagePackFormatter, H.Pipes default MessagePackFormatter and not setting a formatter at all.

changing the LogMessage object each time. The CustomMessagePack was created because I was trying to send an Exception to the logging display and this did not work out of the box.

the 2 other formatters were without the Exception property

Message object

[MessagePackObject]
public class LogMessage
{
    [Key(0)]
    public string ServiceId { get; set; } = "";
    [Key(1)]
    public string MessageTitle { get; set; } = "";
    [Key(2)]
    public string MessageContent { get; set; } = "";
    [Key(3)]
    public Exception? Exception { get; set; }
    [Key(4)]
    public DateTime Timestamp { get; set; } = DateTime.Now;
    [Key(5)]
    public int LogLevel { get; set; } = 1; // Debug
}

Custom formatter

using H.Formatters;
using MessagePack;
using MessagePack.Formatters;
using MessagePack.Resolvers;

namespace VerlaLoggingModule.Formatters
{
    internal class CustomMessagePackFormatter : FormatterBase
    {
        private readonly MessagePackSerializerOptions _options;
        public CustomMessagePackFormatter(IMessagePackFormatter<Exception> exceptionFormatter, IMessagePackFormatter<LogMessage> logMessageFormatter)
        {
            var resolver = CompositeResolver.Create(
                new IMessagePackFormatter[] { exceptionFormatter, logMessageFormatter },
                new IFormatterResolver[] { StandardResolver.Instance }
                );

            _options = MessagePackSerializerOptions.Standard.WithResolver(resolver);
        }
        protected override byte[] SerializeInternal(object obj)
        {
            return MessagePackSerializer.Serialize(obj, _options);
        }

        protected override T DeserializeInternal<T>(byte[] bytes)
        {
            return MessagePackSerializer.Deserialize<T>(bytes, _options);
        }
    }
}

2 Custom MessagePack formatters

using MessagePack;
using MessagePack.Formatters;

namespace VerlaLoggingModule.Formatters
{
    internal class LogMessageFormatter : IMessagePackFormatter<LogMessage>
    {
        private readonly IMessagePackFormatter<Exception> _exceptionFormatter;
        public LogMessageFormatter(IMessagePackFormatter<Exception> exceptionFormatter)
        {
            _exceptionFormatter = exceptionFormatter;
        }

        public LogMessage Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
        {
            if (reader.TryReadNil())
            {
                return null;
            }

            var length = reader.ReadMapHeader();
            var logMessage = new LogMessage();

            for (int i = 0; i < length; i++)
            {
                switch (reader.ReadString())
                {
                    case "ServiceId":
                        logMessage.ServiceId = reader.ReadString();
                        break;
                    case "MessageTitle":
                        logMessage.MessageTitle = reader.ReadString();
                        break;
                    case "MessageContent":
                        logMessage.MessageContent = reader.ReadString();
                        break;
                    case "Exception":
                        logMessage.Exception = _exceptionFormatter.Deserialize(ref reader, options);
                        break;
                    case "Timestamp":
                        logMessage.Timestamp = DateTimeOffset.FromUnixTimeSeconds(reader.ReadInt64()).DateTime.ToLocalTime();
                        break;
                    case "LogLevel":
                        logMessage.LogLevel = reader.ReadInt32();
                        break;
                    default:
                        reader.Skip();
                        break;
                }
            }

            return logMessage;
        }

        public void Serialize(ref MessagePackWriter writer, LogMessage value, MessagePackSerializerOptions options)
        {
            writer.WriteMapHeader(6);

            writer.Write("ServiceId");
            writer.Write(value.ServiceId);

            writer.Write("MessageTitle");
            writer.Write(value.MessageTitle);

            writer.Write("MessageContent");
            writer.Write(value.MessageContent);

            writer.Write("Exception");
            _exceptionFormatter.Serialize(ref writer, value.Exception, options);

            writer.Write("Timestamp");
            writer.Write(new DateTimeOffset(value.Timestamp.ToUniversalTime()).ToUnixTimeSeconds());

            writer.Write("LogLevel");
            writer.Write(value.LogLevel);
        }
    }
}

for the exception entity:

using MessagePack;
using MessagePack.Formatters;

namespace VerlaLoggingModule.Formatters
{
    internal class ExceptionFormatter : IMessagePackFormatter<Exception>
    {
        public Exception Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
        {
            if (reader.TryReadNil())
            {
                return null;
            }

            var length = reader.ReadMapHeader();
            string message = null;
            string? stackTrace = null;
            Exception innerException = null;

            for (int i = 0; i < length; i++)
            {
                switch (reader.ReadString())
                {
                    case "Message":
                        message = reader.ReadString();
                        break;
                    case "StackTrace":
                        stackTrace = reader.ReadString();
                        break;
                    case "InnerException":
                        innerException = Deserialize(ref reader, options); // Recursive call
                        break;
                    default:
                        reader.Skip();
                        break;
                }
            }

            return new SerializableException(message) { StackTrace = stackTrace, InnerException = innerException };
        }

        public void Serialize(ref MessagePackWriter writer, Exception value, MessagePackSerializerOptions options)
        {
            if (value == null)
            {
                writer.WriteNil();
                return;
            }
            writer.WriteMapHeader(3);

            writer.Write("Message");
            writer.Write(value.Message);

            writer.Write("StackTrace");
            writer.Write(value.StackTrace);

            writer.Write("InnerException");
            Serialize(ref writer, value.InnerException, options);
        }
    }
}
mathiaspoedtsaxians commented 2 months ago

It's not you, its us.

I'm not sure why but the fact that we were using the _client.WriteAsync as a sync method might have been the cause. My best guess it that the unmanaged Task objects that H.Pipes returned never cleaned up even if they were completed.

I solved the issue by instead placing our logs in a ConcurrentQueue and handle the sending to H.Pipes on a separate tread.

public void Write(LogMessage logMessage)
{
    _logQueue.Enqueue(logMessage);
}

private async Task ProcessQueue(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        if (_logQueue.TryDequeue(out var logMessage))
        {
            // Process logMessage
            await _client.WriteAsync(logMessage, cancellationToken);
        }
        else
        {
            // No message to process, wait a bit
            await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
        }
    }
}

I added a task in the constructor to slowly handle the build up queue on a separate thread. _processingTask = Task.Run(() => ProcessQueue(_cancellationTokenSource.Token));

You see the memory rising when we're pumping the queue with logs, once we stop adding to the queue. The memory doesn't grow while sending the build up logs through H.pipes in an async way. image

The queue won't release its memory either but it should only grow to a certain size. It's not like we're going to be spamming 3000 log messages a sec in production.

Apologies for bottering you. Thank you for creating this awesome package.

Kind regards, Mathias Poedts