Неблокирующий TCP-сервер, использующий принципы OTP

#tcp #erlang #erlang-otp

#tcp #erlang #erlang-otp

Вопрос:

Я начинаю изучать Erlang, поэтому я пытаюсь написать «привет, мир!» для параллельного программирования, IRC-бота.

Я уже написал один с использованием Erlang без каких-либо тонкостей OTP (поведения супервизора, приложения и т.д.). Я хочу переписать его, используя принципы OTP, но, к сожалению, я не могу найти «правильный» способ программирования сокетов с помощью OTP.

Кажется, единственный разумный способ — создать другой процесс вручную и связать его с супервизором, но наверняка кто-то, где-то, делал это раньше.

Ответ №1:

Я думаю, это то, что вы ищете: http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles Это полное руководство о том, как создать неблокирующий TCP-сервер с использованием OTP (конечно, полностью документировано и объяснено).

Комментарии:

1. Нет, использует недокументированный (и потенциально нестабильный) prim_inet:async_accept/2. Возможно, для этого не существует «OTP-способа» :/

2. В этом случае я бы просто использовал gen_tcp:accept/1 и gen_tcp:controlling_process /2 (как следует из документации: «Присваивает сокету новый Pid процесса управления. Процесс управления — это процесс, который получает сообщения из сокета. Если вызывается любым другим процессом, отличным от текущего процесса управления, возвращается {error, eperm}. «). Вот пример того, как это использовать: 20bits.com/articles/erlang-a-generalized-tcp-server (обратите внимание на следующий абзац: «Проблема с внедрением сетевого сервера с использованием gen_server заключается в том, что вызов gen_tcp:accept …». Надеюсь, это поможет.

Ответ №2:

Здорово, что вы начали изучать Erlang / OTP!

Следующие ресурсы очень полезны:

  • Принципы проектирования OTP. Внимательно прочитайте это, если вы еще этого не сделали. Обратите внимание на распространенное заблуждение, что OTP объектно-ориентирован (OO): это не так! Забудьте все о «наследовании». Невозможно просто создавать полные системы, «расширяя» стандартные модули.
  • Система обмена сообщениями:

    Эти функции должны использоваться для реализации использования системных сообщений для процесса

  • Специальные процессы. Специальный процесс — это процесс, совместимый с OTP, который может хорошо интегрироваться с супервизорами.

Это некоторый код, который у меня есть в моем проекте. Я тоже изучаю Erlang, поэтому, пожалуйста, не слишком доверяйте коду.

 -module(gen_tcpserver).

%% Public API
-export([start_link/2]).

%% start_link reference
-export([init/2]).

%% System internal API
-export([system_continue/3, system_terminate/4, system_code_change/4]).

-define(ACCEPT_TIMEOUT, 250).

-record(server_state, {socket=undefined,
                       args,
                       func}).

%% ListenArgs are given to gen_tcp:listen
%% AcceptFun(Socket) -> ok, blocks the TCP accept loop
start_link(ListenArgs, AcceptFun) ->
    State = #server_state{args=ListenArgs,func=AcceptFun},
    proc_lib:start_link(?MODULE, init, [self(), State]).

init(Parent, State) ->
    {Port, Options} = State#server_state.args,
    {ok, ListenSocket} = gen_tcp:listen(Port, Options),
    NewState = State#server_state{socket=ListenSocket},
    Debug = sys:debug_options([]),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, Debug, NewState).

loop(Parent, Debug, State) ->
    case gen_tcp:accept(State#server_state.socket, ?ACCEPT_TIMEOUT) of
        {ok, Socket} when Debug =:= [] -> ok = (State#server_state.func)(Socket);
        {ok, Socket} ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {accepted, Socket}),
            ok = (State#server_state.func)(Socket);
        {error, timeout} -> ok;
        {error, closed} when Debug =:= [] ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {closed}),
            exit(normal);
        {error, closed} -> exit(normal)
    end,
    flush(Parent, Debug, State).

flush(Parent, Debug, State) ->
    receive
        {system, From, Msg} ->
            sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, State)
        after 0 ->
            loop(Parent, Debug, State)
    end.

print_event(Device, Event, _Extra) ->
    io:format(Device, "*DBG* TCP event = ~p~n", [Event]).

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, State) ->
    gen_tcp:close(State#server_state.socket),
    exit(Reason).

system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.
  

Обратите внимание, что это совместимый процесс OTP (им может управлять супервизор). Вы должны использовать AcceptFun для создания (= быстрее) нового дочернего рабочего элемента. Однако я еще не тестировал его досконально.

 1> {ok, A} = gen_tcpserver:start_link({8080,[]},fun(Socket)->gen_tcp:close(Socket) end).
{ok,<0.93.0>}
2> sys:trace(A, true).
ok
*DBG* TCP event = {accepted,#Port<0.2102>}
*DBG* TCP event = {accepted,#Port<0.2103>}
3> 
  

(После 2> ‘s ok я указал моему браузеру Google Chrome на порт 8080: отличный тест для TCP!)

Комментарии:

1. Кстати, при использовании этого процесса OTP shutdown параметр для любого супервизора имеет смысл: он должен быть по крайней мере больше, чем ACCEPT_TIMEOUT .

Ответ №3:

Другим способом реализации асинхронного прослушивателя TCP является использование supervisor_bridge .

Вот некоторый код, который я написал, чтобы показать это (не тестировался):

 -module(connection_bridge).

-behaviour(supervisor_bridge).

% supervisor_bridge export
-export([init/1, terminate/2]).

% internal proc_lib:start_link
-export([accept_init/3]).

%% Port: see gen_tcp:listen(Port, _).
%% Options: see gen_tcp:listen(_, Options).
%% ConnectionHandler: Module:Function(Arguments)->pid() or fun/0->pid()
%% ConnectionHandler: return pid that will receive TCP messages
init({Port, Options, ConnectionHandler}) ->
    case gen_tcp:listen(Port, Options) of
        {ok, ListenSocket} ->
            {ok, ServerPid} = proc_lib:start_link(?MODULE, accept_init,
                [self(), ListenSocket, ConnectionHandler], 1000),
            {ok, ServerPid, ListenSocket};
        OtherResult -> OtherResult
    end.

terminate(_Reason, ListenSocket) ->
    gen_tcp:close(ListenSocket).

accept_init(ParentPid, ListenSocket, ConnectionHandler) ->
    proc_lib:init_ack(ParentPid, {ok, self()}),
    accept_loop(ListenSocket, ConnectionHandler).

accept_loop(ListenSocket, ConnectionHandler) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, ClientSocket} ->
            Pid = case ConnectionHandler of
                {Module, Function, Arguments} ->
                    apply(Module, Function, Arguments);
                Function when is_function(Function, 0) ->
                    Function()
            end,
            ok = gen_tcp:controlling_process(ClientSocket, Pid),
            accept_loop(ListenSocket, ConnectionHandler);
        {error, closed} ->
            error({shutdown, tcp_closed});
        {error, Reason} ->
            error(Reason)
    end.
  

Намного проще для понимания, чем мой другой ответ. connection_bridge Также может быть расширен для поддержки UDP и SCTP.