Настраиваемый алгоритм агрегирования для градиентных обновлений в tensorflow federated

#tensorflow #tensorflow-federated

#tensorflow #tensorflow-федеративный

Вопрос:

Я пытался реализовать эту статью . По сути, я хочу просуммировать потери каждого клиента и сравнить их с предыдущей эпохой. Затем для каждого составляющего слоя модели сравните расхождение KL между весами серверной и клиентской модели, чтобы получить обновления параметров для конкретного уровня, а затем выполнить softmax и решить, требуется ли адаптивное обновление или обычный подход FedAvg.

Алгоритм выглядит следующим образом — FedMed

Я попытался использовать приведенный здесь код для создания пользовательского объединенного процесса avg. Я получил базовое понимание того, что есть некоторые tf.вычисления и некоторые tff.вычисления, которые задействованы. Я понимаю, что мне нужно внести изменения в логику оркестровки в функции run_one_round и в основном манипулировать выводами клиента для выполнения адаптивного усреднения вместо ванильного федеративного усреднения. Функция client_update tf.computation в основном возвращает все значения, которые мне нужны, т.е. weights_delta (может использоваться для весов модели на основе клиента), model_output(который можно использовать для расчета потерь).

Но я не уверен, где именно я должен внести изменения.

 @tff.federated_computation(federated_server_state_type,
                             federated_dataset_type)
  def run_one_round(server_state, federated_dataset):

    server_message = tff.federated_map(server_message_fn, server_state)
    server_message_at_client = tff.federated_broadcast(server_message)
  
  client_outputs = tff.federated_map(
        client_update_fn, (federated_dataset, server_message_at_client))

    weight_denom = client_outputs.client_weight

# todo
# instead of using tff.federated_mean I wish to do a adaptive aggregation based on the client_outputs.weights_delta and server_state model
    round_model_delta = tff.federated_mean(
        client_outputs.weights_delta, weight=weight_denom)


#client_outputs.weights_delta   has all the client model weights.
#client_outputs.client_weight has the number of examples per client.
#client_outputs.model_output has the output of the model per client example.
 

Я хочу использовать веса модели сервера, используя объект server_state.
Я хочу рассчитать расхождение KL между весами модели сервера и модели каждого клиента для каждого уровня. Затем используйте относительный вес для агрегирования весов клиентов вместо ванильного федеративного усреднения.
Вместо использования tff.federated_mean я хочу использовать другую стратегию, в основном адаптивную, основанную на приведенном выше алгоритме.

Итак, мне понадобились некоторые предложения о том, как это реализовать.
По сути, я хочу сделать следующее:
1) Суммируйте все значения потерь клиентов.
2) Вычислите расхождение KL для каждого уровня базы всех клиентов с сервером, а затем определите, использовать ли адаптивную оптимизацию или FedAvg.

Также есть ли способ манипулировать этим значением как значением python, которое будет полезно для целей отладки (я пытался использовать tf.print, но это тоже не помогло). Спасибо!

Комментарии:

1. Не могли бы вы добавить фрагменты кода к вашему вопросу? Это помогло бы с лучшими ответами, поскольку это уменьшит любую потенциальную путаницу в отношении того, что было опробовано и какова цель.

2. @ZacharyGarrett Я внес изменения и добавил часть кода и добавил комментарии, в которые я хочу внести изменения.

Ответ №1:

Простейший вариант: вычислить веса для среднего значения на клиентах

Если я правильно прочитал приведенный выше алгоритм, нам нужно только вычислить некоторые веса для среднего значения на лету. tff.federated_mean принимает необязательный weight аргумент, размещенный КЛИЕНТАМИ, поэтому, вероятно, самым простым вариантом здесь является вычисление желаемых весов для клиентов и передача их в среднее значение.

Это будет выглядеть примерно так (при условии соответствующих определений переменных, используемых ниже, которые мы прокомментируем):

 
@tff.federated_computation(...)
def round_function(...):
  ...
  # We assume there is a tff.Computation training_fn that performs training,
  # and we're calling it here on the correct arguments
  trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
  # Next we assume there is a variable in-scope server_model,
  # representing the 'current global model'.
  global_model_at_clients = tff.federated_broadcast(server_model)
  # Here we assume a function compute_kl_divergence, which takes
  # two structures of tensors and computes the KL divergence
  # (as a scalar) between them. The two arguments here are clients-placed,
  # so the result will be as well.
  kl_div_at_clients = tff.federated_map(compute_kl_divergence,
      (global_model_at_clients, trained_clients))
  # Perhaps we wish to not use raw KL divergence as the weight, but rather
  # some function thereof; if so, we map a postprocessing function to
  # the computed divergences. The result will still be clients-placed.
  mean_weight = tff.federated_map(postprocess_divergence, kl_div_at_clients)
  # Now we simply use the computed weights in the mean.
  return tff.federated_mean(trained_clients, weight=mean_weight)
 

Более гибкий инструмент: tff.federated_reduce

TFF обычно поощряет разработчиков алгоритмов реализовывать все, что они могут «в агрегации», и, таким образом, предоставляет некоторые настраиваемые примитивы, такие как tff.federated_reduce , которые позволяют запускать произвольный тензорный поток «в потоке» между клиентами и сервером. Если приведенное выше прочтение желаемого алгоритма неверно и требуется что-то более сложное, или вы хотите гибко экспериментировать с совершенно другими понятиями агрегации (что-то, что TFF поощряет и предназначено для поддержки), это может быть вариантом для вас.

На языке эвристической типизации TFF tff.federated_reduce имеет подпись:

 <{T}@CLIENTS, U, (<U, T> -> U)> -> U@SERVER
 

Значение, federated_reduce принимающее значение типа T , размещенное на клиентах, «ноль» в редукционной алгебре типа U , и функция, принимающая a U и a T и производящая a U , и применяет эту функцию «в потоке» на пути между клиентами и сервером, создавая U размещенное на сервере. Функция (<U, T> -> U) будет применена к частично накопленному значению U и «следующему» элементу в потоке T (обратите внимание, однако, что TFF не гарантирует упорядочение этих значений), возвращая другое частично накопленное значение U . «Ноль» должен представлять все «частично накопленные» средства над пустым набором в вашем приложении; это будет отправной точкой сокращения.

Применение к этой проблеме

Компоненты

Вашей функции сокращения необходим доступ к двум частям данных: состоянию глобальной модели и результату обучения на данном клиенте. Это довольно хорошо соответствует типу T . В этом приложении у нас будет что-то вроде:

 T = <server_model=server_model_type, trained_model=trained_model_type>
 

Эти два типа, вероятно, будут одинаковыми, но могут и не обязательно быть таковыми.

Ваша функция сокращения примет частичный агрегат, вашу модель сервера и модель, обученную вашим клиентом, возвращая новый частичный агрегат. Здесь мы начнем предполагать то же чтение алгоритма, что и выше, а именно средневзвешенное значение с определенными весами. Как правило, самый простой способ вычисления среднего значения — сохранить два аккумулятора, один для числителя и один для знаменателя. Это повлияет на выбор zero и приведенную ниже функцию уменьшения.

Ваш zero должен содержать структуру тензоров со значением 0, соответствующим весам вашей модели — это будет числитель. Это было бы сгенерировано для вас, если бы у вас была агрегация, подобная tff.federated_sum (поскольку TFF знает, каким должен быть ноль), но в этом случае вам придется самостоятельно получить такой тензор. Это не должно быть слишком сложно с tf.nest.map_structure помощью and tf.zeros_like .

Для знаменателя мы предположим, что нам просто нужен скаляр. TFF и TF гораздо более гибкие, чем это — при желании вы могли бы сохранить знаменатель для каждого слоя или для каждого параметра, но для простоты мы предположим, что в конце мы просто хотим разделить на одно число с плавающей запятой.

Поэтому наш тип U будет примерно таким:

 U = <numerator=server_model_type, denominator=tf.float32>
 

Наконец, мы подходим к нашей функции сокращения. Это будет более или менее другая композиция из тех же частей, что и выше; здесь мы сделаем несколько более жесткие предположения о них (в частности, что все локальные функции tff.tf_computations — это техническое предположение, возможно, ошибка в TFF). Наша функция сокращения будет выглядеть следующим образом (при условии наличия соответствующих псевдонимов типов)::

 @tff.tf_computation(U, T)
def reduction(partial_accumulate, next_element):
  kl_div = compute_kl_divergence(
      next_element.server_model, next_element.trained_model)
  weight = postprocess_divergence(kl_div)
  new_numerator = partial_accumulate.numerator   weight * next_element.trained_model
  new_denominator = partial_accumulate.denominator   weight
  return collections.OrderedDict(
      numerator=new_numerator, denominator=new_denominator)
 

Объединение их

Основная схема раунда будет аналогична приведенной выше; но мы поместили больше вычислений «в поток», и, следовательно, на клиентах будет меньше. Мы предполагаем, что здесь те же определения переменных.

 
@tff.federated_computation(...)
def round_function(...):
  ...
  trained_clients = tff.federated_map(training_fn, clients_placed_arguments)
  global_model_at_clients = tff.federated_broadcast(server_model)
  # This zip I believe is not necessary, but it helps my mental model.
  reduction_arg = tff.federated_zip(
      collections.OrderedDict(server_model=global_model_at_clients,
                              trained_model=trained_clients))
  # We assume a zero as specified above
  return tff.federated_reduce(reduction_arg,
                              zero,
                              reduction)