.NET 4.5 Асинхронная утечка памяти TCP-сервера — получение / начало отправки

#sockets #asynchronous #tcp

#сокеты #асинхронный #tcp

Вопрос:

Нам нужна была служба Windows, которая поддерживала TCP-связь с несколькими клиентами. Итак, я основал это на примере MSDN Async. Дело в примере Microsoft заключается в том, что клиент отправляет одно сообщение на сервер, затем сервер повторно отправляет сообщение, а затем закрывается. Отлично!

Итак, слепо развернув это на нашем сайте продукта и клиента, мы получили сообщения о том, что он разбился. Глядя на Prod, мы заметили, что через 1 день использование памяти выросло до чуть менее 1 ГБ, прежде чем вызвать исключение OutOfMemoryException. Здесь много тестов!

Это произошло при подключении 1 клиента. Он отправляет сообщение на основе XML, которое имеет довольно большой размер ~ 1200 байт каждую секунду. Да, каждую секунду.

Затем служба выполняет некоторую обработку и отправляет клиенту возвращаемое XML-сообщение.

Я перевел связь TCP-клиент / сервер в простой набор консольных приложений, чтобы воспроизвести проблему — в основном для устранения других управляемых / неуправляемых ресурсов. Теперь я смотрю на это уже несколько дней и вырвал все свои волосы и зубы.

В моем примере я сосредоточен на следующих классах:

B2BSocketManager (прослушиватель сервера, отправитель, получатель)

Обратите внимание, что я изменил код, чтобы возвращать массив байтов whoopsy только для чтения, а не отправленное сообщение. Я также удалил новый AsyncCallback (делегат) из вызовов BeginReceive / BeginSend.

 namespace Acme.OPC.Service.Net.Sockets
{
    using Acme.OPC.Service.Logging;
    using System;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    public class B2BSocketManager : ISocketSender
    {
        private ManualResetEvent allDone = new ManualResetEvent(false);
        private IPEndPoint _localEndPoint;
        private readonly byte[] whoopsy = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        public B2BSocketManager(IPAddress address, int port)
        {
            _localEndPoint = new IPEndPoint(address, port);
        }

        public void StartListening()
        {
            StartListeningAsync();
        }

        private async Task StartListeningAsync()
        {
            await System.Threading.Tasks.Task.Factory.StartNew(() => ListenForConnections());
        }

        public void ListenForConnections()
        {
            Socket listener = new Socket(_localEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            Log.Instance.Info("B2BSocketManager Listening on "   _localEndPoint.Address.ToString()   ":"   _localEndPoint.Port.ToString());

            try
            {
                listener.Bind(_localEndPoint);
                listener.Listen(100);

                while (true)
                {
                    allDone.Reset();

                    Log.Instance.Info("B2BSocketManager Waiting for a connection...");
                    listener.BeginAccept(new AsyncCallback(ConnectCallback), listener);
                    allDone.WaitOne();
                }
            }
            catch (Exception e)
            {
                Log.Instance.Info(e.ToString());
            }
        }

        public void ConnectCallback(IAsyncResult ar)
        {
            allDone.Set();

            Socket listener = (Socket)ar.AsyncState;
            Socket handler = listener.EndAccept(ar);
            handler.DontFragment = false;
            handler.ReceiveBufferSize = ClientSocket.BufferSize;

            Log.Instance.Info("B2BSocketManager Client has connected on "   handler.RemoteEndPoint.ToString());

            ClientSocket state = new ClientSocket();
            state.workSocket = handler;

            handler.BeginReceive(state.buffer, 0, ClientSocket.BufferSize, 0, new AsyncCallback(ReadCallback), state); // SocketFlags.None
        }

        public void ReadCallback(IAsyncResult ar)
        {
            String message = String.Empty;

            ClientSocket state = (ClientSocket)ar.AsyncState;
            Socket handler = state.workSocket;

            int bytesRead = handler.EndReceive(ar);
            if (bytesRead > 0)
            {
                Console.WriteLine("Received "   bytesRead   " at "   DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));

                message = Encoding.ASCII.GetString(state.buffer, 0, bytesRead);

                if (!string.IsNullOrEmpty(message))
                {
                    Send(handler, message);
                }

                handler.BeginReceive(state.buffer, 0, ClientSocket.BufferSize, 0, ReadCallback, state);
            }
        }

        public void Send(Socket socket, string data)
        {
            // just hard coding the whoopse readonly byte array
            socket.BeginSend(whoopsy, 0, whoopsy.Length, 0, SendCallback, socket);
        }

        private void SendCallback(IAsyncResult ar)
        {
            Socket state = (Socket)ar.AsyncState;

            try
            {
                int bytesSent = state.EndSend(ar);
            }
            catch (Exception e)
            {
                Log.Instance.ErrorException("", e);
            }
        }
    }
}
  

ClientSender (отправитель клиента)

Клиент отправляет XML-строку на сервер каждые 250 миллисекунд. Я хотел посмотреть, как это будет работать. XML немного меньше, чем то, что мы отправляем в нашей живой системе, и просто создается с использованием форматированной строки.

 namespace TestHarness
{
    using System;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;

    class ClientSender
    {
        private static ManualResetEvent connectDone = new ManualResetEvent(false);
        private static ManualResetEvent receiveDone = new ManualResetEvent(false);
        private static ManualResetEvent sendDone = new ManualResetEvent(false);

        private static void StartSpamming(Socket client)
        {
            while(true)
            {
                string message = @"<request type=""da"">{0}{1}</request>"   Environment.NewLine;

                Send(client, string.Format(message, "Be someone"   DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), String.Concat(Enumerable.Repeat("<test>Oooooooops</test>", 50))));

                Thread.Sleep(250);
            }
        }

        public static void Connect(EndPoint remoteEP)
        {
            Socket listener = new Socket(remoteEP.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listener.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), listener);

            connectDone.WaitOne();
        }

        private static void ConnectCallback(IAsyncResult ar)
        {
            try
            {
                // Retrieve the socket from the state object.
                Socket client = (Socket)ar.AsyncState;

                // Complete the connection.
                client.EndConnect(ar);

                Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());

                // Signal that the connection has been made.
                connectDone.Set();

                System.Threading.Tasks.Task.Factory.StartNew(() => StartSpamming(client));
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void Send(Socket client, String data)
        {
            byte[] byteData = Encoding.ASCII.GetBytes(data);
            client.BeginSend(byteData, 0, byteData.Length, SocketFlags.None, new AsyncCallback(SendCallback), client);
        }

        private static void SendCallback(IAsyncResult ar)
        {
            try
            {
                Socket client = (Socket)ar.AsyncState;
                int bytesSent = client.EndSend(ar);
                Console.WriteLine("Sent {0} bytes to server "   DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), bytesSent);
                sendDone.Set();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void Receive(Socket client)
        {
            try
            {
                StateObject state = new StateObject();
                state.workSocket = client;

                client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void ReceiveCallback(IAsyncResult ar)
        {
            try
            {
                StateObject state = (StateObject)ar.AsyncState;
                Socket client = state.workSocket;
                int bytesRead = client.EndReceive(ar);
                if (bytesRead > 0)
                {
                    state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
                    client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
                }
                else
                {
                    if (state.sb.Length > 1)
                    {
                        string response = state.sb.ToString();
                    }
                    receiveDone.Set();
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }
    }
}
  

Класс состояния

Все, что я хотел, это буфер чтения, чтобы удалить сообщение и попытаться загрузить в XML. Но это было удалено из этой сокращенной версии, чтобы увидеть проблемы только с сокетами.

 using System;
using System.Linq;
using System.Net.Sockets;

namespace Acme.OPC.Service.Net.Sockets
{
    public class ClientSocket
    {
        public Socket workSocket = null;
        public const int BufferSize = 4096;
        public readonly byte[] buffer = new byte[BufferSize];
    }
}
  

Я поделился своим кодом здесь:

Исследуйте один общий ресурс диска

Я профилировал вещи с помощью моего профилировщика Telerik JustTrace. Я просто запускаю серверное приложение, а затем запускаю клиентское приложение. Это в моей 64-разрядной среде разработки Windows 7 VS2013.

Выполнить 1

Я вижу, что объем используемой памяти составляет около 250 КБ, а рабочий набор составляет около 20 МБ. Время, кажется, идет хорошо, затем внезапно использование памяти увеличится примерно через 12 минут. Хотя все меняется.

Профилировщик

Также может показаться, что после ~ 16: 45: 55 (моментальный снимок), когда я запускаю GC, объем памяти начинает увеличиваться каждый раз, когда я нажимаю на нее, вместо того, чтобы оставлять ее запущенной и автоматически увеличиваться, что может быть проблемой с Telerik.

Выполнить 2

Затем, если я создаю массив байтов внутри отправки с помощью (что больше того, что делает служба — отправляет клиенту соответствующую строку ответа):

 public void Send(Socket socket, string data)
{
    byte[] byteData = Encoding.ASCII.GetBytes(data);
    socket.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, socket);
}
  

Мы видим, что объем памяти увеличивается:

Выполнить 2

Что подводит меня к тому, что сохраняется в памяти. Я вижу журнал системы.Многопоточность.OverlappedData и я заметили там ExecutionContexts . На этот раз OverlappedData содержит ссылку на массив байтов.

Запустите 2 крупнейших хранилища памяти

С корневыми путями к GC

Корневые пути к GC

Я запускаю профилирование на ночь, поэтому, надеюсь, смогу добавить больше информации к этому утром. Надеюсь, кто-нибудь сможет указать мне правильное направление до этого — если я делаю что-то не так, и я слишком слеп / глуп, чтобы это увидеть.

И вот результаты работы в одночасье: Запуск в одночасье

Комментарии:

1. Вы не закрываете сокет «обработчика», когда соединение завершено. Вероятно, это утечка памяти.; В общих чертах: я могу сказать, что у вас нет четкого понимания того, как и когда использовать асинхронный ввод-вывод и ожидание. Я мог бы указать на несколько проблем, но они не имеют значения для этого вопроса. Рассмотрите возможность использования полностью синхронного ввода-вывода для всего. Этой утечки не произошло бы, потому что вы просто поместили сокет в блок using . Все эти обратные вызовы делают код непонятным. Размер кода может составлять 1/2 или 1/3 от того, что у вас есть сейчас.

2. Привет, что вы имеете в виду, когда соединение завершено? Сценарий заключается в том, что клиент каждую секунду отправляет данные на сервер. Серверный процесс выполняет некоторое «чтение / запись» из внешней системы и в идеале должен быть настроен для доступа в режиме реального времени. Также может быть несколько клиентов, настолько синхронных, что я чувствовал, что это не совсем соответствует сценарию.

3. Ваш тестовый код сбивает с толку. Ни на клиенте, ни на сервере вы никогда не закрываете соединение. Это правильно? Разве это, конечно, не утечка памяти?; Синхронный код может работать с несколькими клиентами без проблем. Сколько клиентов будет? 100? Перейти на синхронный режим.

4. Если вы создаете один буфер для каждого подключенного клиента и подключаете все больше и больше клиентов, вы используете все больше и больше памяти. Мне действительно непонятно, почему это неожиданно. При использовании выделенных потоков вы будете использовать еще больше памяти, потому что каждый поток потребляет 1 МБ стековой памяти. Итак, можете ли вы сказать мне, почему вы ожидаете, что использование памяти останется постоянным, когда вы подключаете все больше и больше клиентов?

5. Модель с одним потоком на клиента довольно проста для понимания. Программирование сокетов очень сложно. Сосредоточьтесь на том, чтобы правильно использовать однопоточный подход. Похоже, вам никогда не понадобится обслуживать очень много клиентов, поэтому вам не нужна асинхронность. В частности, не используйте устаревший шаблон APM и вместо этого используйте async / await (если вам вообще нужна асинхронность). Это самый важный совет, который я могу вставить в комментарий.