#apache-kafka #apache-flink #flink-streaming
Вопрос:
Я использую flink версии 1.13.0
Когда я пытаюсь использовать стратегии водяных знаков Кафки с помощью документа flink, который, похоже, не работает, функция оконного процесса не будет запущена.
И я хочу знать, таким образом, метка времени водяного знака будет использовать время потребления или время производства в кафке?
мой потребительский код выглядит так:
val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
.setCommitOffsetsOnCheckpoints(true)
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
и используйте окно, подобное этому:
processStream
.keyBy(_.num)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
.addSink(new SignLatSink(serverConfig.smsRuleRedis))
.name("lat_count_sink")
.uid("lat_count_sink")
и топологический график, подобный этому:
Ответ №1:
Поскольку вы не указали назначителя временных меток в своей стратегии водяных знаков, вы полагаетесь на то, что пользователь FlinkKafkaConsumer назначил временные метки записям потока. Это будет работать только в том случае, если записи, считываемые из Кафки, имеют метки времени в своих заголовках. В противном случае вам потребуется реализовать назначитель временных меток для извлечения временных меток из событий.
Обратите внимание, что вы не сможете реализовать назначитель временных меток, который может применить пользователь FlinkKafkaConsumer, если вы также не реализуете десериализатор, который пользователь FlinkKafkaConsumer может использовать для создания объектов с временными метками, которые затем можно извлечь. В противном случае вы можете применить стратегию водяных знаков где-нибудь после источника.
Если отсутствие временных меток не является проблемой, есть и другие вещи, которые могут быть проблемой. Например, у вас может быть незанятый раздел Кафки или не хватает событий достаточно далеко в будущем, чтобы закрыть окно.
Кстати, если ваши события упорядочены по разделам, и если вы вызываете assignTimestampsAndWatermarks на FlinkKafkaConsumer (что вы в настоящее время делаете), то вы можете использовать forMonotonousTimestamps
вместо forBoundedOutOfOrderness
, что имеет некоторые существенные преимущества.
Комментарии:
1. Тема Кафки, которую я использую, имеет доступ только для чтения, есть ли способ узнать, назначена ли метка времени записям Кафки
2. Потребитель консоли Kafka может отображать метки времени-подробности см. в его параметрах. Или вы можете заменить
SimpleStringSchema
на aKafkaDeserializationSchema
, а затем изучить записи потребителей с помощью отладчика, чтобы узнать, есть ли у них метки времени.