#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
Когда мне нужно работать с вводом-выводом (запрос к БД, вызов третьего API, …), я могу использовать RichAsyncFunction . Но мне нужно взаимодействовать с Google Sheet через API GG Sheet: https://developers.google.com/sheets/api/quickstart/java . Этот API является sync. Я написал ниже фрагмент кода:
public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
@Override
public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
CompletableFuture.supplyAsync(() -> {
syncSendToGGSheet(message);
return "";
}).thenAccept((String result) -> {
resultFuture.complete(Collections.singleton(result));
});
}
}
Но я обнаружил, что сообщение отправляется в GGSheet очень медленно, кажется, оно отправляется синхронно.
Комментарии:
1. Вы найдете пример в этой теме списка рассылки: apache-flink-user-mailing-list-archive.2336050.n4.nabble.com /…
Ответ №1:
Большая часть кода, выполняемого пользователями, AsyncIO
изначально синхронизирована. Вам просто нужно убедиться, что он действительно выполняется в отдельном потоке. Чаще всего используется (статически разделяемый) ExecutorService
.
private class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = Executors.newFixedThreadPool(30);
}
@Override
public void close() throws Exception {
super.close();
executorService.shutdownNow();
}
@Override
public void asyncInvoke(final Obj message, final ResultFuture<String> resultFuture) {
executorService.submit(() -> {
try {
resultFuture.complete(syncSendToGGSheet(message));
} catch (SQLException e) {
resultFuture.completeExceptionally(e);
}
});
}
}
Вот некоторые соображения о том, как настроить AsyncIO для увеличения пропускной способности: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Async-IO-operator-tuning-micro-benchmarks-td35858.html
Комментарии:
1. Я нашел 2 способа реализации в asyncInvoke:
executorService.submit()
(как и вы) илиCompletableFuture.supplyAsync(() -> {executor.submit()})
. Чем они отличаются?2.
supplyAsync
использует общий JVM ForkJoinPool, где у вас нет контроля над размером пула. Но это должно быть вместо использованияexecutor
.