Таблица Flink получает информацию о типе

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

Скажем, у меня есть таблица flink CREATE TABLE source(id int, name string) with (...) и, скажем, таблица назначения CREATE TABLE destination(id int, unique_name string) with (...) . unique_name вычисляется с использованием бизнес-логики во внутренней функции процесса flink.

Таким образом, мы можем с уверенностью предположить, что исходная схема будет точной (имя и типы данных) такой же, как и схема назначения. Я сделал некоторый низкий уровень process , используя API потока данных, чтобы получить destination поток данных. Она имеет outputType вид GenericType<org.apache.flink.types.Row> . Когда я снова конвертирую обратный destination поток данных в таблицу, я получил приведенную ниже ошибку.

 org.apache.flink.table.api.ValidationException: Column types of query result and sink 
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.

Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema:  [id: INT, name: STRING]
 

Хотя я могу решить эту проблему, используя приведенный ниже код, однако я хочу обобщить это и получить RowTypeInformation из пункта назначения Table . Есть ли какой-либо способ получить TypeInformation от flink Table .

 tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
 

Ответ №1:

Система типов таблиц богаче, чем TypeInformation . Если вы согласны с использованием внутренних классов, вы можете использовать org.apache.flink.table.runtime.typeutils.ExternalTypeInfo . Это TypeInformation то, что можно настроить с помощью Table API DataType .

Если вы хотите использовать официально поддерживаемый API. Вы можете объявлять тип ввода и вывода с TypeInformation помощью и использовать DataTypes.of(TypeInformation) при вызове StreamTableEnvironment.toDataStream(..., DataType)

Комментарии:

1. Я не вызываю toDataStream, не в состоянии четко понять, ниже приведен мой фрагмент кода, можете ли вы помочь. paste.org/120818

2.@timo, я использовал externaltypeinfo, с ROW_NAMED , но все равно происходит сбой кода в RowRowConverter toInternal методе класса. Это проблема упорядочения столбцов.Однако я добавил TypeInformation с именами полей, но все равно он становится нулевым. tableEnv.fromDataStream(processedDataStream.map(x => x).returns( Types.ROW_NAMED(destinationColumnName, destinationTypeInformation: _*) ))

3. Ошибка: вызвана: org.apache. flink.util. Исключение FlinkRuntimeException: ошибка при преобразовании входных данных из внешнего API потока данных во внутренние структуры данных Table API. Убедитесь, что предоставленные типы данных, которые настраивают преобразователи, правильно объявлены в схеме. Затронутая запись