рефакторинг программирования асинхронных сокетов на C#

#c# #sockets #asynchronous #delegates

#c# #сокеты #асинхронный #делегаты

Вопрос:

У меня есть куча асинхронных методов, которые я хочу предоставить через сокеты C #. Общий шаблон в документации MSDN имеет следующий вид:

  public static void StartListening()
 {
   ...
   IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11000);
   ...
   listener.BeginAccept(new AsyncCallback(AcceptCallback), listener); 
   ... 
 }

 public static void AcceptCallback(IAsyncResult ar)
 {
    ...
    handler.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0,  
        new AsyncCallback(ReadCallback), state);  
 }

 public static void ReadCallback(IAsyncResult ar)
 { 
    ...
    StateObject state = (StateObject) ar.AsyncState;
    ...
    CalculateResult(state);  
    ...
    handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,  
                         new AsyncCallback(ReadCallback), state);  
    ...
 }
  

Поэтому написать все это в красивой и понятной форме без повторения кода было непросто. Я думаю в этом направлении, но не смог соединить точки:

     public static void StartListeningMaster()
    {
        string ipAddress = "localhost";
        IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
        IPAddress address = ipHost.AddressList[0];

        StartListening(50000, address, AcceptCallback1);
        StartListening(50001, address, AcceptCallback2);
        StartListening(50002, address, AcceptCallback3);
        ...
    }
    
    public static void StartListening(int port, IPAddress ipAddress,  
                           Action<IAsyncResult> acceptCallback) {...}
    public static void AcceptCallback1(IAsyncResult ar)
    {
       ...
       handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, 
                         0, new AsyncCallback1(ReadCallback1), state);  
    }
    ...
  

До этого момента все работало нормально. Но для правильного рефакторинга я хотел бы иметь один метод AcceptCallback, который принимает в качестве своего параметра общий метод ReadCallback, который принимает в качестве своего параметра метод CalculateResult. Таким образом, у меня не было бы повторения кода. Однако, если я изменю свой метод AcceptCallback, чтобы принимать больше параметров, чем IAsyncResult (например, что-то вроде:

   public static void StartListening(int port, IPAddress ipAddress, Action<IAsyncResult, Action<IAsyncResult>> acceptCallback) {...}

  public static void AcceptCallback(IAsyncResult ar, Action<IAsyncResult> readCallback) {}
 
  

Я разрываю контракт делегирования AsyncCallback.

 public delegate void AsyncCallback(IAsyncResult ar);
  

Затем я изучил возможность расширения существующих интерфейсов, чтобы обеспечить функциональность. Я изучил расширение

 public interface IAsyncResult
  

Но это тоже не кажется правильным подходом. Итак, как мне написать этот код, чтобы я не копировал и не вставлял практически один и тот же код повсюду?

Комментарии:

1. Просто предупреждаю, вы никогда не вызываете End... методы. Например, в AcceptCallback вы никогда не вызываете listener.EndAccept(ar) . Или вы делаете, но вы этого не показываете?

Ответ №1:

Итак, я решаю эту проблему, перемещая базовые компоненты в их собственные абстрактные объекты. Затем отталкивайтесь от этих объектов. Например, серверу нужно только принимать / отслеживать соединения. Итак, я бы создал серверный объект, который выглядит примерно так:

 namespace MultiServerExample.Base
{
    public interface IAsyncServerBase
    {
        void StartListening();
        bool IsListening { get; }
        void StopListening();
        void WriteDataToAllClients(byte[] data);
    }

    public abstract class AsyncServerBase<TClientBase> : IAsyncServerBase
        where TClientBase : IAsyncClientBase, new()
    {
        // implement a TcpListener to gain access to Active property
        private sealed class ActiveTcpListener : TcpListener
        {
            public ActiveTcpListener(IPAddress localaddr, int port)
                : base(localaddr, port) { }
            public bool IsActive => Active;
        }

        // our listener object
        private ActiveTcpListener Listener { get; }
        // our clients
        private ConcurrentDictionary<string, TClientBase> Clients { get; }

        // construct with a port
        public AsyncServerBase(int port)
        {
            Clients = new ConcurrentDictionary<string, TClientBase>();
            Listener = new ActiveTcpListener(IPAddress.Any, port);
        }

        // virtual methods for client action
        public virtual void OnClientConnected(TClientBase client) { }
        public virtual void OnClientDisconnected(TClientBase client, Exception ex) { }

        // start the server
        public void StartListening()
        {
            if(!IsListening)
            {
                Listener.Start();
                Listener.BeginAcceptTcpClient(OnAcceptedTcpClient, this);
            }
        }

        // check if the server is running
        public bool IsListening =>
            Listener.IsActive;

        // stop the server
        public void StopListening()
        {
            if (IsListening)
            {
                Listener.Stop();
                Parallel.ForEach(Clients, x => x.Value.DetachClient(null));
                Clients.Clear();
            }
        }

        // async callback for when a client wants to connect
        private static void OnAcceptedTcpClient(IAsyncResult res)
        {
            var me = (AsyncServerBase<TClientBase>)res.AsyncState;

            if (!me.IsListening) { return; }

            try
            {
                TcpClient client = null;
                try
                {
                    client = me.Listener.EndAcceptTcpClient(res);
                }
                catch(Exception ex)
                {
                    System.Diagnostics.Debug.WriteLine($"Warning: unable to accept client:n{ex}");
                }

                if(client != null)
                {
                    // create a new client
                    var t = new TClientBase();
                    // set up error callbacks
                    t.Error  = me.OnClientBaseError;
                    // notify client we have attached
                    t.AttachClient(client);
                    // track the client
                    me.Clients[t.Id] = t;
                    // notify we have a new connection
                    me.OnClientConnected(t);
                }
            }
            finally
            {
                // if we are still listening, wait for another connection
                if(me.IsListening)
                {
                    me.Listener.BeginAcceptSocket(OnAcceptedTcpClient, me);
                }
            }
        }

        // Event callback from a client that an error has occurred
        private void OnClientBaseError(object sender, AsyncClientBaseErrorEventArgs e)
        {
            var client = (TClientBase)sender;
            client.Error -= OnClientBaseError;

            OnClientDisconnected(client, e.Exception);

            client.DetachClient(e.Exception);
            Clients.TryRemove(client.Id, out _);
        }

        // utility method to write data to all clients connected
        public void WriteDataToAllClients(byte[] data)
        {
            Parallel.ForEach(Clients, x => x.Value.WriteData(data));
        }
    }
}
  

На этом этапе учтены все основы запуска сервера. Теперь для клиента, который работает на сервере:

 namespace MultiServerExample.Base
{
    public interface IAsyncClientBase
    {
        event EventHandler<AsyncClientBaseErrorEventArgs> Error;
        void AttachClient(TcpClient client);
        void WriteData(byte[] data);
        void DetachClient(Exception ex);
        string Id { get; }
    }

    public abstract class AsyncClientBase : IAsyncClientBase
    {
        protected virtual int ReceiveBufferSize { get; } = 1024;
        private TcpClient Client { get; set; }
        private byte[] ReceiveBuffer { get; set; }
        public event EventHandler<AsyncClientBaseErrorEventArgs> Error;
        public string Id { get; }

        public AsyncClientBase()
        {
            Id = Guid.NewGuid().ToString();
        }

        public void AttachClient(TcpClient client)
        {
            if(ReceiveBuffer != null) { throw new InvalidOperationException(); }

            ReceiveBuffer = new byte[ReceiveBufferSize];
            Client = client;

            try
            {
                Client.GetStream().
                    BeginRead(ReceiveBuffer, 0, ReceiveBufferSize, OnDataReceived, this);
                OnAttachedToServer();
            }
            catch (Exception ex)
            {
                Error?.Invoke(this,
                    new AsyncClientBaseErrorEventArgs(ex, "BeginRead"));
            }
        }

        public void DetachClient(Exception ex)
        {
            try
            {
                Client.Close();
                OnDetachedFromServer(ex);
            }
            catch { /* intentionally swallow */ }
        
            Client = null;
            ReceiveBuffer = null;
        }

        public virtual void OnDataReceived(byte[] buffer) { }
        public virtual void OnAttachedToServer() { }
        public virtual void OnDetachedFromServer(Exception ex) { }

        public void WriteData(byte[] data)
        {
            try
            {
                Client.GetStream().BeginWrite(data, 0, data.Length, OnDataWrote, this);
            }
            catch(Exception ex)
            {
                Error?.Invoke(this, new AsyncClientBaseErrorEventArgs(ex, "BeginWrite"));
            }
        }

        private static void OnDataReceived(IAsyncResult iar)
        {
            var me = (AsyncClientBase)iar.AsyncState;

            if(me.Client == null) { return; }

            try
            {
                var bytesRead = me.Client.GetStream().EndRead(iar);
                var buf = new byte[bytesRead];
                Array.Copy(me.ReceiveBuffer, buf, bytesRead);

                me.OnDataReceived(buf);
            }
            catch (Exception ex)
            {
                me.Error?.Invoke(me, new AsyncClientBaseErrorEventArgs(ex, "EndRead"));
            }
        }

        private static void OnDataWrote(IAsyncResult iar)
        {
            var me = (AsyncClientBase)iar.AsyncState;
            try
            {
                me.Client.GetStream().EndWrite(iar);
            }
            catch(Exception ex)
            {
                me.Error?.Invoke(me,
                    new AsyncClientBaseErrorEventArgs(ex, "EndWrite"));
            }
        }
    }
}
  

Теперь весь ваш базовый код написан. Вам не нужно менять это каким-либо образом. Вы просто реализуете свой собственный клиент и сервер, чтобы реагировать соответствующим образом. Например, вот базовая реализация сервера:

 public class MyServer : AsyncServerBase<MyClient>
{
    public MyServer(int port) : base(port)
    {
    }

    public override void OnClientConnected(MyClient client)
    {
        Console.WriteLine($"* MyClient connected with Id: {client.Id}");
        base.OnClientConnected(client);
    }

    public override void OnClientDisconnected(MyClient client, Exception ex)
    {
        Console.WriteLine($"***** MyClient disconnected with Id: {client.Id} ({ex.Message})");
        base.OnClientDisconnected(client, ex);
    }
}
  

И вот клиент, который сервер выше использует для связи:

 public class MyClient : AsyncClientBase
{
    public override void OnAttachedToServer()
    {
        base.OnAttachedToServer();

        Console.WriteLine($"{Id}: {GetType().Name} attached. Waiting for data...");
    }

    public override void OnDataReceived(byte[] buffer)
    {
        base.OnDataReceived(buffer);

        Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes. Writing 5 bytes back.");
        WriteData(new byte[] { 1, 2, 3, 4, 5 });
    }

    public override void OnDetachedFromServer(Exception ex)
    {
        base.OnDetachedFromServer(ex);

        Console.WriteLine($"{Id}: {GetType().Name} detached.");
    }
}
  

И чтобы довести дело до конца, вот еще один клиент, который просто подключается к той же серверной реализации, но придает ей другие характеристики:

 public class MyOtherClient : AsyncClientBase
{
    public override void OnAttachedToServer()
    {
        base.OnAttachedToServer();

        Console.WriteLine($"{Id}: {GetType().Name} attached. Writing 4 bytes back.");
        WriteData(new byte[] { 1, 2, 3, 4 });
    }

    public override void OnDataReceived(byte[] buffer)
    {
        base.OnDataReceived(buffer);

        Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes.");
    }

    public override void OnDetachedFromServer(Exception ex)
    {
        base.OnDetachedFromServer(ex);

        Console.WriteLine($"{Id}: {GetType().Name} detached.");
    }
}
  

Что касается использования этого, вот небольшая тестовая программа, которая подвергает его стресс-тестированию:

     class Program
{
    static void Main(string[] args)
    {
        var servers = new IAsyncServerBase[]
        {
            new MyServer(50000),
            new MyServer(50001),
            new MyOtherServer(50002)
        };

        foreach (var s in servers)
        {
            s.StartListening();
        }

        RunTestUsingMyServer("1", 89, 50000);
        RunTestUsingMyServer("2", 127, 50001);
        RunTestUsingMyOtherServer("3", 88, 50002);

        Console.Write("Press any key to exit... ");
        Console.ReadKey(true);

        foreach (var s in servers)
        {
            s.WriteDataToAllClients(new byte[] { 1, 2, 3, 4, 5 });
            s.StopListening();
        }
    }

    private static void RunTestUsingMyServer(string name, int clientCount, int port)
    {
        Parallel.For(0, clientCount, x =>
        {
            using (var t = new TcpClient())
            {
                t.Connect(IPAddress.Loopback, port);
                t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5 }, 0, 5);
                t.GetStream().Read(new byte[512], 0, 512);
                t.Close();
            }
            Console.WriteLine($"FINISHED PASS {name} #{x}");
        });
    }

    private static void RunTestUsingMyOtherServer(string name, int clientCount, int port)
    {
        Parallel.For(0, clientCount, x =>
        {
            using (var t = new TcpClient())
            {
                t.Connect(IPAddress.Loopback, port);
                t.GetStream().Read(new byte[512], 0, 512);
                t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5, 6 }, 0, 6);
                t.Close();
            }
            Console.WriteLine($"FINISHED PASS {name} #{x}");
        });
    }
}
  

Если вам интересно, вот полный исходный код, который вы можете посмотреть. Надеюсь, это приведет вас туда, где вы хотите быть, поскольку это относится к повторному использованию кода.

Ответ №2:

Я не знаю, может ли это помочь. Вы можете определить объект состояния со всей информацией, относящейся к каждому порту:

 public class StateObject
{
    public string Name;
    public Socket Listener;
    public IPEndPoint LocalEndPoint;
    //...

    public StateObject(Socket listener, IPEndPoint endPoint, string name)
    {
        Listener = listener;
        LocalEndPoint = endPoint;
        Name = name;
    }
}
  

Затем вы можете использовать его по мере необходимости:

     public static void StartListeningMaster()
    {
        string ipAddress = "localhost";
        IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
        IPAddress address = ipHost.AddressList[0];

        StartListening(50000, address, "Main port");
        StartListening(50001, address, "Service port");
        StartListening(50002, address, "Extra");
        //...
    }

    public static void StartListening(int port, IPAddress ipAddress, string name = "")
    {
        //...
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
        //...
        StateObject state = new StateObject(listener, localEndPoint);
        listener.BeginAccept(AcceptCallback, state);
        //...
    }

    public static void AcceptCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;
        //...
        handler.BeginReceive(client.buffer, 0, StateObject.BufferSize,
                     0, new AsyncCallback(ReadCallback), state);
    }

    public static void ReadCallback(IAsyncResult ar)
    {
        StateObject state = (StateObject)ar.AsyncState;

        // Always have the info related to every socket
        Socket listener = state.Listener;
        string address = state.LocalEndPoint.Address.ToString();
        int port = state.LocalEndPoint.Port;
        string name = state.Name;

        //...
        StateObject state = (StateObject)ar.AsyncState;
        //...
        CalculateResult(state);
        //...
        handler.BeginReceive(client.buffer, 0, StateObject.BufferSize, 0,
                             new AsyncCallback(ReadCallback), state);
        //...
    }
  

CalculateResult(state) Метод будет иметь всю необходимую информацию для выполнения любых действий. Таким образом, у вас есть только один StartListening() , один AcceptCallback() и один ReadCallback() для управления всеми портами.