Пользовательский источник Flink, запланированный на каждый час

#scala #apache-flink #flink-streaming

Вопрос:

Я пытаюсь создать пользовательский источник, который может работать только с определенным интервалом, например, 1 час опроса по протоколу http.Есть ли какой-либо способ использовать запланированного исполнителя внутри пользовательского источника и использовать его как поток.

Пользовательский источник выглядит следующим образом:

 import scala.io.Source.fromInputStream
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class HttpSource(url: String) extends RichSourceFunction[String] with LazyLogging {

  private var isRunning = true
  override def cancel(): Unit = isRunning = false

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    httpStream(ctx.collect)
  }

  private def httpStream(rec: String => Unit) = {
    try{
      val request = Http(url)
      val response = request.execute()
      if (response.code != 200) {
        logger.warn("Metadata api response status code{}", response)
        isRunning = false
      }
      else {
        isRunning = true
        request.execute{ inputStream =>
          fromInputStream(inputStream)
            .getLines()
            .takeWhile(_ => isRunning)
            .foreach(rec)
        }
      }
    }
    catch {
      case e: Exception => isRunning = false
    }
  }
}
 

и в своей основной работе я просто использую как :

 val httpSource = env.addSource(new HttpSource(baseUri))
httpSource.broadcast.print()
 

Есть какие-нибудь предложения, как использовать scheduledExecutors ?

Ответ №1:

Потоковое задание предполагает, что оно выполняется бесконечно, а также источник. Я бы не стал слишком усложнять его использование scheduledExecutors . Вы можете просто сделать так, чтобы источник не опрашивал данные в течение некоторого интервала.

 var running = true
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
        httpStream(ctx.collect)
        TimeUnit.HOURS.sleep(1)
   }
} 
Override
public def cancel(): Unit = {
        this.running = false
}
 

Комментарии:

1. Как будет вести себя контрольная точка здесь, если я использую sleep, так как я планирую использовать широковещательную передачу и использовать ее с другим источником потока, где я сопоставил бы значения из источника http со значениями потока кафки