Типы Flink — importing не исправляют «не удалось найти неявное значение для параметра доказательства типа….TypeInformation»

#json #scala #apache-flink #flink-streaming

#json #scala #apache-flink #flink-streaming

Вопрос:

В Scala Flink, независимо от того, что я пытаюсь, я продолжаю получать ошибку, подобную этой:

не удалось найти неявное значение для параметра доказательства типа org.apache.flink.api.common.typeinfo.TypeInformation[String] .map(t => t)

Я попробовал очевидную вещь импорта:

     import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.scala._
 

но это не помогло устранить ошибку компиляции. Моя цель — проанализировать значение JSON из строки, но как я могу это сделать, когда я даже не могу сопоставить строку со строкой (не говоря уже о выполнении parse(t) на карте)?

Я использую Flink 1.12.1 и Scala 2.12.

 object AmplitudeExample {
  def main(args: Array[String]) {
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")

    val partitionedEvents = text
      .map(t => t)
     
    partitionedEvents.print()
  }
}
 

Если я смогу заставить это работать, то моя следующая задача — проанализировать строку с помощью circe, например:

 import io.circe.parser._

object AmplitudeExample {
  def main(args: Array[String]) {
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")

    val partitionedEvents = text
      .map(t => parse(t))

    partitionedEvents.print()
  }
}
 

Ответ №1:

При вызове map(…) попробуйте добавить TypeInformation следующим образом map(...)(TypeInformation.of(classOf[String])

Ответ №2:

Вы могли бы попробовать это.

 object AmplitudeExample {
  def main(args: Array[String]) {
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("/data/tpcds/test")

    val partitionedEvents = text
      .map(t => t)

    partitionedEvents.print()
  }
}
 

Комментарии:

1. Вы думаете, я получаю неправильную StreamExecutionEnvironment?