проблема времени библиотеки spark_tensorflow_distributor в Python

#python #tensorflow #apache-spark

Вопрос:

У меня есть кластер искр с 3 рабочими и 1 мастером. Проблема заключается во времени выполнения, затрачиваемом сценарием. Первый (модифицированный из официального github spark_tensorflow_distributor) запускает приведенный ниже код на рабочем узле (без spark), и результат: Время: 111.03140830993652 секунды.

 import time
import tensorflow_datasets as tfds
import tensorflow as tf

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def make_datasets_unbatched():
    # Scaling MNIST data from (0, 255] to (0., 1.]
    def scale(image, label):
        image = tf.cast(image, tf.float32)
        image /= 255
        return image, label
    datasets, info = tfds.load(
            name='mnist',
            with_info=True,
            as_supervised=True,
        )
    return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
            metrics=['accuracy'],
    )
    return model
    
if __name__ == "__main__":
    GLOBAL_BATCH_SIZE = 64 * 3
    train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE).repeat()
    
    model = build_and_compile_cnn_model()
    t = time.time()
    h = model.fit(x=train_datasets, epochs=800, steps_per_epoch=10, verbose=0)
    print("Time: ", time.time() - t, "seconds")
 

Но когда я запускаю приведенный ниже код (взятый из их официального github), используя num_slots=3 (число рабочих узлов), вывод: Распределенное время: 252.17275404930115 секунд.

 import time
from pyspark import SparkConf, SparkContext
from spark_tensorflow_distributor import MirroredStrategyRunner


# Taken from https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
def train():
    import tensorflow_datasets as tfds
    import tensorflow as tf
    import time
    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets_unbatched():
        # Scaling MNIST data from (0, 255] to (0., 1.]
        def scale(image, label):
            image = tf.cast(image, tf.float32)
            image /= 255
            return image, label
        datasets, info = tfds.load(
            name='mnist',
            with_info=True,
            as_supervised=True,
        )
        return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)

    def build_and_compile_cnn_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax'),
        ])
        model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
            metrics=['accuracy'],
        )
        return model

    GLOBAL_BATCH_SIZE = 64 * 3
    train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE).repeat()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_datasets = train_datasets.with_options(options)
    multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(x=train_datasets, epochs=800, steps_per_epoch=10)
    return tf.config.experimental.list_physical_devices('GPU')

conf = SparkConf().setMaster("spark://192.168.1.200:7077").setAppName("mnist")
sc = SparkContext(conf=conf)

t = time.time()
runner = MirroredStrategyRunner(num_slots=n, local_mode=False, use_gpu=False, use_custom_strategy=False)
runner.run(train)
print("Distributed Time: ", time.time() - t, "seconds")
 

Я проверил в spark WebUI, что все три рабочих узла заняты, поэтому я не понимаю, почему распределенный способ занимает больше времени. Если бы кто-нибудь мог помочь мне найти решение, я был бы очень благодарен. Большое спасибо.