Как вызвать приемники.Many.tryEmitNext из нескольких потоков?

#project-reactor

#проект-реактор

Вопрос:

Я оборачиваю голову вокруг потоковых приемников и не могу понять картину более высокого уровня. При использовании Sinks.Many<T> tryEmitNext функция сообщает мне, была ли конкуренция и что мне делать в случае сбоя (FailFast/ Handler).

Но есть ли простая конструкция, которая позволяет мне безопасно выделять элементы из нескольких потоков. Например, вместо того, чтобы сообщать пользователю о наличии разногласий, и я должен попробовать еще раз, Возможно, добавить элементы в очередь (mpmc, mpsc и т. Д.) И уведомлять только тогда, когда очередь заполнена.

Теперь я могу сам добавить очередь, чтобы решить проблему, но это, похоже, распространенный вариант использования. Думаю, я здесь упускаю момент.

Ответ №1:

Я столкнулся с той же проблемой, перейдя с процессоров, которые поддерживают безопасную эмиссию из нескольких потоков. Я использую этот пользовательский EmitFailureHandler для выполнения цикла занятости, как предложено в документах EmitFailureHandler .

 public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
    return (signalType, emitResult) -> {
        if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
            LockSupport.parkNanos(10);
            return true;
        } else
            return fallback.onEmitFailure(signalType, emitResult);
    };
}
 

Существуют различные запутанные аспекты реализации 3.4.0

  • Подразумевается, что, если не используется небезопасный вариант, приемник поддерживает сериализованную эмиссию, но на самом деле все, что делает сериализованная версия, — это быстрый сбой в случае одновременной эмиссии.
  • Приемник, предоставляемый Flux.Create, поддерживает потокобезопасное излучение.

Я надеюсь, что в какой-то момент библиотека предложит надежную альтернативу этому.

Комментарии:

1. Это решение, конечно, может привести к 100% загрузке процессора, если подписчик потока плохо себя вел. Более сложным подходом было бы заблокировать поток-эмитент, используя, например, защелку обратного отсчета в случае конфликта. Другим подходом было бы создать оболочку для приемника, которая использует очередь в случае конфликта. Это можно скопировать из github.com/reactor/reactor-core/blob/v3.4.1/reactor-core/src /…