Преобразование динамической таблицы Flink в фрейм данных Pandas

#pandas #apache-kafka #apache-flink #pyflink #flink-table-api

#pandas #apache-kafka #apache-flink #pyflink #flink-table-api

Вопрос:

Я использую pyflink table api для чтения данных из Kafka. Теперь я хочу преобразовать результирующую таблицу в фрейм данных Pandas. Вот мой код,

 exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = StreamTableEnvironment 
    .create(exec_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())

INPUT_TABLE = "source"

t_env 
    .connect(  # declare the external system to connect to
    Kafka()
        .version("universal")
        .topic("Rides")
        .start_from_earliest()
        .property("zookeeper.connect", "zookeeper:2181")
        .property("bootstrap.servers", "kafka:9092")) 
    .with_format(  # declare a format for this system
    Json()
        .fail_on_missing_field(True)
        .schema(DataTypes.ROW([
        DataTypes.FIELD("rideId", DataTypes.BIGINT()),
        DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
        DataTypes.FIELD("eventTime", DataTypes.STRING()),
        DataTypes.FIELD("lon", DataTypes.FLOAT()),
        DataTypes.FIELD("lat", DataTypes.FLOAT()),
        DataTypes.FIELD("psgCnt", DataTypes.INT()),
        DataTypes.FIELD("taxiId", DataTypes.BIGINT())]))) 
    .with_schema(  # declare the schema of the table
    Schema()
        .field("rideId", DataTypes.BIGINT())
        .field("taxiId", DataTypes.BIGINT())
        .field("isStart", DataTypes.BOOLEAN())
        .field("lon", DataTypes.FLOAT())
        .field("lat", DataTypes.FLOAT())
        .field("psgCnt", DataTypes.INT())
        .field("eventTime", DataTypes.STRING())) 
    .in_append_mode() 
    .create_temporary_table(INPUT_TABLE)

table = t_env.from_path(INPUT_TABLE)
df = table.to_pandas()
 

Но здесь я не получаю ни ошибки, ни результата. Я использую Flink 1.11.3. Есть ли способ преобразовать эту динамическую таблицу в статическую таблицу или что-то еще, чтобы заставить table.to_pandas() работать?

Ответ №1:

Мы не можем вызвать to_pandas неограниченную таблицу потоков. to_pandas может вызываться только в ограниченной таблице.