#apache-flink #flink-streaming #flink-sql
#apache-flink #flink-streaming #flink-sql
Вопрос:
Я хочу использовать тему Kafka в таблице, используя Flink SQL, а затем преобразовать ее обратно в поток данных.
Вот SOURCE_DDL
:
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
С помощью Flink я выполняю DDL.
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
Затем я преобразую его в поток данных и выполняю логику нисходящего потока в map(e => ...)
части.
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
Внутри map(e => ...)
части я хотел бы получить доступ к имени столбца, в данном случае last_5_clicks
. Почему? Потому что у меня могут быть разные источники с разными именами столбцов (например , last_10min_page_view
), но я хотел бы повторно использовать код map(e => ...)
.
Есть ли способ сделать это? Спасибо.
Комментарии:
1. Какой в этом смысл ?? Я имею в виду, что на данном
map
этапе вы уже преобразовали данные в кортежи, чтобы иметь доступ к полям через их индексe._1
, поэтому вам действительно не нужно знать имя столбца, из которого они были созданы.
Ответ №1:
Начиная с Flink 1.12, к нему можно получить доступ через Table.getSchema.getFieldNames
. Начиная с версии 1.13, к нему можно получить доступ через Row.getFieldNames
.