Соединитель файловой системы не выдает выходных данных и streamEnvironment.executes() выдает «операторы не определены»

#apache-flink #flink-sql

#apache-flink #flink-sql

Вопрос:

У меня есть следующий код, который хочет записать данные, сгенерированные с помощью datagen, в файл, но когда я запускаю приложение, целевой каталог не создается, и данные не записываются.

Когда я добавляю env.execute() в конце кода, он жалуется, что No operators defined in streaming topology. Cannot execute.

Я бы спросил, как заставить приложение работать, спасибо.

 test("insert into table") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      """
      create temporary table abc(
       name STRING,
       age INT
      ) with (
        'connector' = 'datagen'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val sql =
      """
        select * from abc
      """.stripMargin(' ')

    val sinkDDL =
      s"""
      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = 'D:\${System.currentTimeMillis()}-csv' ,
       'format' = 'csv'

      )
      """.stripMargin(' ')

    tenv.executeSql(sinkDDL)

    val insertInSQL =
      """
      insert into xyz
      select name, age from abc
      """.stripMargin(' ')

    tenv.executeSql(insertInSQL)


//    env.execute()


  }
 

Комментарии:

1. не могли бы вы уточнить, на какую версию Flink опирается код?

2. Спасибо @svend. Я использую Flink 1.12.0

Ответ №1:

Я думаю, у вас должен быть UDF при выполнении таблицы, см.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions

Вы можете посмотреть пример, написать функцию и вставить ее в свой конвейер sql, это работает как «оператор» в вашем сообщении об ошибке.

Комментарии:

1. Спасибо @litchy, но я не думаю, что мне нужно писать функцию для такого простого и базового сценария.

2. @Tom Тогда вам нужно использовать встроенную функцию, например print() , встроенный приемник. Я не знаю ни одного, но вы можете просто определить функцию def fun(val v) = return v .

3. Спасибо @litchy, похоже, я понял вашу точку зрения, вы имеете в виду, что я должен обеспечить потоковую операцию в конвейере, чтобы это env.execute радовало?

4. Да, спасибо @litchy, это работает. Однако это своего рода неудобство.

Ответ №2:

Я думаю, что это действительно работает, просто не тогда, когда мы думаем, что это так 🙂

Я тестировал это с помощью планировщика Blink в Flink 1.12:

 "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"
 

Вызов env.execute() StreamingEnvironment в конце фактически не требуется, поскольку каждое .executeSql() предыдущее в программе уже отправляет асинхронные задания. Затем приемник в вашем коде привязывается к одному из этих заданий, а не к заданию, которое env.execute() будет запущено (которое в данном случае является пустым заданием, вызывающим упомянутую вами ошибку). Я нашел подсказку об этом в этом ответе в списке рассылки.

Когда я запускаю код в вопросе (с помощью планировщика Blink и адаптирую вывод к 'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' своему хосту), я вижу, что постепенно создается несколько скрытых файлов. Я предполагаю, что они аналогичным образом скрыты на вашем хосте Windows (файлы, начинающиеся с . значения ниже, скрыты в Linux):

 ls -ltra /tmp/hello-flink-1609574239647-csv
total 165876
drwxrwxrwt 40 root  root      12288 Jan  2 08:57 ..
-rw-rw-r--  1 svend svend 134217771 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-0.inprogress.48863a2b-f022-401b-95e3-659ec4920162
drwxrwxr-x  2 svend svend      4096 Jan  2 08:59 .
-rw-rw-r--  1 svend svend  35616014 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-1.inprogress.3412bcb0-d30d-43be-819b-1acf26a0a8bb
 

Происходит просто то, что текущая политика соединителя файловой системы SQL по умолчанию ожидает гораздо дольше, прежде чем передавать файлы.

Если вы запускаете свой код из IDE, вы можете адаптировать создание среды следующим образом (обычно это делается в conf/flink-conf.yaml ):

   val props = new Properties
  props.setProperty("execution.checkpointing.interval", "10000")  // 10000 ms
  val conf = ConfigurationUtils.createConfiguration(props)
  val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(1, conf)
 

и используйте небольшой размер файла в выходном соединителе:

       create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' ,
       'format' = 'csv',
       'sink.rolling-policy.file-size' = '1Mb'
 

И файлы CSV теперь должны быть зафиксированы намного быстрее:

 ls -ltra hello-flink-1609575075617-csv
total 17896
-rw-rw-r--  1 svend svend 1048669 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-0
-rw-rw-r--  1 svend svend 1048644 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-1
-rw-rw-r--  1 svend svend 1048639 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-2
-rw-rw-r--  1 svend svend 1048676 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-3
-rw-rw-r--  1 svend svend 1048680 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-4
-rw-rw-r--  1 svend svend 1048642 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-5
 

Комментарии:

1. Спасибо @svend за большую помощь! Я попробую

2. Круто, приятно слышать. Не могли бы вы поддержать ответ, поскольку он был полезен? Спасибо