Flink Table API: ГРУППИРОВКА ПО при выполнении SQL вызывает org.apache.исключение flink.table.api.TableException

#apache-kafka #apache-flink #flink-sql #flink-table-api

#apache-кафка #apache-flink #flink-sql #flink-table-api

Вопрос:

У меня есть этот очень упрощенный вариант использования: я хочу использовать Apache Flink (1.11) для чтения данных из раздела Kafka (назовем его source_topic), подсчитать в нем атрибут (называемый b) и записать результат в другой раздел Kafka (result_topic ).

Пока у меня есть следующий код:

 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)`
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.2.jar")

    source_ddl = """
            CREATE TABLE source_table(
                a STRING,
                b INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'source_topic',
              'properties.bootstrap.servers' = 'node-1:9092',
              'scan.startup.mode' = 'earliest-offset',
              'format' = 'csv',
              'csv.ignore-parse-errors' = 'true'
            )
            """

    sink_ddl = """
            CREATE TABLE result_table(
                b INT,
                result BIGINT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'result_topic',
              'properties.bootstrap.servers' = 'node-1:9092',
              'format' = 'csv'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    t_env.execute_sql("INSERT INTO result_table SELECT b,COUNT(b) FROM source_table GROUP BY b")
    t_env.execute("Kafka_Flink_job")

if __name__ == '__main__':
    log_processing()
  

Но когда я его выполняю, я получаю следующую ошибку:

 py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.result_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[b], select=[b, COUNT(b) AS EXPR$1])
  

Я могу записывать данные в тему Kafka с помощью простого SELECT оператора. Но как только я добавляю GROUP BY предложение, генерируется исключение, указанное выше. Я следил за документацией Flink об использовании Table API с SQL для Python: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#sql

Любая помощь приветствуется, я очень новичок в обработке потоков и Flink. Спасибо!

Ответ №1:

Использование GROUP BY предложения сгенерирует поток обновления, который не поддерживается соединителем Kafka начиная с Flink 1.11. С другой стороны, когда вы используете простой SELECT оператор без какой-либо агрегации, поток результатов доступен только для добавления (вот почему вы можете использовать его без проблем).

Flink 1.12 очень близок к выпуску и включает в себя новый соединитель upsert Kafka (FLIP-149, если вам интересно), который позволит вам выполнять операции такого типа также в PyFlink (т. Е. Python Table API).

Комментарии:

1. Большое спасибо за ваш ответ, morsapaes. Итак, я буду ждать Flink 1.12, который планируется выпустить в конце ноября, чтобы использовать приемник Kafka для обновленных данных.