Оптимизация и регрессия гребня в PySpark

#python #apache-spark #machine-learning #pyspark #data-science

#python #apache-spark #машинное обучение #pyspark #наука о данных

Вопрос:

Я реализовал регрессию гребня, используя PySpark. Я использую его для набора данных, размер которого составляет примерно 1,7 миллиона строк и 231 столбец. Он занимает более или менее 1 ГБ. Моя реализация работает очень медленно в зависимости от того, на какой машине я ее выполняю: на Google colab требуется 8 минут для выполнения 2000 итераций градиентного спуска для подмножества набора данных, на моем ноутбуке (intel i7 8-го поколения U и 8 ГБ оперативной памяти, когда colab имеет всего 2 ядра) требуется 4 минуты, чтобы выполнить 2000 итераций градиентного спуска для подмножества набора данных.выполните 10 итераций для одного и того же подмножества. Ниже показаны мои реализации, есть ли способ ускорить процесс?

 class SparkRidgeRegression(object):
""" Base regression model. Models the relationship between a scalar dependent variable y and the independent 
    variables X. 
    Parameters:
    -----------
    n_iterations: float
        The number of training iterations the algorithm will tune the weights for.
    learning_rate: float
        The step length that will be used when updating the weights."""
def __init__(self, n_iterations, learning_rate, reg_factor):
    self.n_iterations = n_iterations
    self.learning_rate = learning_rate
    self.reg_factor = reg_factor

def get_training_errors(self):
    return self.training_errors

def set_training_errors(self, error):
    self.training_errors = error

def squared_error(self, target, prediction):
    return (target - prediction) ** 2

def root_mean_squared_error(self, predictions):
    return np.sqrt(predictions.map(lambda p: self.squared_error(*p)).mean())

def mean_squared_error(self, predictions):
    return predictions.map(lambda p: self.squared_error(p[0], p[1])).mean()

def mean_absolute_error(self, predictions):
    return np.abs(predictions.map(lambda prediction: prediction[1] - prediction[0]).reduce(lambda a, b: a   b))/predictions.count()

def r2(self, predictions):
    mean_ = predictions.rdd.map(lambda t: t[0]).mean()
    sum_squares = predictions.rdd.map(lambda t: (t[0] - mean_)**2).sum()
    residual_sum_squares = predictions.rdd.map(lambda t: self.squared_error(*t)).sum()
    return 1 - (residual_sum_squares / sum_squares)


def get_grad_sum(self, example):
    return (self.weights.dot(DenseVector(example.features)) - example.label) * example.features


def fit(self, observations):
    progressbar = ProgressBar()
    features_number = len(observations.take(1)[0].features)
    self.training_errors = []
    self.weights = np.zeros(features_number)        
    start = datetime.now()
    # Do gradient descent for n_iterations
    for i in progressbar(range(self.n_iterations)):
      # Get the prediction given an example and the current weights
      predictions = observations.map(lambda example: self.predict(example)) # Result [label, prediction]
      # Calculate l2 loss
      regularization = self.reg_factor * self.weights
      self.training_errors.append(self.root_mean_squared_error(predictions))
      # Gradient of l2 loss w.r.t w
      grad_w = observations.map(lambda example: DenseVector(self.get_grad_sum(example))).reduce(lambda x, y: x   y)   regularization
      # Update the weights
      self.weights -= self.learning_rate * grad_w

def predict(self, example):
    return (example.label, self.weights.dot(DenseVector(example.features)))'''
  

Моя установленная версия spark — 3.0 с hadoop 2.7
Моя конфигурация pyspark, полученная из кода:

 [('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1602447405987'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'DESKTOP-0CA7QUP'),
 ('spark.driver.port', '50297'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]
  

Комментарии:

1. Я думаю, что для устранения этой проблемы потребуется дополнительная информация о вашей локальной конфигурации Spark. Возможно, настройки памяти Spark executor слишком низкие или ряд других вещей.

2. Моя установленная версия spark 3.0 с hadoop 2.7 в редактируемой конфигурации spark