Линейная регрессия с использованием tf.data с API федеративного ядра и данных на клиенте удаленного выполнения

#tensorflow2.0 #tensorflow-datasets #tensorflow-federated

#tensorflow2.0 #tensorflow-наборы данных #tensorflow-федеративный

Вопрос:

Я пытаюсь продемонстрировать федеративное обучение с помощью tff. И я зашел так далеко, но сообщения об ошибках, которые я получаю, слишком запутанны. Важной частью является то, что я хочу продемонстрировать, что данные находятся в удаленном движке, поэтому я использую tf.data.experimental.CsvDataset , и я не смог найти ничего подобного ни в одном учебнике. Мне удалось провести мини-эксперимент, в котором данные считывались на удаленном сайте, но я не могу заставить этот более крупный пример работать.

В настоящее время он жалуется на «p = x * w b», я полагаю, потому что x не является federated_value . Но я перепробовал много вариантов и просто не могу заставить его работать. Файл Salary.csv взят из учебника здесь https://www.kaggle.com/karthickveerakumar/salary-data-simple-linear-regression ?select=Salary_Data.csv

 import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_federated as tff

import grpc

ip_address = '127.0.0.1'
port = 8000

channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]

tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

@tf.function()
def load_data():
    return tf.data.experimental.CsvDataset('data/Salary.csv', [tf.float64,tf.float64], header=True)


W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
    data = load_data()
    loss = tf.Variable(0.0, dtype=tf.float64)
    with tf.GradientTape() as tape:
        for x, y in data:
            p = x * w   b
            loss = loss   tf.square(p - y)

    g_w, g_b = tape.gradient(loss, [w, b])
    w.assign_sub(0.0001 * g_w)
    b.assign_sub(0.0001 * g_b)
    return [w, b]

w = tf.Variable(2.0, dtype=tf.float64)
b = tf.Variable(3.0, dtype=tf.float64)
for _ in range(1000):
    w, b = train(data, tff.federated_broadcast(w), tff.federated_broadcast(b))

  

Ответ №1:

TFF не поддерживает смешивание объединенных вычислений с TensorFlow. Обычная парадигма в TFF выглядит примерно так:

  1. Напишите свои локальные функции в TensorFlow, используя @tff.tf_computation декоратор TFF
  2. Внутри тела a tff.federated_computation вызовите встроенные операторы и операторы связи (как tff.federated_broadcast указано выше), а tff.federated_map ваши вычисления TF для федеративных элементов.

Существует много тонкостей, которые привели к разработке шаблона, подобного приведенному выше, но основной из них является желание, чтобы вся логика была выражена в сериализованном представлении, чтобы ее можно было разделить и отправить на различные устройства в вашей федеративной системе во время реального развертывания. Это немного странно; вы можете думать о декораторах TFF как об определении контекста, в котором будет отслеживаться код, который вы пишете, чтобы он мог выполняться полностью независимым от платформы способом.

Используя этот шаблон, ваши вычисления выше будут выглядеть примерно так:

 
# These lines set up the TFF execution environment, and importantly are not used 
# during the function definitions below--only upon *invocation* of your function
# in the outer Python context will the execution environment be used.
channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]
tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')


@tf.function()
def load_data():
    # From TFF's perspective, the code written here will be represented as a
    # "blob of Tensorflow" that can be shipped out anywhere. We don't actually
    # *need* a tf_computation decorator here, since this will be invoked in the
    # body of another tf_computation and the logic will get embedded there, but 
    # we could put one here if we wanted.
    return tf.data.experimental.CsvDataset(
        'data/Salary.csv', [tf.float64,tf.float64], header=True)

@tff.tf_computation(tf.float64, tf.float64)
@tf.function
def local_train(w, b):
    data = load_data()
    # We don't need a variable for the loss here, and though TFF will allow you
    # to construct a variable to use as a temporary in this fashion, tf.function
    # won't. We're pushing the TF team on that one ;).
    loss = tf.constant(0.0, dtype=tf.float64)
    with tf.GradientTape() as tape:
        # We must be inside a tf.function decorator, or the eager Python runtime,
        # to iterate over a tf.data.Dataset in this way.
        for x, y in data:
            p = x * w   b
            loss = loss   tf.square(p - y)

    g_w, g_b = tape.gradient(loss, [w, b])
    w = w- 0.0001 * g_w
    b = b- 0.0001 * g_b
    return w, b

# Making a symbol to represent these types is always a good idea
W_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)
B_TYPE = tff.FederatedType(tf.float64, tff.CLIENTS, all_equal=True)

@tff.federated_computation(W_TYPE, B_TYPE)
def train(w, b):
    # Here w and b are elements of federated type, placed at clients.
    # We map the training function over these values.
    # If they were SERVER-placed instead, we would federated_broadcast them
    # out to the clients.
    updated_w, updated_b = tff.federated_map(local_train, (w, b))

# TFF's Python runtime represents federated values placed at clients as a list of
# values, with as many elements as there are clients. This number will be inferred 
# by the runtime and used to distribute the work. We also technically don't need
# variables here, but I believe they won't cause any problem.
clients_placed_w = [tf.Variable(2.0, dtype=tf.float64)]
clients_placed_b = [tf.Variable(3.0, dtype=tf.float64)]

# We could move the execution environment setup lines all the way down here,
# no problem.
for _ in range(1000):
    clients_placed_w, clients_placed_b = train(clients_placed_w, clients_placed_b)