Pyspark с преобразованием типов sklearn и dataframe

#python #pyspark #scikit-learn #numpy-ndarray

#python #pyspark #scikit-learn #numpy-ndarray

Вопрос:

Я пытаюсь использовать sklearn с pyspark, но у меня возникли некоторые проблемы с производительностью. Допустим, у меня есть набор данных, который уже прошел через конвейер, в котором объекты были векторизованы и нормализованы. Чтобы использовать sklearn, мне нужно либо предоставить алгоритму массив, либо фрейм данных pandas. Учитывая размер моего набора данных (2,8 млн и должен скоро увеличиться), преобразование набора данных в pandas происходит очень медленно, поэтому я использую простой подход:

 train =  np.array(train.select('features').collect()).squeeze()
 

Это происходит относительно медленно, так как мне нужно использовать сбор, чтобы отправить данные обратно в драйвер. Есть ли какой-либо другой подход, который был бы быстрее и лучше? Кроме того, из-за характера проблемы я в настоящее время обрабатываю свою функцию подсчета очков нестандартным способом:

 def score (fitmodel, test):
   predY = fitmodel.score_samples(test)
   return np.full((1, len(predY)), np.mean(predY)).transpose()
 

Идея состоит в том, чтобы вычислить среднее значение прогнозов, а затем вернуть массив, в котором этот результат реплицируется столько раз, сколько проверено записей. Например, если в моем тестовом наборе 450 записей, я верну массив в форме (450,1), где все 450 записей имеют одинаковое значение (среднее значениепредсказания). Хотя и медленно, пока все хорошо, и все работает так, как задумано. Моя проблема в том, что мне нужно продолжить тестирование, выполнив это несколько раз (изменив набор тестов) и добавив результаты в один массив, чтобы позже оценить производительность модели. Мой код:

 for _ in tdqm(range(450, 800)):
    #Get group #X
    _test = df.where(col('index') == _) #Get a different "chunk" of the dataset each iteration
    _test.coalesce(2) 
    #Apply pipeline with transforms
    test = pipelineModel.transform(_test)
    y_test = np.array(test.select('label').collect())
    x_test = np.array(test.select('features').collect()).squeeze()
    pred = newScore(x_test, model)
    
    if(_ == (450)): #First run
        trueY = y_test
        predY = pred
    else:
        trueY = np.append(trueY, y_test)
        predY = np.append(predY, pred)
 

Вкратце, я беру определенную часть набора данных, тестирую ее, а затем хочу добавить как предсказания, так и метку «true» для последующей оценки. Моя главная проблема здесь в том, что выполнение 2-х сборок и np.append занимает много времени, и мне нужно найти альтернативу. Тестирование всего набора тестов (около 400 тыс. записей) занимает у меня ~ 1 минуту, но при наличии всего этого время увеличивается до 2 часов 20 минут.
Кроме того, я должен преобразовать массив обратно в pyspark dataframe, чтобы использовать функции вычисления mllib, что добавляет немного больше времени к процессу.

Учитывая все сказанное, может ли кто-нибудь указать мне направление, в котором я мог бы сделать это более эффективным способом? Может быть, есть другой способ использовать spark и sklearn?

Комментарии:

1. «Эффективный» подход заключается в том, чтобы избегать sklearn и использовать то, что предлагает pyspark ml.

2. Это не тот ответ, который я ожидал. Очевидно, что я использую pyspark ml, когда это возможно, но, как вы знаете, некоторые модели пока недоступны.