Могу ли я написать код синхронизации в RichAsyncFunction

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

Когда мне нужно работать с вводом-выводом (запрос к БД, вызов третьего API, …), я могу использовать RichAsyncFunction . Но мне нужно взаимодействовать с Google Sheet через API GG Sheet: https://developers.google.com/sheets/api/quickstart/java . Этот API является sync. Я написал ниже фрагмент кода:

 public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {

    @Override
    public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            syncSendToGGSheet(message);
            return "";
        }).thenAccept((String result) -> {
            resultFuture.complete(Collections.singleton(result));
        });
    }

}
  

Но я обнаружил, что сообщение отправляется в GGSheet очень медленно, кажется, оно отправляется синхронно.

Комментарии:

1. Вы найдете пример в этой теме списка рассылки: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com /…

Ответ №1:

Большая часть кода, выполняемого пользователями, AsyncIO изначально синхронизирована. Вам просто нужно убедиться, что он действительно выполняется в отдельном потоке. Чаще всего используется (статически разделяемый) ExecutorService .

 private class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
   private transient ExecutorService executorService;

   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      executorService = Executors.newFixedThreadPool(30);
   }

   @Override
   public void close() throws Exception {
      super.close();
      executorService.shutdownNow();
   }

   @Override
   public void asyncInvoke(final Obj message, final ResultFuture<String> resultFuture) {
      executorService.submit(() -> {
         try {
            resultFuture.complete(syncSendToGGSheet(message));
         } catch (SQLException e) {
            resultFuture.completeExceptionally(e);
         }
      });
   }
}
  

Вот некоторые соображения о том, как настроить AsyncIO для увеличения пропускной способности: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Async-IO-operator-tuning-micro-benchmarks-td35858.html

Комментарии:

1. Я нашел 2 способа реализации в asyncInvoke: executorService.submit() (как и вы) или CompletableFuture.supplyAsync(() -> {executor.submit()}) . Чем они отличаются?

2. supplyAsync использует общий JVM ForkJoinPool, где у вас нет контроля над размером пула. Но это должно быть вместо использования executor .