Как оптимизировать цикл приема для тысяч сообщений в Erlang?

#erlang #parallel-processing

#erlang #параллельная обработка

Вопрос:

В главе «Программирование многоядерных процессоров» книги Programming Erlang Джо Армстронг приводит хороший пример распараллеливания функции map:

 pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = map(fun(I) ->
        spawn(fun() -> do_f(S, Ref, F, I) end)
    end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;

gather([], _) ->
    [].
  

Это работает хорошо, но я считаю, что в нем есть узкое место, из-за которого он работает очень медленно в списках с более чем 100 000 элементами.

Когда gather() функция выполняется, она начинает сопоставлять первое Pid из Pids списка с сообщением в почтовом ящике основного процесса. Но что, если самое старое сообщение в почтовом ящике не из этого самого Pid ? Затем он пробует все другие сообщения, пока не найдет совпадение. При этом существует определенная вероятность того, что при выполнении gather() функции нам придется перебирать все сообщения почтового ящика, чтобы найти совпадение с a Pid , которое мы взяли из Pids списка. Это N * N наихудший сценарий для списка размера N.

Мне даже удалось доказать существование этого узкого места:

 gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;
  

Как я могу избежать этого узкого места?

Комментарии:

1. кажется очень простой проблемой, удивлен, что до сих пор нет хорошего ответа на этот вопрос.

Ответ №1:

Проблема в том, что если вы хотите иметь правильное решение, вам все равно придется:

  • проверьте, поступает ли данный ответ от одного из процессов, которые вы породили
  • обеспечить правильный порядок результатов

Вот решение, которое использует счетчики вместо списков — это устраняет необходимость многократного обхода входящих сообщений. Сопоставление Ref гарантирует, что сообщения, которые мы получаем, от наших дочерних элементов. Надлежащий порядок обеспечивается сортировкой результата с lists:keysort/2 помощью в самом конце pmap , что добавляет некоторые накладные расходы, но, вероятно, будет меньше O(n^2) .

 -module(test).

-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                              do_f(C, S, Ref, F, I)
                                      end),
                                C 1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).


do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.


gather(C, C, _) ->
    [];
gather(C, Count, Ref) ->
    receive
        {C, Ref, Ret} -> [{C, Ret}|gather(C 1, Count, Ref)]
    end.
  

Комментарии:

1. Он использует lists:foldl вместо map , который вы, возможно, еще не реализовали самостоятельно. Взгляните на его определение / реализацию в man lists или в книге (я полагаю, что это там).

Ответ №2:

Пример Джо хорош, но на практике вам нужно более тяжеловесное решение вашей проблемы. Взгляните на http://code.google.com/p/plists/source/browse/trunk/src/plists.erl например.

В общем, есть три вещи, которые вы хотите сделать:

  1. Выберите рабочую единицу, которая является «достаточно большой». Если рабочая единица слишком мала, вы умрете, обрабатывая накладные расходы. Если он слишком большой, вы умрете из-за простоя работников, особенно если ваша работа неравномерно распределена по количеству элементов в списке.

  2. Верхняя граница количества одновременных рабочих. Psyeugenic предлагает разделить его на планировщики, я предлагаю разделить его на ограничение количества заданий, скажем, на 100 заданий. То есть вы хотите запустить 100 заданий, а затем дождаться завершения некоторых из них, прежде чем запускать другие задания.

  3. Рассмотрите возможность изменения порядка элементов, если это возможно. Это намного быстрее, если вам не нужно учитывать порядок. Для многих проблем это возможно. Если порядок имеет значение, то используйте a dict для хранения содержимого, как предложено. Это быстрее для списков с большими элементами.

Основное правило заключается в том, что как только вы захотите параллельную обработку, вам редко понадобится представление ваших данных на основе списка. Список обладает присущей ему линейностью, чего вы не хотите. На эту тему есть выступление Гая Стила: http://vimeo.com/6624203

Ответ №3:

В этом случае вы можете использовать dict (из pid порожденного процесса для индексации в исходном списке) Pids вместо.

Комментарии:

1. Вы ссылаетесь на руководство по наборам, но в тексте указано, что это dict . Каким он должен быть?

2. Проблема заключается в том, чтобы связать каждый ответ с его аргументом в начальном списке. Если вы используете dict , то это проще. В противном случае будет сложнее получить правильный порядок.

3. @gleber: Исправлено. Изначально у меня были наборы, затем я понял, что вам нужно сохранить индекс.