Scala: не удается разрешить перегруженные методы (Flink WatermarkStrategy)

#apache-kafka #apache-flink #flink-streaming

#apache-kafka #apache-flink #потоковая передача flink

Вопрос:

Я следую документации Flink о том, как использовать WatermarkStrategy с KafkaConsumer. Код показан ниже

 val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)
 

Каждый раз, когда я пытаюсь скомпилировать приведенный выше код, я получаю сообщение об ошибке

ошибка: перегруженное значение метода присваивает метки времени и водяные знаки альтернативам:

 error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(
 

Комментарии:

1. Какую версию flink вы используете? Не могли бы вы вставить полное сообщение eror.

2. Я добавил ошибку. Я использую flink 1.11.2

3. Без самостоятельного тестирования я бы сказал, что здесь отсутствует тип. В одном из тестовых примеров для назначения водяных знаков есть хороший пример: github.com/apache/flink/blob/… . Там вы можете видеть, что WatermarkStrategy принимает тип (который в вашем примере будет MyType). Надеюсь, это поможет

Ответ №1:

Приведенный ниже код возвращает WatermarkStrategyy[Ничего] вместо WatermarkStrategy[Строка]

   WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
 

Я решил это с помощью этого кода

 val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
watermark: Watermark[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermark)
 

Комментарии:

1. Это не работало с последними версиями Flink.

Ответ №2:

@Mayokun прав. Но чтобы упростить код, вы могли бы поместить информацию о типе сразу после статического метода:

 val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props) 
kafkaSource.assignTimestampsAndWatermarks(
     WatermarkStrategy.forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
)
 

Комментарии:

1. Это похоже на получение элементов 20secs window?