Как написать UDFS в pyspark для выполнения вызовов API с ограничениями квоты?

#pandas #pyspark

#pandas #pyspark

Вопрос:

Ниже приведен код, который написан нераспределенным способом с использованием традиционного цикла pandas. Я уверен, что for цикл — плохая практика для перебора фреймов данных pandas. По сути, я перебираю text записи одну за другой в наборе данных IMDB reviews и пытаюсь получить оценку настроений и величину для каждого текста в текущей строке.

Вот код

 from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import pandas as pd
import datetime

def compute_sentiment_score(text,client):
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.score)

def compute_sentiment_magnitude(text,client):
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.magnitude)

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path-to-file.json"

imdb_reviews = pd.read_csv('imdb_reviews.csv', header=None, names=['input1', 'input2'], encoding= "ISO-8859-1")

imdb_reviews.head()

    input1                                         input2
0   first think another Disney movie, might good, ...   1
1   Put aside Dr. House repeat missed, Desperate H...   0
2   big fan Stephen King's work, film made even gr...   1
3   watched horrid thing TV. Needless say one movi...   0
4   truly enjoyed film. acting terrific plot. Jeff...   1
  

Вот этот уродливый цикл — прямо сейчас я запускаю это всего для 100 итераций, но фактические записи составляют 25 тыс.

 client = language.LanguageServiceClient() # establish client connection

print(datetime.datetime.now())
for i in range(100):
    imdb_reviews.loc[i, 'score'] = compute_sentiment_score(str(imdb_reviews.loc[i,'input1']),client)
    imdb_reviews.loc[i,'magnitude'] = compute_sentiment_magnitude(str(imdb_reviews.loc[i,'input1']),client)
print(datetime.datetime.now())

2020-09-09 00:39:11.312789 # start time of loop
2020-09-09 00:40:17.167283 # end time of loop
  

Итак, чтобы вычислить это для 100 записей, требуется около 60 секунд. Если я использую тот же масштаб, очевидно, что для обработки этого потребуется много времени.

Есть ли способ сделать это в pyspark распределенным способом? Я пробовал то же самое с pypsark ранее, используя UDF но в итоге я создавал 1000 объектов подключения менее чем за минуту, а у Google Cloud есть ограничение на квоту запроса. Следовательно, я получал сбой, как показано ниже —

 "grpc_message":"Quota exceeded for quota metric 'Requests' and limit 'Requests per minute per user' of service 'language.googleapis.com' for consumer
  

Ответ №1:

Сервисные квоты всегда будут узким местом, что бы вы ни делали.

При этом вы можете попробовать два подхода:

  1. проверьте, имеет ли используемый вами API асинхронную версию. Это позволит вам быть максимально эффективным при работе над одним процессом: вы можете пакетировать запросы и отправлять их одновременно (из квоты сервиса).
  2. В spark вместо использования udf следует использовать mapPartitions. Это позволит вам создавать соединения для каждого раздела вместо для каждой записи, что значительно ускорит процесс. Вы можете обрабатывать количество параллельных запросов, контролируя количество имеющихся у вас разделов (перераспределение). Я не беспокоюсь о нехватке памяти здесь, поскольку набор данных не очень большой.

Окончательное решение будет сочетать (1) и (2), но не все API поддерживают асинхронные операции.

Комментарии:

1. спасибо за ответ. Можете ли вы скинуть несколько ссылок, которые показывают примеры кода, как использовать mapPartitions ? Это помогло бы мне в качестве отправной точки.