#pandas #apache-kafka #apache-flink #pyflink #flink-table-api
#pandas #apache-kafka #apache-flink #pyflink #flink-table-api
Вопрос:
Я использую pyflink table api для чтения данных из Kafka. Теперь я хочу преобразовать результирующую таблицу в фрейм данных Pandas. Вот мой код,
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = StreamTableEnvironment
.create(exec_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
INPUT_TABLE = "source"
t_env
.connect( # declare the external system to connect to
Kafka()
.version("universal")
.topic("Rides")
.start_from_earliest()
.property("zookeeper.connect", "zookeeper:2181")
.property("bootstrap.servers", "kafka:9092"))
.with_format( # declare a format for this system
Json()
.fail_on_missing_field(True)
.schema(DataTypes.ROW([
DataTypes.FIELD("rideId", DataTypes.BIGINT()),
DataTypes.FIELD("isStart", DataTypes.BOOLEAN()),
DataTypes.FIELD("eventTime", DataTypes.STRING()),
DataTypes.FIELD("lon", DataTypes.FLOAT()),
DataTypes.FIELD("lat", DataTypes.FLOAT()),
DataTypes.FIELD("psgCnt", DataTypes.INT()),
DataTypes.FIELD("taxiId", DataTypes.BIGINT())])))
.with_schema( # declare the schema of the table
Schema()
.field("rideId", DataTypes.BIGINT())
.field("taxiId", DataTypes.BIGINT())
.field("isStart", DataTypes.BOOLEAN())
.field("lon", DataTypes.FLOAT())
.field("lat", DataTypes.FLOAT())
.field("psgCnt", DataTypes.INT())
.field("eventTime", DataTypes.STRING()))
.in_append_mode()
.create_temporary_table(INPUT_TABLE)
table = t_env.from_path(INPUT_TABLE)
df = table.to_pandas()
Но здесь я не получаю ни ошибки, ни результата. Я использую Flink 1.11.3. Есть ли способ преобразовать эту динамическую таблицу в статическую таблицу или что-то еще, чтобы заставить table.to_pandas() работать?
Ответ №1:
Мы не можем вызвать to_pandas
неограниченную таблицу потоков. to_pandas
может вызываться только в ограниченной таблице.