#python #tensorflow #apache-spark
Вопрос:
У меня есть кластер искр с 3 рабочими и 1 мастером. Проблема заключается во времени выполнения, затрачиваемом сценарием. Первый (модифицированный из официального github spark_tensorflow_distributor) запускает приведенный ниже код на рабочем узле (без spark), и результат: Время: 111.03140830993652 секунды.
import time
import tensorflow_datasets as tfds
import tensorflow as tf
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def make_datasets_unbatched():
# Scaling MNIST data from (0, 255] to (0., 1.]
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
datasets, info = tfds.load(
name='mnist',
with_info=True,
as_supervised=True,
)
return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(
loss=tf.keras.losses.sparse_categorical_crossentropy,
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'],
)
return model
if __name__ == "__main__":
GLOBAL_BATCH_SIZE = 64 * 3
train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE).repeat()
model = build_and_compile_cnn_model()
t = time.time()
h = model.fit(x=train_datasets, epochs=800, steps_per_epoch=10, verbose=0)
print("Time: ", time.time() - t, "seconds")
Но когда я запускаю приведенный ниже код (взятый из их официального github), используя num_slots=3 (число рабочих узлов), вывод: Распределенное время: 252.17275404930115 секунд.
import time
from pyspark import SparkConf, SparkContext
from spark_tensorflow_distributor import MirroredStrategyRunner
# Taken from https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
def train():
import tensorflow_datasets as tfds
import tensorflow as tf
import time
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def make_datasets_unbatched():
# Scaling MNIST data from (0, 255] to (0., 1.]
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
datasets, info = tfds.load(
name='mnist',
with_info=True,
as_supervised=True,
)
return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(
loss=tf.keras.losses.sparse_categorical_crossentropy,
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'],
)
return model
GLOBAL_BATCH_SIZE = 64 * 3
train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE).repeat()
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
train_datasets = train_datasets.with_options(options)
multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(x=train_datasets, epochs=800, steps_per_epoch=10)
return tf.config.experimental.list_physical_devices('GPU')
conf = SparkConf().setMaster("spark://192.168.1.200:7077").setAppName("mnist")
sc = SparkContext(conf=conf)
t = time.time()
runner = MirroredStrategyRunner(num_slots=n, local_mode=False, use_gpu=False, use_custom_strategy=False)
runner.run(train)
print("Distributed Time: ", time.time() - t, "seconds")
Я проверил в spark WebUI, что все три рабочих узла заняты, поэтому я не понимаю, почему распределенный способ занимает больше времени. Если бы кто-нибудь мог помочь мне найти решение, я был бы очень благодарен. Большое спасибо.