#json #scala #apache-flink #flink-streaming
#json #scala #apache-flink #flink-streaming
Вопрос:
В Scala Flink, независимо от того, что я пытаюсь, я продолжаю получать ошибку, подобную этой:
не удалось найти неявное значение для параметра доказательства типа org.apache.flink.api.common.typeinfo.TypeInformation[String] .map(t => t)
Я попробовал очевидную вещь импорта:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
но это не помогло устранить ошибку компиляции. Моя цель — проанализировать значение JSON из строки, но как я могу это сделать, когда я даже не могу сопоставить строку со строкой (не говоря уже о выполнении parse(t)
на карте)?
Я использую Flink 1.12.1 и Scala 2.12.
object AmplitudeExample {
def main(args: Array[String]) {
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")
val partitionedEvents = text
.map(t => t)
partitionedEvents.print()
}
}
Если я смогу заставить это работать, то моя следующая задача — проанализировать строку с помощью circe, например:
import io.circe.parser._
object AmplitudeExample {
def main(args: Array[String]) {
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")
val partitionedEvents = text
.map(t => parse(t))
partitionedEvents.print()
}
}
Ответ №1:
При вызове map(…) попробуйте добавить TypeInformation следующим образом map(...)(TypeInformation.of(classOf[String])
Ответ №2:
Вы могли бы попробовать это.
object AmplitudeExample {
def main(args: Array[String]) {
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/data/tpcds/test")
val partitionedEvents = text
.map(t => t)
partitionedEvents.print()
}
}
Комментарии:
1. Вы думаете, я получаю неправильную StreamExecutionEnvironment?