Синхронный SubmissionPublisher

#java #reactive #java.util.concurrent #completable-future #java-flow

#java #реактивный #java.util.concurrent #завершаемый-будущее #java-поток

Вопрос:

Можно ли заставить Подписчика запускаться в том же потоке, что и Издатель (синхронно)? Я могу использовать CompletableFuture, но он дает только один результат. Но что, если мне нужно, чтобы подписчику было доставлено много результатов? Пожалуйста, посмотрите на эти небольшие тесты для лучшего объяснения.

   @Test
  public void testCompletableFutureThreads() throws InterruptedException {
    CompletableFuture<String> f = new CompletableFuture<String>();
    f.thenAccept(new Consumer<String>() {
      @Override
      public void accept(String s) {
        System.out.println("accept "   s   " in "   Thread.currentThread().getName());
      }
    });
    Thread.sleep(200);
    System.out.println("send complete from "   Thread.currentThread().getName());
    f.complete("test");
    Thread.sleep(1000);
  }



  @Test
  public void testSubmissionPublisherThreads() throws InterruptedException {

    SubmissionPublisher<String> publisher = new SubmissionPublisher<String>();

    publisher.subscribe(new Flow.Subscriber<String>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(String item) {
        System.out.println("onNext in "   Thread.currentThread().getName()   " received "   item);
      }

      @Override
      public void onError(Throwable throwable) {
        System.err.println("onError in "   Thread.currentThread().getName());
        throwable.printStackTrace(System.err);
      }

      @Override
      public void onComplete() {
        System.err.println("onComplete in "   Thread.currentThread().getName());
      }
    });

    int i = 10;
    while (i-- > 0) {
      Thread.sleep(100);
      System.out.println("publisher from "   Thread.currentThread().getName());
      publisher.submit(""   System.currentTimeMillis());

    }
  }
 

Комментарии:

1. у вас есть один подписчик и один издатель или может быть много подписчиков и / или издателей?

2. один издатель и много подписчиков

Ответ №1:

Вы можете использовать <a rel=»noreferrer noopener nofollow» href=»https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html#(java.util.concurrent.Executor,int)» rel=»nofollow noreferrer»> SubmissionPublisher(Executor, int) конструктор и предоставить an Executor , который запускает все в текущем потоке:

 final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(new Executor() {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}, Flow.defaultBufferSize());
 

или, если вы предпочитаете лямбды:

 final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize());
 

Конструктор по умолчанию используется ForkJoinPool.commonPool() для отправки сигналов.