как я могу преобразовать фрейм данных big pandas в фрейм потоковых данных?

#python #pandas #dataframe #stream

#python #pandas #фрейм данных #поток

Вопрос:

введите описание изображения здесьУ меня есть фрейм данных pandas, который включает временные метки, идентификатор, продукты, цену и содержит более 50 столбцов.

Я хотел бы преобразовать этот фрейм данных в фрейм потоковых данных. Например, каждые 10 секунд я хотел бы получать 10 raw или 1 raw, а затем после следующих 10 raw или 1 raw, пока фрейм данных не закончится.

Я заглянул в библиотеку streamz, но не смог найти подходящую функцию для этого.

Таким образом, я планирую применить некоторую визуализацию и выполнить некоторые функциональные агрегации или дальнейший анализ.

 >>>df.head()
  

Комментарии:

1. если вы попробуете это «df = pd.read_csv (‘file.csv’, iterator = True, chunksize = 2)», это вернет итератор, и вы можете перебирать df для выполнения некоторых действий. Приведенный выше оператор вернет 2 строки на каждой итерации

2. как я могу добавить период к этому, например, 10 секунд? ищете более библиотечное решение для этого вместо создания всех библиотек с нуля

3. вы не получите никаких готовых решений. Если вы хотите добавить время, вы можете использовать библиотеку расписания ( github.com/dbader/schedule ) чтобы запланировать такую работу или time.sleep также будет работать, это будет полностью зависеть от вашего сквозного рабочего процесса.

4. Единственное, что я пытаюсь решить, это позволить фрейму данных действовать как потоковое. Если никто не делал этого раньше, я обязательно создам для этого библиотеку с открытым исходным кодом. Но я видел хорошие примеры в библиотеке streamz

Ответ №1:

Публикую это небольшое решение вашего вопроса.

 import pandas as pd
import schedule

df = pd.read_csv('file.csv', iterator=True, chunksize=2)

def get_next_row():
    row = next(df)
    print(row)
    # do_some_thing_with_row(row)

schedule.every(5).seconds.do(get_next_row)

while True:
    try:
        schedule.run_pending()
    except StopIteration as e:
        print("EOF")
        break
  

Приведенный выше код в основном вызывает функцию get_next_row и считывает каждые 2 строки с интервалом в 5 секунд и печатает строки. Вместо печати вы можете добавить свою функциональность. Как только он достигнет EOF, он выдаст исключение StopIteraton.

Теперь вы можете изменять интервал и размер фрагмента в соответствии с вашими требованиями.

Ответ №2:

Ранее я обходил аналогичную проблему, используя pd.date_range() для создания времени с нужным интервалом, а затем разрезая исходный фрейм данных на времена в диапазоне.

Например.

 times = pd.date_range(start=13:00, end=15:00, freq=T)
for t in times:
    df_instance = df[df["Time"]<t]
    Do something