#java #apache-kafka #vert.x #mutiny
Вопрос:
У меня есть следующий метод:
private boolean isAllProcessedForChannel(String channel) {
KafkaConsumer<Object, Object> consumer = clientService.getConsumer(channel);
Uni<Map<TopicPartition, Long>> positionsUni = consumer.getPositions();
Map<TopicPartition, Long> positions = positionsUni.await().indefinitely();
for (Entry<TopicPartition, Long> entry : positions.entrySet()) {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
Uni<Map<TopicPartition, OffsetAndMetadata>> committedUni = consumer.committed(partition);
OffsetAndMetadata offsetAndMetadata = committedUni.await().indefinitely().get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
log.info("Offset/Position ({}): {}/{}", partition.partition(), offset, position);
if (offset != position) {
return false;
}
}
}
return true;
}
В конце концов мне нужно получить Uni, который указывает, если один из разделов еще не обработан. Мое текущее решение состоит await
в том, чтобы занять 2 позиции. Я не понимаю, как сделать все это «действительно» реактивным.
Для каждой записи на карте первого Uni мне нужно вызвать метод, который сам возвращает Uni. Затем мне нужно сравнить значение из записи (первый Uni) с результатом второго Uni. И в конце концов мне нужно проверить, все ли результаты сравнения верны, и вернуть это как один Uni.
Есть ли у кого-нибудь подсказка о том, как этого добиться? Или это просто слишком сложно, и я должен придерживаться своего «синхронного» пути?
Комментарии:
1. Если сам метод не возвращает будущий или другой асинхронный тип, то в чем смысл применения реактивного подхода в методе? Есть ли в этом действительно какая-то польза?
2. Метод должен возвращать
Uni<Boolean>
в конце, поэтому подпись будет:private Uni<Boolean> isAllProcessedForChannel(String channel)
. В приведенном выше коде он возвращает простоеboolean
, что мне не нужно.
Ответ №1:
Сначала вы можете преобразовать записи позиций, чтобы Multi
затем для каждой записи получить Uni<Boolean>
значение, которое false
появляется, когда позиция отличается от смещения. В конце концов вы объединяете результаты и берете false
только первый:
private Uni<Boolean> isAllProcessedForChannel(String channel) {
return consumer.getPositions()
.onItem().transformToMulti(positions -> Multi.createFrom().iterable(positions.entrySet()))
.onItem().transformToUniAndMerge(entry -> {
Long position = entry.getValue();
TopicPartition partition = entry.getKey();
return consumer.committed(partition).onItem().transform(committed -> {
OffsetAndMetadata offsetAndMetadata = committed.get(partition);
if (offsetAndMetadata != null) {
long offset = offsetAndMetadata.offset();
if (offset != position) {
return false;
}
}
return true;
});
})
.filter(Boolean.FALSE::equals)
.toUni();
}
Комментарии:
1. Кажется, работает, большое спасибо!