#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
Скажем, у меня есть таблица flink CREATE TABLE source(id int, name string) with (...)
и, скажем, таблица назначения CREATE TABLE destination(id int, unique_name string) with (...)
. unique_name
вычисляется с использованием бизнес-логики во внутренней функции процесса flink.
Таким образом, мы можем с уверенностью предположить, что исходная схема будет точной (имя и типы данных) такой же, как и схема назначения. Я сделал некоторый низкий уровень process
, используя API потока данных, чтобы получить destination
поток данных. Она имеет outputType
вид GenericType<org.apache.flink.types.Row>
. Когда я снова конвертирую обратный destination
поток данных в таблицу, я получил приведенную ниже ошибку.
org.apache.flink.table.api.ValidationException: Column types of query result and sink
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.
Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema: [id: INT, name: STRING]
Хотя я могу решить эту проблему, используя приведенный ниже код, однако я хочу обобщить это и получить RowTypeInformation
из пункта назначения Table
. Есть ли какой-либо способ получить TypeInformation
от flink Table
.
tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
Ответ №1:
Система типов таблиц богаче, чем TypeInformation
. Если вы согласны с использованием внутренних классов, вы можете использовать org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
. Это TypeInformation
то, что можно настроить с помощью Table API DataType
.
Если вы хотите использовать официально поддерживаемый API. Вы можете объявлять тип ввода и вывода с TypeInformation
помощью и использовать DataTypes.of(TypeInformation)
при вызове StreamTableEnvironment.toDataStream(..., DataType)
Комментарии:
1. Я не вызываю toDataStream, не в состоянии четко понять, ниже приведен мой фрагмент кода, можете ли вы помочь. paste.org/120818
2.@timo, я использовал externaltypeinfo, с
ROW_NAMED
, но все равно происходит сбой кода вRowRowConverter
toInternal
методе класса. Это проблема упорядочения столбцов.Однако я добавил TypeInformation с именами полей, но все равно он становится нулевым.tableEnv.fromDataStream(processedDataStream.map(x => x).returns( Types.ROW_NAMED(destinationColumnName, destinationTypeInformation: _*) ))
3. Ошибка: вызвана: org.apache. flink.util. Исключение FlinkRuntimeException: ошибка при преобразовании входных данных из внешнего API потока данных во внутренние структуры данных Table API. Убедитесь, что предоставленные типы данных, которые настраивают преобразователи, правильно объявлены в схеме. Затронутая запись