Привязка таблицы к потоку данных: как получить доступ к имени столбца?

#apache-flink #flink-streaming #flink-sql

#apache-flink #flink-streaming #flink-sql

Вопрос:

Я хочу использовать тему Kafka в таблице, используя Flink SQL, а затем преобразовать ее обратно в поток данных.

Вот SOURCE_DDL :

 CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)
 

С помощью Flink я выполняю DDL.

 val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
 

Затем я преобразую его в поток данных и выполняю логику нисходящего потока в map(e => ...) части.

 tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
 

Внутри map(e => ...) части я хотел бы получить доступ к имени столбца, в данном случае last_5_clicks . Почему? Потому что у меня могут быть разные источники с разными именами столбцов (например , last_10min_page_view ), но я хотел бы повторно использовать код map(e => ...) .

Есть ли способ сделать это? Спасибо.

Комментарии:

1. Какой в этом смысл ?? Я имею в виду, что на данном map этапе вы уже преобразовали данные в кортежи, чтобы иметь доступ к полям через их индекс e._1 , поэтому вам действительно не нужно знать имя столбца, из которого они были созданы.

Ответ №1:

Начиная с Flink 1.12, к нему можно получить доступ через Table.getSchema.getFieldNames . Начиная с версии 1.13, к нему можно получить доступ через Row.getFieldNames .