#c# #sockets #asynchronous #delegates
#c# #сокеты #асинхронный #делегаты
Вопрос:
У меня есть куча асинхронных методов, которые я хочу предоставить через сокеты C #. Общий шаблон в документации MSDN имеет следующий вид:
public static void StartListening()
{
...
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11000);
...
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
...
}
public static void AcceptCallback(IAsyncResult ar)
{
...
handler.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReadCallback), state);
}
public static void ReadCallback(IAsyncResult ar)
{
...
StateObject state = (StateObject) ar.AsyncState;
...
CalculateResult(state);
...
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReadCallback), state);
...
}
Поэтому написать все это в красивой и понятной форме без повторения кода было непросто. Я думаю в этом направлении, но не смог соединить точки:
public static void StartListeningMaster()
{
string ipAddress = "localhost";
IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
IPAddress address = ipHost.AddressList[0];
StartListening(50000, address, AcceptCallback1);
StartListening(50001, address, AcceptCallback2);
StartListening(50002, address, AcceptCallback3);
...
}
public static void StartListening(int port, IPAddress ipAddress,
Action<IAsyncResult> acceptCallback) {...}
public static void AcceptCallback1(IAsyncResult ar)
{
...
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize,
0, new AsyncCallback1(ReadCallback1), state);
}
...
До этого момента все работало нормально. Но для правильного рефакторинга я хотел бы иметь один метод AcceptCallback, который принимает в качестве своего параметра общий метод ReadCallback, который принимает в качестве своего параметра метод CalculateResult. Таким образом, у меня не было бы повторения кода. Однако, если я изменю свой метод AcceptCallback, чтобы принимать больше параметров, чем IAsyncResult (например, что-то вроде:
public static void StartListening(int port, IPAddress ipAddress, Action<IAsyncResult, Action<IAsyncResult>> acceptCallback) {...}
public static void AcceptCallback(IAsyncResult ar, Action<IAsyncResult> readCallback) {}
Я разрываю контракт делегирования AsyncCallback.
public delegate void AsyncCallback(IAsyncResult ar);
Затем я изучил возможность расширения существующих интерфейсов, чтобы обеспечить функциональность. Я изучил расширение
public interface IAsyncResult
Но это тоже не кажется правильным подходом. Итак, как мне написать этот код, чтобы я не копировал и не вставлял практически один и тот же код повсюду?
Комментарии:
1. Просто предупреждаю, вы никогда не вызываете
End...
методы. Например, вAcceptCallback
вы никогда не вызываетеlistener.EndAccept(ar)
. Или вы делаете, но вы этого не показываете?
Ответ №1:
Итак, я решаю эту проблему, перемещая базовые компоненты в их собственные абстрактные объекты. Затем отталкивайтесь от этих объектов. Например, серверу нужно только принимать / отслеживать соединения. Итак, я бы создал серверный объект, который выглядит примерно так:
namespace MultiServerExample.Base
{
public interface IAsyncServerBase
{
void StartListening();
bool IsListening { get; }
void StopListening();
void WriteDataToAllClients(byte[] data);
}
public abstract class AsyncServerBase<TClientBase> : IAsyncServerBase
where TClientBase : IAsyncClientBase, new()
{
// implement a TcpListener to gain access to Active property
private sealed class ActiveTcpListener : TcpListener
{
public ActiveTcpListener(IPAddress localaddr, int port)
: base(localaddr, port) { }
public bool IsActive => Active;
}
// our listener object
private ActiveTcpListener Listener { get; }
// our clients
private ConcurrentDictionary<string, TClientBase> Clients { get; }
// construct with a port
public AsyncServerBase(int port)
{
Clients = new ConcurrentDictionary<string, TClientBase>();
Listener = new ActiveTcpListener(IPAddress.Any, port);
}
// virtual methods for client action
public virtual void OnClientConnected(TClientBase client) { }
public virtual void OnClientDisconnected(TClientBase client, Exception ex) { }
// start the server
public void StartListening()
{
if(!IsListening)
{
Listener.Start();
Listener.BeginAcceptTcpClient(OnAcceptedTcpClient, this);
}
}
// check if the server is running
public bool IsListening =>
Listener.IsActive;
// stop the server
public void StopListening()
{
if (IsListening)
{
Listener.Stop();
Parallel.ForEach(Clients, x => x.Value.DetachClient(null));
Clients.Clear();
}
}
// async callback for when a client wants to connect
private static void OnAcceptedTcpClient(IAsyncResult res)
{
var me = (AsyncServerBase<TClientBase>)res.AsyncState;
if (!me.IsListening) { return; }
try
{
TcpClient client = null;
try
{
client = me.Listener.EndAcceptTcpClient(res);
}
catch(Exception ex)
{
System.Diagnostics.Debug.WriteLine($"Warning: unable to accept client:n{ex}");
}
if(client != null)
{
// create a new client
var t = new TClientBase();
// set up error callbacks
t.Error = me.OnClientBaseError;
// notify client we have attached
t.AttachClient(client);
// track the client
me.Clients[t.Id] = t;
// notify we have a new connection
me.OnClientConnected(t);
}
}
finally
{
// if we are still listening, wait for another connection
if(me.IsListening)
{
me.Listener.BeginAcceptSocket(OnAcceptedTcpClient, me);
}
}
}
// Event callback from a client that an error has occurred
private void OnClientBaseError(object sender, AsyncClientBaseErrorEventArgs e)
{
var client = (TClientBase)sender;
client.Error -= OnClientBaseError;
OnClientDisconnected(client, e.Exception);
client.DetachClient(e.Exception);
Clients.TryRemove(client.Id, out _);
}
// utility method to write data to all clients connected
public void WriteDataToAllClients(byte[] data)
{
Parallel.ForEach(Clients, x => x.Value.WriteData(data));
}
}
}
На этом этапе учтены все основы запуска сервера. Теперь для клиента, который работает на сервере:
namespace MultiServerExample.Base
{
public interface IAsyncClientBase
{
event EventHandler<AsyncClientBaseErrorEventArgs> Error;
void AttachClient(TcpClient client);
void WriteData(byte[] data);
void DetachClient(Exception ex);
string Id { get; }
}
public abstract class AsyncClientBase : IAsyncClientBase
{
protected virtual int ReceiveBufferSize { get; } = 1024;
private TcpClient Client { get; set; }
private byte[] ReceiveBuffer { get; set; }
public event EventHandler<AsyncClientBaseErrorEventArgs> Error;
public string Id { get; }
public AsyncClientBase()
{
Id = Guid.NewGuid().ToString();
}
public void AttachClient(TcpClient client)
{
if(ReceiveBuffer != null) { throw new InvalidOperationException(); }
ReceiveBuffer = new byte[ReceiveBufferSize];
Client = client;
try
{
Client.GetStream().
BeginRead(ReceiveBuffer, 0, ReceiveBufferSize, OnDataReceived, this);
OnAttachedToServer();
}
catch (Exception ex)
{
Error?.Invoke(this,
new AsyncClientBaseErrorEventArgs(ex, "BeginRead"));
}
}
public void DetachClient(Exception ex)
{
try
{
Client.Close();
OnDetachedFromServer(ex);
}
catch { /* intentionally swallow */ }
Client = null;
ReceiveBuffer = null;
}
public virtual void OnDataReceived(byte[] buffer) { }
public virtual void OnAttachedToServer() { }
public virtual void OnDetachedFromServer(Exception ex) { }
public void WriteData(byte[] data)
{
try
{
Client.GetStream().BeginWrite(data, 0, data.Length, OnDataWrote, this);
}
catch(Exception ex)
{
Error?.Invoke(this, new AsyncClientBaseErrorEventArgs(ex, "BeginWrite"));
}
}
private static void OnDataReceived(IAsyncResult iar)
{
var me = (AsyncClientBase)iar.AsyncState;
if(me.Client == null) { return; }
try
{
var bytesRead = me.Client.GetStream().EndRead(iar);
var buf = new byte[bytesRead];
Array.Copy(me.ReceiveBuffer, buf, bytesRead);
me.OnDataReceived(buf);
}
catch (Exception ex)
{
me.Error?.Invoke(me, new AsyncClientBaseErrorEventArgs(ex, "EndRead"));
}
}
private static void OnDataWrote(IAsyncResult iar)
{
var me = (AsyncClientBase)iar.AsyncState;
try
{
me.Client.GetStream().EndWrite(iar);
}
catch(Exception ex)
{
me.Error?.Invoke(me,
new AsyncClientBaseErrorEventArgs(ex, "EndWrite"));
}
}
}
}
Теперь весь ваш базовый код написан. Вам не нужно менять это каким-либо образом. Вы просто реализуете свой собственный клиент и сервер, чтобы реагировать соответствующим образом. Например, вот базовая реализация сервера:
public class MyServer : AsyncServerBase<MyClient>
{
public MyServer(int port) : base(port)
{
}
public override void OnClientConnected(MyClient client)
{
Console.WriteLine($"* MyClient connected with Id: {client.Id}");
base.OnClientConnected(client);
}
public override void OnClientDisconnected(MyClient client, Exception ex)
{
Console.WriteLine($"***** MyClient disconnected with Id: {client.Id} ({ex.Message})");
base.OnClientDisconnected(client, ex);
}
}
И вот клиент, который сервер выше использует для связи:
public class MyClient : AsyncClientBase
{
public override void OnAttachedToServer()
{
base.OnAttachedToServer();
Console.WriteLine($"{Id}: {GetType().Name} attached. Waiting for data...");
}
public override void OnDataReceived(byte[] buffer)
{
base.OnDataReceived(buffer);
Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes. Writing 5 bytes back.");
WriteData(new byte[] { 1, 2, 3, 4, 5 });
}
public override void OnDetachedFromServer(Exception ex)
{
base.OnDetachedFromServer(ex);
Console.WriteLine($"{Id}: {GetType().Name} detached.");
}
}
И чтобы довести дело до конца, вот еще один клиент, который просто подключается к той же серверной реализации, но придает ей другие характеристики:
public class MyOtherClient : AsyncClientBase
{
public override void OnAttachedToServer()
{
base.OnAttachedToServer();
Console.WriteLine($"{Id}: {GetType().Name} attached. Writing 4 bytes back.");
WriteData(new byte[] { 1, 2, 3, 4 });
}
public override void OnDataReceived(byte[] buffer)
{
base.OnDataReceived(buffer);
Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes.");
}
public override void OnDetachedFromServer(Exception ex)
{
base.OnDetachedFromServer(ex);
Console.WriteLine($"{Id}: {GetType().Name} detached.");
}
}
Что касается использования этого, вот небольшая тестовая программа, которая подвергает его стресс-тестированию:
class Program
{
static void Main(string[] args)
{
var servers = new IAsyncServerBase[]
{
new MyServer(50000),
new MyServer(50001),
new MyOtherServer(50002)
};
foreach (var s in servers)
{
s.StartListening();
}
RunTestUsingMyServer("1", 89, 50000);
RunTestUsingMyServer("2", 127, 50001);
RunTestUsingMyOtherServer("3", 88, 50002);
Console.Write("Press any key to exit... ");
Console.ReadKey(true);
foreach (var s in servers)
{
s.WriteDataToAllClients(new byte[] { 1, 2, 3, 4, 5 });
s.StopListening();
}
}
private static void RunTestUsingMyServer(string name, int clientCount, int port)
{
Parallel.For(0, clientCount, x =>
{
using (var t = new TcpClient())
{
t.Connect(IPAddress.Loopback, port);
t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5 }, 0, 5);
t.GetStream().Read(new byte[512], 0, 512);
t.Close();
}
Console.WriteLine($"FINISHED PASS {name} #{x}");
});
}
private static void RunTestUsingMyOtherServer(string name, int clientCount, int port)
{
Parallel.For(0, clientCount, x =>
{
using (var t = new TcpClient())
{
t.Connect(IPAddress.Loopback, port);
t.GetStream().Read(new byte[512], 0, 512);
t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5, 6 }, 0, 6);
t.Close();
}
Console.WriteLine($"FINISHED PASS {name} #{x}");
});
}
}
Если вам интересно, вот полный исходный код, который вы можете посмотреть. Надеюсь, это приведет вас туда, где вы хотите быть, поскольку это относится к повторному использованию кода.
Ответ №2:
Я не знаю, может ли это помочь. Вы можете определить объект состояния со всей информацией, относящейся к каждому порту:
public class StateObject
{
public string Name;
public Socket Listener;
public IPEndPoint LocalEndPoint;
//...
public StateObject(Socket listener, IPEndPoint endPoint, string name)
{
Listener = listener;
LocalEndPoint = endPoint;
Name = name;
}
}
Затем вы можете использовать его по мере необходимости:
public static void StartListeningMaster()
{
string ipAddress = "localhost";
IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
IPAddress address = ipHost.AddressList[0];
StartListening(50000, address, "Main port");
StartListening(50001, address, "Service port");
StartListening(50002, address, "Extra");
//...
}
public static void StartListening(int port, IPAddress ipAddress, string name = "")
{
//...
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
//...
StateObject state = new StateObject(listener, localEndPoint);
listener.BeginAccept(AcceptCallback, state);
//...
}
public static void AcceptCallback(IAsyncResult ar)
{
StateObject state = (StateObject)ar.AsyncState;
//...
handler.BeginReceive(client.buffer, 0, StateObject.BufferSize,
0, new AsyncCallback(ReadCallback), state);
}
public static void ReadCallback(IAsyncResult ar)
{
StateObject state = (StateObject)ar.AsyncState;
// Always have the info related to every socket
Socket listener = state.Listener;
string address = state.LocalEndPoint.Address.ToString();
int port = state.LocalEndPoint.Port;
string name = state.Name;
//...
StateObject state = (StateObject)ar.AsyncState;
//...
CalculateResult(state);
//...
handler.BeginReceive(client.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReadCallback), state);
//...
}
CalculateResult(state)
Метод будет иметь всю необходимую информацию для выполнения любых действий. Таким образом, у вас есть только один StartListening()
, один AcceptCallback()
и один ReadCallback()
для управления всеми портами.