#python #pytorch
Вопрос:
Я пытаюсь заставить Pytorch обучать записи одного файла паркета, не читая весь файл в памяти сразу, так как он не поместится в памяти. Поскольку файл хранится удаленно, я бы предпочел сохранить его как один файл, так как обучение с использованием ввода-вывода для многих файлов чрезвычайно дорого. Как я могу использовать Pytorch IterableDataset
или Dataset
читать меньшие фрагменты файла во время обучения, когда я хочу указать количество пакетов в файле DataLoader
? Я знаю, что Dataset
в этом случае стиль карты не будет работать, так как мне нужно все в одном файле, а не читать индекс каждого файла.
Мне удалось реализовать это в Tensorflow с помощью tfio.IODataset
и tf.data.Dataset
, но я не могу найти эквивалентного способа реализовать это в Pytorch.
Ответ №1:
Я нашел обходной путь , используя torch.utils.data.Dataset
, но данные должны быть обработаны с помощью dask заранее, так что каждый раздел является пользователем, хранится в виде собственного файла parquet, но может быть прочитан только один раз позже. В следующем коде метки и данные хранятся отдельно для задачи классификации многомерных временных рядов (но могут быть легко адаптированы и для других задач).:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader, IterableDataset, Dataset
# Breakdown file
raw_ddf = dd.read_parquet("data.parquet") # Read huge file using dask
raw_ddf = raw_ddf.set_index("userid") # set the userid as index
userids = raw_ddf.index.unique().compute().values.tolist() # get a list of indices
new_ddf = raw_ddf.repartition(divisions = userids) # repartition by userids
new_ddf.to_parquet("my_folder") # this will save each user as its own parquet file within "my_folder"
# Dask to read the partitions
train_ddf = dd.read_parquet("my_folder/*.parquet") # read all files
# Read labels file
labels_df = pd.read("label.csv")
y_labels = np.array(labels_df["class"])
# Define the Dataset class
class UsersDataset(Dataset):
def __init__(self, dask_df, labels):
self.dask_df = dask_df
self.labels = labels
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
X_df = self.dask_df.get_partition(idx).compute()
X = np.row_stack([X_df])
X_tensor = torch.tensor(X, dtype=torch.float32)
y = self.labels[idx]
y_tensor = torch.tensor(y, dtype=torch.long)
sample = (X_tensor, y_tensor)
return sample
# Create a Dataset object
user_dataset = UsersDataset(dask_df=ddf_train, labels = y_train)
# Create a DataLoader object
dataloader = DataLoader(user_dataset, batch_size=4, shuffle=True, num_workers=0)
# Print output of the first batch to ensure it works
for i_batch, sample_batched in enumerate(dataloader):
print("Batch number ", i_batch)
print(sample_batched[0]) # print X
print(sample_batched[1]) # print y
# stop after first batch.
if i_batch == 0:
break
Я хотел бы знать, как я могу адаптировать свой подход при использовании >= 2 работников для чтения данных без повторяющихся записей. Любые идеи по этому поводу будут высоко оценены.