Отключить соединитель Kafka от eventhub

#apache-kafka #apache-flink #azure-eventhub

#apache-kafka #apache-flink #azure-eventhub

Вопрос:

Я использую Apache Flink и пытаюсь подключиться к Azure eventhub с помощью протокола Apache Kafka для получения сообщений от него. Мне удается подключиться к Azure eventhub и получать сообщения, но я не могу использовать функцию flink «setStartFromTimestamp (…)», как описано здесь ( https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration). Когда я пытаюсь получить некоторые сообщения из timestamp, Кафка сказал, что формат сообщения на стороне брокера находится перед 0.10.0. Кто-нибудь сталкивался с этим? Версия клиента Apache Kafka 2.0.1, версия Apache Flink 1.7.2

ОБНОВЛЕНО: пытался использовать примеры быстрого запуска Azure-Event-Hub (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java ) в потребительский пакет добавлен код для получения смещения с меткой времени, он возвращает null, как и ожидалось, если версия сообщения ниже версии 0.10.0 kafka.

         List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);
  

Комментарии:

1. Похоже, вы ответили на свой собственный пост?

2. Нет, потому что Azure Event Hub должен иметь версию kafka api 1.0, в соответствии с их документацией, которая больше 0.10.0. Я только что подтвердил, что это не работает также на их простом примере

3. Кажется, это не так, или есть ошибка

Ответ №1:

Извините, мы это пропустили. В EH теперь поддерживается функция Kafka offsetsForTimes() (ранее не поддерживавшаяся).

Не стесняйтесь открывать проблему с нашим Github в будущем. https://github.com/Azure/azure-event-hubs-for-kafka