Совместное использование динамических таблиц между программами Flink

#apache-flink #flink-sql

Вопрос:

У меня есть задание Flink, которое создает динамическую таблицу из потока списка изменений базы данных. Определение таблицы выглядит следующим образом:

 tableEnv.sqlUpdate("""
      CREATE TABLE some_table_name (
          id INTEGER,
          name STRING,
          created_at BIGINT,
          updated_at BIGINT
      )
      WITH (
          'connector' = 'kafka',
          'topic' = 'topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.zookeeper.connect' = 'localhost:2181',
          'properties.group.id' = 'group_1',
          'format' = 'debezium-json',
          'debezium-json.schema-include' = 'true'
      )
    """)

 

При попытке ссылаться на эту таблицу в другом запущенном приложении Flink в том же кластере моя программа возвращает ошибку: SqlValidatorException: Object 'some_table_name' not found . Можно ли как-то зарегистрировать эту таблицу, чтобы другие программы могли ее использовать? Например, в таком заявлении, как это:

   tableEnv.sqlQuery("""
    SELECT count(*) FROM some_table_name
  """).execute().print()
 

Ответ №1:

Обратите внимание, что таблица во Flink не содержит никаких данных. Другое приложение Flink может самостоятельно создать другую таблицу, основанную, например, на той же теме Кафки . Так что отказ от совместного использования таблиц между приложениями не так трагичен, как вы могли бы ожидать.

Но вы можете обмениваться таблицами, храня их во внешнем каталоге. Например, для этой цели вы можете использовать каталог Apache Hive. Дополнительную информацию смотрите в документах.