#java #reactive #java.util.concurrent #completable-future #java-flow
#java #реактивный #java.util.concurrent #завершаемый-будущее #java-поток
Вопрос:
Можно ли заставить Подписчика запускаться в том же потоке, что и Издатель (синхронно)? Я могу использовать CompletableFuture, но он дает только один результат. Но что, если мне нужно, чтобы подписчику было доставлено много результатов? Пожалуйста, посмотрите на эти небольшие тесты для лучшего объяснения.
@Test
public void testCompletableFutureThreads() throws InterruptedException {
CompletableFuture<String> f = new CompletableFuture<String>();
f.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("accept " s " in " Thread.currentThread().getName());
}
});
Thread.sleep(200);
System.out.println("send complete from " Thread.currentThread().getName());
f.complete("test");
Thread.sleep(1000);
}
@Test
public void testSubmissionPublisherThreads() throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<String>();
publisher.subscribe(new Flow.Subscriber<String>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String item) {
System.out.println("onNext in " Thread.currentThread().getName() " received " item);
}
@Override
public void onError(Throwable throwable) {
System.err.println("onError in " Thread.currentThread().getName());
throwable.printStackTrace(System.err);
}
@Override
public void onComplete() {
System.err.println("onComplete in " Thread.currentThread().getName());
}
});
int i = 10;
while (i-- > 0) {
Thread.sleep(100);
System.out.println("publisher from " Thread.currentThread().getName());
publisher.submit("" System.currentTimeMillis());
}
}
Комментарии:
1. у вас есть один подписчик и один издатель или может быть много подписчиков и / или издателей?
2. один издатель и много подписчиков
Ответ №1:
Вы можете использовать <a rel=»noreferrer noopener nofollow» href=»https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html#(java.util.concurrent.Executor,int)» rel=»nofollow noreferrer»> SubmissionPublisher(Executor, int)
конструктор и предоставить an Executor
, который запускает все в текущем потоке:
final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
}, Flow.defaultBufferSize());
или, если вы предпочитаете лямбды:
final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize());
Конструктор по умолчанию используется ForkJoinPool.commonPool()
для отправки сигналов.