#python #apache-spark #machine-learning #pyspark #data-science
#python #apache-spark #машинное обучение #pyspark #наука о данных
Вопрос:
Я реализовал регрессию гребня, используя PySpark. Я использую его для набора данных, размер которого составляет примерно 1,7 миллиона строк и 231 столбец. Он занимает более или менее 1 ГБ. Моя реализация работает очень медленно в зависимости от того, на какой машине я ее выполняю: на Google colab требуется 8 минут для выполнения 2000 итераций градиентного спуска для подмножества набора данных, на моем ноутбуке (intel i7 8-го поколения U и 8 ГБ оперативной памяти, когда colab имеет всего 2 ядра) требуется 4 минуты, чтобы выполнить 2000 итераций градиентного спуска для подмножества набора данных.выполните 10 итераций для одного и того же подмножества. Ниже показаны мои реализации, есть ли способ ускорить процесс?
class SparkRidgeRegression(object):
""" Base regression model. Models the relationship between a scalar dependent variable y and the independent
variables X.
Parameters:
-----------
n_iterations: float
The number of training iterations the algorithm will tune the weights for.
learning_rate: float
The step length that will be used when updating the weights."""
def __init__(self, n_iterations, learning_rate, reg_factor):
self.n_iterations = n_iterations
self.learning_rate = learning_rate
self.reg_factor = reg_factor
def get_training_errors(self):
return self.training_errors
def set_training_errors(self, error):
self.training_errors = error
def squared_error(self, target, prediction):
return (target - prediction) ** 2
def root_mean_squared_error(self, predictions):
return np.sqrt(predictions.map(lambda p: self.squared_error(*p)).mean())
def mean_squared_error(self, predictions):
return predictions.map(lambda p: self.squared_error(p[0], p[1])).mean()
def mean_absolute_error(self, predictions):
return np.abs(predictions.map(lambda prediction: prediction[1] - prediction[0]).reduce(lambda a, b: a b))/predictions.count()
def r2(self, predictions):
mean_ = predictions.rdd.map(lambda t: t[0]).mean()
sum_squares = predictions.rdd.map(lambda t: (t[0] - mean_)**2).sum()
residual_sum_squares = predictions.rdd.map(lambda t: self.squared_error(*t)).sum()
return 1 - (residual_sum_squares / sum_squares)
def get_grad_sum(self, example):
return (self.weights.dot(DenseVector(example.features)) - example.label) * example.features
def fit(self, observations):
progressbar = ProgressBar()
features_number = len(observations.take(1)[0].features)
self.training_errors = []
self.weights = np.zeros(features_number)
start = datetime.now()
# Do gradient descent for n_iterations
for i in progressbar(range(self.n_iterations)):
# Get the prediction given an example and the current weights
predictions = observations.map(lambda example: self.predict(example)) # Result [label, prediction]
# Calculate l2 loss
regularization = self.reg_factor * self.weights
self.training_errors.append(self.root_mean_squared_error(predictions))
# Gradient of l2 loss w.r.t w
grad_w = observations.map(lambda example: DenseVector(self.get_grad_sum(example))).reduce(lambda x, y: x y) regularization
# Update the weights
self.weights -= self.learning_rate * grad_w
def predict(self, example):
return (example.label, self.weights.dot(DenseVector(example.features)))'''
Моя установленная версия spark — 3.0 с hadoop 2.7
Моя конфигурация pyspark, полученная из кода:
[('spark.rdd.compress', 'True'),
('spark.app.id', 'local-1602447405987'),
('spark.serializer.objectStreamReset', '100'),
('spark.master', 'local[*]'),
('spark.submit.pyFiles', ''),
('spark.executor.id', 'driver'),
('spark.submit.deployMode', 'client'),
('spark.driver.host', 'DESKTOP-0CA7QUP'),
('spark.driver.port', '50297'),
('spark.ui.showConsoleProgress', 'true'),
('spark.app.name', 'pyspark-shell')]
Комментарии:
1. Я думаю, что для устранения этой проблемы потребуется дополнительная информация о вашей локальной конфигурации Spark. Возможно, настройки памяти Spark executor слишком низкие или ряд других вещей.
2. Моя установленная версия spark 3.0 с hadoop 2.7 в редактируемой конфигурации spark