Столбцы для структурирования паркета с помощью pyarrow и (или) pandas в Python

#python #pandas #struct #parquet #pyarrow

#питон #pandas #структура #паркет #pyarrow

Вопрос:

Я надеюсь, что некоторые из вас найдут немного времени, чтобы помочь такому новичку, как я. Я работаю над этой задачей целую неделю и не могу найти решение. Я понимаю и полностью согласен с тем, что мне нужно изучить каждый пакет, который я использую, и их комбинации, чтобы найти правильное решение.

Полная задача состоит в том, чтобы объединить 5 столбцов (1000 строк) в 1 столбец структуры и сохранить / преобразовать его в строку (1000 столбцов) в parquet. Но я столкнулся с проблемой объединения 5 столбцов в 1 столбец структуры.

Первоначально я получаю следующие столбцы: columns=[‘date’, ‘bidopen’, ‘bidclose’, ‘bidhigh’, ‘bidlow’, ‘tickqty’]. Мне не нужно, чтобы «дата» была частью структуры.

Что я пробовал:

 import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
 

Вариант 1 — словарь с pandas

 df = pd.read_csv('original.csv')
df2 = df.drop(columns=['date'])
df3 = df2.to_dict('records')
 

Я не могу сохранить dict в csv или parquet с помощью pandas — следующие 2 команды приводят к обратному преобразованию в фрейм данных pandas и сохранению столбцов отдельно.

 pd.DataFrame(df3).to_csv('test_parquet.csv', index=False)
pd.DataFrame(df3).to_parquet('test2.parquet')
 

Если бы я мог использовать dictionary в качестве фрейма данных, затем я бы использовал pandas.DataFrame.pivot для превращения строк в столбцы. Затем я попытался преобразовать dict в таблицу pyarrow (похоже, потенциально я мог бы также сохранять записи в столбцах (1 строка)).

 table = pa.Table.from_pydict({'data', pa.array(df3)})
 

После строки выше у меня ошибка, и я не смог найти решение (TypeError: unhashable type: ‘pyarrow.lib.StructArray’). Следующим шагом будет сохранение таблицы в parquet с помощью pyarrow.

Вариант 2 — структурировать с помощью pyarrow

Здесь я попытался поработать внутри parquet, чтобы изменить схему (или записать в новую схему)

 df = pd.read_csv('original.csv')
df = df.drop(columns=['date'])
df.to_parquet('test.parquet')
table = pq.read_table('test.parquet', columns=['bidopen', 'bidclose', 'bidhigh', 'bidlow', 'tickqty'])
 

Здесь я читаю схему паркета, чтобы увидеть тип данных каждого столбца. Ниже я устанавливаю новую схему:

 struct = pa.struct([
    pa.field('bidopen', pa.float64()),
    pa.field('bidclose', pa.float64()),
    pa.field('bidhigh', pa.float64()),
    pa.field('bidlow', pa.float64()),
    pa.field('tickqty', pa.int64())
])
fields = ([pa.field('data', pa.list_(struct))])
schema = pa.schema(fields)
writer = pq.ParquetWriter('test2.parquet', schema)
writer.write_table(table)
writer.close()
 

Я получил ошибку, для которой я также не смог найти решение (ValueError: схема таблицы не соответствует схеме, используемой для создания file: …), Поскольку я думал, что она сохранится в новой предоставленной схеме.

Вариант 3 — приведение pyarrow

 #(the upper part is from the Option 2)
...
schema = pa.schema(fields)
table2 = table.cast(schema)
writer = pq.ParquetWriter('test2.parquet', schema)
writer.write_table(table2)
writer.close()
 

Я получил еще одну ошибку (ValueError: имена полей целевой схемы не совпадают с именами полей таблицы:). Здесь я сказал — да ладно, я выполняю приведение именно потому, что схемы не совпадают… Это не помогло.

Вариант 4 — еще одна попытка изменить схему при загрузке из pandas в pyarrow, чтобы сохранить ее позже в parquet

 arrays = [['data','data','data','data','data'],['bidopen', 'bidclose','bidhigh','bidlow','tickqty']]
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples)
df2 = pd.DataFrame(df.values[:, 1:], columns=index)
pa.Schema.from_pandas(df2)
 

Здесь я получил ошибку (AttributeError: объект ‘list’ не имеет атрибута ‘columns’), для которой я также не смог найти решение.

Вариант 5 — pyspark

Это было самое большое упущение для меня, так как я потратил около 3 дней, чтобы «выучить» его так, как он должен уметь выполнять преобразование в структуру и поворот. НО позже я узнал, что я не могу сохранять данные в parquet с помощью pyspark на моем Win10 без дополнительных пакетов: Hadoop и Java SDK (это не бесплатно для использования). Поэтому я прекратил его дальнейшее развитие.

Комментарии:

1. Я не совсем уверен, что понимаю, что вам нужно. Вы начинаете с 5 столбцов и 5000 строк. В конце вам нужна одна строка с 1000 столбцами? Является ли каждый элемент в этом struct? Я думаю, что было бы более распространенным создать один столбец с 1000 строками, где каждый элемент является структурой. Может быть, это то, что вы хотите сделать? Если вы собираетесь создать 1000 столбцов, какими будут имена этих столбцов?

2. привет, темп. У меня есть 5 столбцов с 1000 строками (не 5000). Вы правы, и получение только 1 столбца структуры из 5 — это промежуточный шаг, на котором я остановился. Не могли бы вы помочь с этим? После того, как у меня будет 1 пораженный столбец, я собираюсь превратить его в строку. Именование новых столбцов будет зависеть от способа получения столбца struct. Я мог бы просто переименовать или добавить новые имена в новые столбцы. Или я мог бы добавить еще один столбец с некоторым шаблоном в полях (например, от data_piece_1 до _1000), содержащий новый столбец в качестве имен столбцов и столбец данных в качестве 1-й строки после поворота. Надеюсь, мое мышление понятно. Спасибо.

Ответ №1:

Для первой части вашего вопроса вы можете сделать это (обратите внимание, StructArray.from_arrays ожидает массивы, поэтому вам нужно сгладить фрагментированные массивы):

 fields, arrs = [], []
for column_index in range(table.num_columns):
    fields.append(table.field(column_index))
    arrs.append(table.column(column_index).flatten()[0].chunks[0])
struct_array = pa.StructArray.from_arrays(arrs, fields=fields)
print(struct_array)
print(struct_array.to_pylist())
 

Пример вывода:

 -- is_valid: all not null
-- child 0 type: double
  [
    1.1,
    2.2
  ]
-- child 1 type: double
  [
    3.3,
    4.4
  ]
-- child 2 type: double
  [
    5.5,
    6.6
  ]
-- child 3 type: double
  [
    7.7,
    8.8
  ]
-- child 4 type: int64
  [
    9,
    10
  ]
[{'bidopen': 1.1, 'bidclose': 3.3, 'bidhigh': 5.5, 'bidlow': 7.7, 'tickqty': 9}, {'bidopen': 2.2, 'bidclose': 4.4, 'bidhigh': 6.6, 'bidlow': 8.8, 'tickqty': 10}]
 

Я не думаю, что pyarrow может транспонировать, если это то, что вы просите для второй части вашего вопроса. Вы могли бы использовать pandas для транспонирования, но это была бы другая копия.

 df = pa.Table.from_arrays([struct_array], ['data']).to_pandas()
print(df.transpose())
 

Пример вывода:

                                                       0  
data  {'bidopen': 1.1, 'bidclose': 3.3, 'bidhigh': 5...   

                                                      1  
data  {'bidopen': 2.2, 'bidclose': 4.4, 'bidhigh': 6...  

​
 

Результатом в этом случае всегда будет таблица с одной строкой с N столбцами, и каждая ячейка будет структурой.

Комментарии:

1. Пейс, большое спасибо за вашу помощь. Я принял ответ, поскольку он почти полностью решает мой вопрос. Единственная небольшая проблема, с которой я столкнулся, заключается в том, что данные не могут быть сохранены в parquet, поскольку номера не могут быть в виде имен столбцов. Я преобразовал их в строку и столкнулся с другой проблемой — я не смог открыть успешно сохраненный паркет, поскольку «0» (ноль в виде строки) не может быть именем 1-го столбца. Я нашел обходной путь — перед отправкой ответа я добавил еще один столбец «column» с полями «column1», «column2» и т. Д. И Установил его в качестве индекса с помощью df.set_index(‘column’).