Мятеж: Обрабатывайте несколько юнитов и используйте ценность «извне»

#java #apache-kafka #vert.x #mutiny

Вопрос:

У меня есть следующий метод:

 private boolean isAllProcessedForChannel(String channel) {
    KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);

    Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
    Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
    for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
      Long position = entry.getValue();
      TopicPartition partition = entry.getKey();

      Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
      OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
      if (offsetAndMetadata != null) {
        long offset = offsetAndMetadata.offset();
        log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
        if (offset != position) {
          return false;
        }
      }
    }

    return true;
}
 

В конце концов мне нужно получить Uni, который указывает, если один из разделов еще не обработан. Мое текущее решение состоит await в том, чтобы занять 2 позиции. Я не понимаю, как сделать все это «действительно» реактивным.

Для каждой записи на карте первого Uni мне нужно вызвать метод, который сам возвращает Uni. Затем мне нужно сравнить значение из записи (первый Uni) с результатом второго Uni. И в конце концов мне нужно проверить, все ли результаты сравнения верны, и вернуть это как один Uni.

Есть ли у кого-нибудь подсказка о том, как этого добиться? Или это просто слишком сложно, и я должен придерживаться своего «синхронного» пути?

Комментарии:

1. Если сам метод не возвращает будущий или другой асинхронный тип, то в чем смысл применения реактивного подхода в методе? Есть ли в этом действительно какая-то польза?

2. Метод должен возвращать Uni<Boolean> в конце, поэтому подпись будет: private Uni<Boolean> isAllProcessedForChannel(String channel) . В приведенном выше коде он возвращает простое boolean , что мне не нужно.

Ответ №1:

Сначала вы можете преобразовать записи позиций, чтобы Multi затем для каждой записи получить Uni<Boolean> значение, которое false появляется, когда позиция отличается от смещения. В конце концов вы объединяете результаты и берете false только первый:

 private Uni<Boolean> isAllProcessedForChannel(String channel) {
    return consumer.getPositions()
            .onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
            .onItem().transformToUniAndMerge(entry -> {
                Long position = entry.getValue();
                TopicPartition partition = entry.getKey();

                return consumer.committed(partition).onItem().transform(committed -> {
                    OffsetAndMetadata offsetAndMetadata = committed.get(partition);
                    if (offsetAndMetadata != null) {
                        long offset = offsetAndMetadata.offset();
                        if (offset != position) {
                            return false;
                        }
                    }
                    return true;
                });
            })
            .filter(Boolean.FALSE::equals)
            .toUni();
}
 

Комментарии:

1. Кажется, работает, большое спасибо!