Как получить несколько асинхронных результатов за заданный тайм-аут с помощью GPars?

#multithreading #groovy #parallel-processing #java-8 #gpars

#многопоточность #groovy #параллельная обработка #java-8 #gpars

Вопрос:

Я хотел бы получить несколько «дорогостоящих» результатов, используя параллельную обработку, но в течение определенного времени ожидания.

Я использую поток данных GPars.задача, но, похоже, я чего-то не понимаю, поскольку процесс возвращается только тогда, когда привязаны все переменные потока данных.

 def timeout = 500
def mapResults = []
GParsPool.withPool(3) { 
    def taskWeb1 = Dataflow.task {
        mapResults.web1 = new URL('http://web1.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
    def taskWeb2 = Dataflow.task {
        mapResults.web2 = new URL('http://web2.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
    def taskWeb3 = Dataflow.task {
        mapResults.web3 = new URL('http://web3.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
}
  

Я видел в документе о тайм-аутах GPars способ использования Select для получения самого быстрого результата за время тайм-аута.
Но я ищу способ получить как можно больше результатов за заданный промежуток времени.

Есть ли лучший способ «GPars» для достижения этого? Или с Java 8 Future / Callable?

Ответ №1:

Поскольку вы тоже заинтересованы в решениях на базе Java 8, вот способ сделать это:

 int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, CompletableFuture<String>> map = 
        Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
            .collect(
                Collectors.toMap(
                    // the key will be the URL
                    Function.identity(),
                    // the value will be the CompletableFuture text fetched from the url
                    (url) -> CompletableFuture.supplyAsync(
                        () -> readUrl(url, timeout), 
                        executorService
                    )
                )
            );
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        CompletableFuture<String> future = entry.getValue();
        boolean completed = future.isDone() 
                amp;amp; !future.isCompletedExceptionally() 
                amp;amp; !future.isCancelled(); 
        System.out.printf("url %s completed: %s, error: %s, result: %.100sn",
            entry.getKey(),
            completed, 
            future.isCompletedExceptionally(),
            completed ? future.getNow(null) : null);
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}
  

Это даст вам столько Future адресов, сколько у вас есть, но дает вам возможность увидеть, не завершилась ли какая-либо из задач с исключением. Код можно упростить, если вас не интересуют эти исключения, только содержимое успешных извлечений:

 int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
    Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
        .forEach(url -> {
            CompletableFuture
                .supplyAsync(
                    () -> readUrl(url, timeout), 
                    executorService
                ).thenAccept(content -> map.put(url, content));
        });
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        System.out.printf("url %s completed, result: %.100sn",
            entry.getKey(), entry.getValue() );
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}
  

Оба кода будут ждать около 250 миллисекунд (это займет лишь немного больше из-за отправки задач службе исполнителя) перед печатью результатов. Я обнаружил, что около 250 миллисекунд — это пороговое значение, при котором некоторые из этих URL-адресов могут быть получены в моей сети, но не обязательно все. Не стесняйтесь регулировать время ожидания для экспериментов.

Для readUrl(url, timeout) метода вы могли бы использовать служебную библиотеку, такую как Apache Commons IO. Задачи, отправленные в службу исполнителя, получат сигнал прерывания, даже если вы явно не учитываете timeout параметр. Я мог бы предоставить реализацию для этого, но я считаю, что это выходит за рамки основной проблемы в вашем вопросе.

Комментарии:

1. Спасибо за ответ, действительно, ожидание завершения является ключевым 😉