Как использовать tf.data.Dataset.from_generator() для одновременной загрузки только одного пакета из набора данных?

#python #tensorflow #keras #deep-learning

#python #tensorflow #keras #глубокое обучение

Вопрос:

Я хочу обучить CNN, и я пытаюсь загружать модель по одному пакету за раз, непосредственно из numpy memmap, без необходимости загружать весь набор данных в память, используя tf.data.Dataset.from_generator() . Я использую tf2.2 и графический процессор для подгонки. Набор данных представляет собой последовательность 3D-матриц (формат NCHW). Меткой каждого случая является следующая 3D-матрица. Проблема в том, что он по-прежнему загружает весь набор данных в память.

Вот краткий воспроизводимый пример:

 import numpy as np
from numpy.lib.format import open_memmap
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

tf.config.list_physical_devices("GPU")


# create and initialize the memmap
ds_shape = (20000, 3, 50, 50)
ds_mmap = open_memmap("ds.npy",
                      mode='w ',
                      dtype=np.dtype("float64"),
                      shape=ds_shape)
ds_mmap = np.random.rand(*ds_shape)

len_ds = len(ds_mmap)          # 20000
len_train = int(0.6 * len_ds)  # 12000
len_val = int(0.2 * len_ds)    # 4000
len_test = int(0.2 * len_ds)   # 4000
batch_size = 32
epochs = 50
  

Я попробовал 2 способа генерации наборов данных для тестирования на основе обучающих тестов (кроме того, если кто-нибудь может прокомментировать плюсы и минусы, это было бы более чем приветствуется)

1.

 def gen(ds_mmap, start, stop):
  for i in range(start, stop):
    yield (ds_mmap[i], ds_mmap[i   1])

tvt = {"train": None, "val": None, "test": None}
tvt_limits = {
  "train": (0, len_train),
  "val": (len_train, len_train   len_val),
  "test": (len_train   len_val, len_ds -1)  # -1 because the last case does not have a label
}

for ds_type, ds in tvt.items():
  start, stop = tvt_limits[ds_type]
  ds = tf.data.Dataset.from_generator(
    generator=gen,
    output_types=(tf.float64, tf.float64),
    output_shapes=(ds_shape[1:], ds_shape[1:]),
    args=[ds_mmap, start, stop]
  )

train_ds = (
  tvt["train"]
  .shuffle(len_ds, reshuffle_each_iteration=False)
  .batch(batch_size)
)
val_ds = tvt["val"].batch(batch_size)
test_ds = tvt["test"].batch(batch_size)
  
 def gen(ds_mmap):
  for i in range(len(ds_mmap) - 1):
    yield (ds_mmap[i], ds_mmap[i   1])

ds = tf.data.Dataset.from_generator(
  generator=gen,
  output_types=(tf.float64, tf.float64),
  output_shapes=(ds_shape[1:], ds_shape[1:])
  args=[ds_mmap]
)

train_ds = (
  ds
  .take(len_train)
  .shuffle(len_ds, reshuffle_each_iteration=False)
  .batch(batch_size)
)
val_ds = ds.skip(len_train).take(len_val).batch(batch_size)
test_ds = ds.skip(len_train   len_val).take(len_test - 1).batch(batch_size)
  

Оба способа работают, но переносят весь набор данных в память.

 model = keras.Sequential([
  layers.Conv2D(64, (3, 3), input_shape=ds_shape[1:],
                activation="relu", data_format="channels_first"),
  layers.MaxPooling2D(data_format="channels_first"),
  layers.Conv2D(128, (3, 3),
                activation="relu", data_format="channels_first"),
  layers.MaxPooling2D(data_format="channels_first"),
  layers.Flatten(),
  layers.Dense(8182, activation="relu"),
  layers.Dense(np.prod(ds_shape[1:])),
  layers.Reshape(ds_shape[1:])
])

model.compile(loss="mean_aboslute_error",
              optimizer="adam",
              metrics=[tf.keras.metrics.MeanSquaredError()])

hist = model.fit(
  train_ds,
  validation_data=val_ds,
  epochs=epochs,
  # steps_per_epoch=len_train // batch_size,
  # validation_steps=len_val // batch_size,
  shuffle=True
)
  

Ответ №1:

Альтернативой было создание подкласса keras.utils.Последовательность. Идея состоит в том, чтобы сгенерировать весь пакет.

Цитирование документов:

Последовательность — более безопасный способ многопроцессорной обработки. Эта структура гарантирует, что сеть будет обучаться только один раз для каждой выборки за эпоху, чего нельзя сказать о генераторах.

Для этого необходимо предоставить __len__() __getitem__() методы и .

Для текущего примера:

 class DS(keras.utils.Sequence):
  
  def __init__(self, ds_mmap, start, stop, batch_size):
    self.ds = ds_mmap[start: stop]
    self.batch_size = batch_size

  def __len__(self):
    # divide-ceil
    return -(-len(self.ds) // self.batch_size)

  def __getitem__(self, idx):
    start = idx * self.batch_size
    stop = (idx   1) * self.batch_size
    batch_y = self.ds[start   1: stop   1]
    batch_x = self.ds[start: stop][: len(batch_y)]
    return batch_x, batch_y
  
 for ds_type, ds in tvt.items():
  start, stop = tvt_limits[ds_type]
  ds = DS(ds_mmap, start, stop, batch_size)
  

В этом случае необходимо явно определить количество шагов и НЕ передавать batch_size :

 hist = model.fit(
  tvt["train"],
  validation_data=tvt["val"],
  epochs=epochs,
  steps_per_epoch=len_train // batch_size,
  validation_steps=len_val // batch_size,
  shuffle=True
)
  

Тем не менее, я не приступил from_generator() к работе, и я хотел бы знать, как это сделать.