#project-reactor
#проект-реактор
Вопрос:
Я оборачиваю голову вокруг потоковых приемников и не могу понять картину более высокого уровня. При использовании Sinks.Many<T> tryEmitNext
функция сообщает мне, была ли конкуренция и что мне делать в случае сбоя (FailFast/ Handler).
Но есть ли простая конструкция, которая позволяет мне безопасно выделять элементы из нескольких потоков. Например, вместо того, чтобы сообщать пользователю о наличии разногласий, и я должен попробовать еще раз, Возможно, добавить элементы в очередь (mpmc, mpsc и т. Д.) И уведомлять только тогда, когда очередь заполнена.
Теперь я могу сам добавить очередь, чтобы решить проблему, но это, похоже, распространенный вариант использования. Думаю, я здесь упускаю момент.
Ответ №1:
Я столкнулся с той же проблемой, перейдя с процессоров, которые поддерживают безопасную эмиссию из нескольких потоков. Я использую этот пользовательский EmitFailureHandler для выполнения цикла занятости, как предложено в документах EmitFailureHandler .
public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
return (signalType, emitResult) -> {
if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
LockSupport.parkNanos(10);
return true;
} else
return fallback.onEmitFailure(signalType, emitResult);
};
}
Существуют различные запутанные аспекты реализации 3.4.0
- Подразумевается, что, если не используется небезопасный вариант, приемник поддерживает сериализованную эмиссию, но на самом деле все, что делает сериализованная версия, — это быстрый сбой в случае одновременной эмиссии.
- Приемник, предоставляемый Flux.Create, поддерживает потокобезопасное излучение.
Я надеюсь, что в какой-то момент библиотека предложит надежную альтернативу этому.
Комментарии:
1. Это решение, конечно, может привести к 100% загрузке процессора, если подписчик потока плохо себя вел. Более сложным подходом было бы заблокировать поток-эмитент, используя, например, защелку обратного отсчета в случае конфликта. Другим подходом было бы создать оболочку для приемника, которая использует очередь в случае конфликта. Это можно скопировать из github.com/reactor/reactor-core/blob/v3.4.1/reactor-core/src /…