#scala #apache-flink #flink-streaming
Вопрос:
Я пытаюсь создать пользовательский источник, который может работать только с определенным интервалом, например, 1 час опроса по протоколу http.Есть ли какой-либо способ использовать запланированного исполнителя внутри пользовательского источника и использовать его как поток.
Пользовательский источник выглядит следующим образом:
import scala.io.Source.fromInputStream
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
class HttpSource(url: String) extends RichSourceFunction[String] with LazyLogging {
private var isRunning = true
override def cancel(): Unit = isRunning = false
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
httpStream(ctx.collect)
}
private def httpStream(rec: String => Unit) = {
try{
val request = Http(url)
val response = request.execute()
if (response.code != 200) {
logger.warn("Metadata api response status code{}", response)
isRunning = false
}
else {
isRunning = true
request.execute{ inputStream =>
fromInputStream(inputStream)
.getLines()
.takeWhile(_ => isRunning)
.foreach(rec)
}
}
}
catch {
case e: Exception => isRunning = false
}
}
}
и в своей основной работе я просто использую как :
val httpSource = env.addSource(new HttpSource(baseUri))
httpSource.broadcast.print()
Есть какие-нибудь предложения, как использовать scheduledExecutors ?
Ответ №1:
Потоковое задание предполагает, что оно выполняется бесконечно, а также источник. Я бы не стал слишком усложнять его использование scheduledExecutors
. Вы можете просто сделать так, чтобы источник не опрашивал данные в течение некоторого интервала.
var running = true
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
while (running) {
httpStream(ctx.collect)
TimeUnit.HOURS.sleep(1)
}
}
Override
public def cancel(): Unit = {
this.running = false
}
Комментарии:
1. Как будет вести себя контрольная точка здесь, если я использую sleep, так как я планирую использовать широковещательную передачу и использовать ее с другим источником потока, где я сопоставил бы значения из источника http со значениями потока кафки