Повторяющийся набор данных pytorch с несколькими рабочими

#python #deep-learning #pytorch #distributed

Вопрос:

Итак, у меня есть текстовый файл больше, чем моя оперативная память, я хотел бы создать набор данных в PyTorch, который читает построчно, поэтому мне не нужно загружать все это сразу в память. Я нашел pytorch IterableDataset в качестве потенциального решения моей проблемы. Он работает так, как ожидалось, только при использовании 1 рабочего, при использовании более одного рабочего он создаст повторяющиеся записи. Позвольте мне показать вам пример:

Наличие testfile.txt содержащего:

 0 - Dummy line
1 - Dummy line
2 - Dummy line
3 - Dummy line
4 - Dummy line
5 - Dummy line
6 - Dummy line
7 - Dummy line
8 - Dummy line
9 - Dummy line
 

Определение IterableDataset:

 class CustomIterableDatasetv1(IterableDataset):

    def __init__(self, filename):

        #Store the filename in object's memory
        self.filename = filename

    def preprocess(self, text):

        ### Do something with text here
        text_pp = text.lower().strip()
        ###

        return text_pp

    def line_mapper(self, line):
        
        #Splits the line into text and label and applies preprocessing to the text
        text, label = line.split('-')
        text = self.preprocess(text)

        return text, label


    def __iter__(self):

        #Create an iterator
        file_itr = open(self.filename)

        #Map each element using the line_mapper
        mapped_itr = map(self.line_mapper, file_itr)
        
        return mapped_itr
 

Теперь мы можем его протестировать:

 base_dataset = CustomIterableDatasetv1("testfile.txt")
#Wrap it around a dataloader
dataloader = DataLoader(base_dataset, batch_size = 1, num_workers = 1)
for X, y in dataloader:
    print(X,y)
 

Он выводит:

 

('0',) (' Dummy linen',)
('1',) (' Dummy linen',)
('2',) (' Dummy linen',)
('3',) (' Dummy linen',)
('4',) (' Dummy linen',)
('5',) (' Dummy linen',)
('6',) (' Dummy linen',)
('7',) (' Dummy linen',)
('8',) (' Dummy linen',)
('9',) (' Dummy line',)
 

Это правильно. Но если я изменю количество рабочих на 2, результат станет

 ('0',) (' Dummy linen',)
('0',) (' Dummy linen',)
('1',) (' Dummy linen',)
('1',) (' Dummy linen',)
('2',) (' Dummy linen',)
('2',) (' Dummy linen',)
('3',) (' Dummy linen',)
('3',) (' Dummy linen',)
('4',) (' Dummy linen',)
('4',) (' Dummy linen',)
('5',) (' Dummy linen',)
('5',) (' Dummy linen',)
('6',) (' Dummy linen',)
('6',) (' Dummy linen',)
('7',) (' Dummy linen',)
('7',) (' Dummy linen',)
('8',) (' Dummy linen',)
('8',) (' Dummy linen',)
('9',) (' Dummy line',)
('9',) (' Dummy line',)
 

Что неверно, так как создает дубликаты каждого образца для каждого рабочего в загрузчике данных.

Есть ли способ решить эту проблему с помощью pytorch? Таким образом, можно создать загрузчик данных, чтобы не загружать весь файл в память с поддержкой нескольких рабочих.

Ответ №1:

У вас есть доступ к идентификатору рабочего внутри функции Dataset ‘s __iter__ с помощью torch.utils.data.get_worker_info утилиты. Это означает, что вы можете выполнить итератор и добавить смещение в зависимости от идентификатора рабочего. Вы можете обернуть итератор, с itertools.islice помощью которого вы можете изменять start индекс, а также step .

Вот минимальный пример:

 class DS(IterableDataset):
    def __init__(self, batch_size):
        super().__init__()
        self.batch_size = batch_size

    def __iter__(self):
        uid = torch.utils.data.get_worker_info().id
        itr = islice(range(10), uid, None, self.batch_size)
 

Циклический просмотр загрузчика данных приведет к появлению уникальных экземпляров, даже если мы используем num_workers > 1 :

 >>> for x in DataLoader(DS(batch_size=2), batch_size=2, num_workers=2):
...     print(x)
tensor([0, 2])
tensor([1, 3])
tensor([4, 6])
tensor([5, 7])
tensor([8])
tensor([9])
 

В вашем случае вы можете сделать:

     def __iter__(self):
        # create an iterator
        file_itr = open(self.filename)

        # map each element using the line_mapper
        mapped_itr = map(self.line_mapper, file_itr)
    
        # wrap the iterator
        step_itr = islice(mapped_itr, uid, None, self.batch_size)

        return step_itr
 

Комментарии:

1. Как вы могли бы добавить эту логику к чтению файла? Я понимаю, как вы это делаете со списком, который вы уже определили, но как насчет чтения файла на ходу?

2. Я действительно не понимаю, разве ваш итерируемый набор данных еще не реализован и не работает num_workers=1 ?

3. Да, но вы индексируете на основе во uid время создания итератора, а не после создания итератора. Вот почему я не понимаю, как поместить это поверх моего текущего IterableDataset

4. __iter__ Функция вызывается загрузчиком данных для каждого рабочего при первом циклическом запуске загрузчика данных. Вы пробовали возврат iter(list(mapped_itr)[uid::self.batch_size]) в своей __iter__ функции?

5. Также требуется одно изменение, вместо self.batch_size при нарезке должно быть число рабочих.

Ответ №2:

Итак, я нашел ответ на форуме обсуждения torch https://discuss.pytorch.org/t/iterable-pytorch-dataset-with-multiple-workers/135475/3 где они указали, что я должен использовать информацию о работнике для последовательного нарезания в соответствии с размером пакета.

Новый набор данных будет выглядеть следующим образом:

 class CustomIterableDatasetv1(IterableDataset):

    def __init__(self, filename):

        #Store the filename in object's memory
        self.filename = filename

    def preprocess(self, text):

        ### Do something with text here
        text_pp = text.lower().strip()
        ###

        return text_pp

    def line_mapper(self, line):
        
        #Splits the line into text and label and applies preprocessing to the text
        text, label = line.split('-')
        text = self.preprocess(text)

        return text, label


    def __iter__(self):
        worker_total_num = torch.utils.data.get_worker_info().num_workers
        worker_id = torch.utils.data.get_worker_info().id
        #Create an iterator
        file_itr = open(self.filename)

        #Map each element using the line_mapper
        mapped_itr = map(self.line_mapper, file_itr)
        
        #Add multiworker functionality
        mapped_itr = itertools.islice(mapped_itr, worker_id, None, worker_total_num)

        return mapped_itr
 

Особая благодарность @Ivan, который также указал на решение для нарезки.

С двумя рабочими он возвращает те же данные, что и только с 1 рабочим

Комментарии:

1. Предпочтительнее было бы отредактировать… В любом случае, пожалуйста, рассмотрите возможность повышения исходного ответа.