Миграция Flink: BucketingSink [T] в StreamingFileSink [T]

#java #scala #apache-flink

#java #scala #apache-flink

Вопрос:

У меня самая старая версия Flink, и я хочу обновить ее до последней стабильной версии. BucketingSink в последней версии не используется, и я пытаюсь ее изменить StreamingFileSink . Для инициализации я использую StreamingFileSink.forBulkFormat , но получаю ошибку:

 type arguments [T] do not conform to method forSpecificRecord's type parameter bo
unds [T <: org.apache.avro.specific.SpecificRecordBase]
[ERROR] .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
  

Также я не могу найти, как установить bucketer:DateTimeBucketer[T], inactiveBucketThreshold, writer: Writer[T]

Не могли бы вы помочь мне найти правильный путь.

Старый код:

 trait Runner[T <: SpecificRecordBase] extends Serializable {
      def createHdfsSink(conf: FlinkConfig, path: String): BucketingSink[T] = {
            val bucketer = new DateTimeBucketer[T]
            val sink = new BucketingSink[T](s"${conf.output}/$path")
            sink
              .setBatchSize(toBytes(conf.batchSize))
              .setBucketer(bucketer)
              .setInactiveBucketThreshold(toMillis(conf.inactiveBucketThreshold))
              .setWriter(writer)
              .setPendingPrefix(pendingPrefix)
              .setBatchRolloverInterval(conf.fileOpenIntervalTime)
      }
  

Новый код с ошибками:

   def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
    val sink = StreamingFileSink
      .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
      .build()

    // TODO:        .withOutputFileConfig()
    sink
  }
  

Ответ №1:

Я думаю, вместо этого вам следует использовать forReflectRecord(Class<T> type) метод, который будет использовать отражение для создания схемы для типа и использовать эту схему для записи записей. Пользовательский назначитель корзины настраивается во StreamingFileSink время установки и указывается с помощью .withBucketAssigner(BucketAssigner<IN, ID> assigner) вызова метода.

Итак, наконец, ваш StreamingFileSink конструктор будет выглядеть следующим образом:

 def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
    val sink = StreamingFileSink
      .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](classOf[T]) )
      .withBucketAssigner(bucketAssigner)
      .build()
    sink
}