производительность распределенного тензорного потока alexnet

#tensorflow

#тензорный поток

Вопрос:

Запуск Alexnet с использованием distributed tensorflow не масштабируется по количеству изображений в секунду. Здесь я использую модель alexnet alexnet_benchmark.py с несколькими модификациями для распределенного обучения на экземпляре EC2 G2 (NVIDIA GRID K520), и я вижу, что он может обрабатывать 5-6 изображений в секунду на одном графическом процессоре, на одном хосте, однако его запуск без распределенного кода может обрабатывать 112 изображений в секунду на одномГрафический процессор. Это кажется очень странным, не могли бы вы проверить, что может быть не так в этом коде для его распространения? Сервер параметров не запускается на графическом процессоре, но рабочие выполняются с использованием префикса CUDA_VISIBLE_DEVICES

 ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

# Create and start a server for the local task.
server = tf.train.Server(cluster,
                   job_name=FLAGS.job_name,
                   task_index=FLAGS.task_index)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    gpu = FLAGS.task_index % 4

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        #'/gpu:%d' % i
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        #worker_device='/gpu:%d' % gpu,
        cluster=cluster)):

        summary_op = tf.merge_all_summaries()

        y, x = get_graph()

        y_ = tf.placeholder(tf.float32, [None, NUM_LABELS])

        cross_entropy = tf.reduce_mean( -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]) )

        global_step = tf.Variable(0)

        gradient_descent_opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)

        num_workers = len(worker_hosts)
        sync_rep_opt = tf.train.SyncReplicasOptimizer(gradient_descent_opt, replicas_to_aggregate=num_workers,
                replica_id=FLAGS.task_index, total_num_replicas=num_workers)

        train_op = sync_rep_opt.minimize(cross_entropy, global_step=global_step)

        init_token_op = sync_rep_opt.get_init_tokens_op()
        chief_queue_runner = sync_rep_opt.get_chief_queue_runner()

        #saver = tf.train.Saver()
        summary_op = tf.merge_all_summaries()

        init_op = tf.initialize_all_variables()
        saver = tf.train.Saver()

    is_chief=(FLAGS.task_index == 0)

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             #logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step)
                             #save_model_secs=600)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:

        if is_chief:
            sv.start_queue_runners(sess, [chief_queue_runner])
            sess.run(init_token_op)

        num_steps_burn_in = 1000
        total_duration = 0
        total_duration_squared = 0
        step = 0

        while step <= 40000:

            print('Iteration %d' % step)
            sys.stdout.flush()
            batch_xs, batch_ys = get_data(BATCH_SIZE)
            train_feed = {x: batch_xs, y_: batch_ys}

            start_time = time.time()

            _, step = sess.run([train_op, global_step], feed_dict=train_feed)

            duration = time.time() - start_time
            if step > num_steps_burn_in:
                total_duration  = duration
                total_duration_squared  = duration * duration

                if not step % 1000:
                    iterations = step - num_steps_burn_in
                    images_processed = BATCH_SIZE * iterations
                    print('%s: step %d, images processed: %d, images per second: %.3f, time taken: %.2f' %
                            (datetime.now(), iterations, images_processed, images_processed/total_duration, total_duration))
                    sys.stdout.flush()
    sv.stop()
  

Комментарии:

1. Можете ли вы собрать временную шкалу (в соответствии с github.com/tensorflow/tensorflow/issues /… ), и посмотрите, каковы узкие места?

2. Кстати, вот график для одной машины. Это показывает, что AlexNet имеет низкое соотношение вычислений и операций ввода-вывода, что затрудняет эффективное использование параллельных ресурсов

Ответ №1:

Ваш код выглядит хорошо — вот несколько моментов, которые следует иметь в виду:

  • График, созданный между одним узлом и несколькими узлами, отличается, сравнение может иметь некоторые связанные с ними вариации. Добавлены очереди и синхронизация, которые добавляются для передачи информации о градиенте на сервер и рабочий сервер и обратно.
  • Поскольку Alexnet имеет относительно быструю передачу вперед и назад, это увеличит накладные расходы на передачу ввода-вывода на сервер и с сервера. Это может отображаться или не отображаться в версии 3 для начала (склоняясь к, возможно, нет).
  • В вашем сообщении упоминалось, что вы использовали отдельный экземпляр EC2 для сервера параметров и рабочего; это лучшая конфигурация. Запуск рабочих и серверов на одном узле, безусловно, сильно повлияет на производительность.
  • Для увеличения рабочих вам, несомненно, придется увеличить количество серверов, обслуживающих рабочих. В начале это начинает происходить после 32 независимых рабочих.
  • имейте в виду, что примерно после 16 рабочих есть свидетельства того, что конвергенция может быть нарушена.

Я предлагаю попробовать distributed inception V3. Эта топология должна демонстрировать почти идеальную масштабируемость по сравнению с ее частью с одним счетчиком узлов. Если это так, ваша аппаратная настройка хорошая; если это не так, дважды проверьте конфигурацию HW.

Если вы проводите исследование масштабируемости, я рекомендую начать сбор относительной производительности с одного сервера параметров и одного рабочего на независимом экземпляре, сравнение с запуском одного узла будет иметь свои различия.