Функции Spark-NLP выдают ошибку травления при использовании карты

#apache-spark #pyspark #rdd #johnsnowlabs-spark-nlp

Вопрос:

У меня есть RDD следующей структуры:

 my_rdd = [Row(text='Hello World. This is bad.'), Row(text='This is good.'), ...]
 

Я могу выполнять параллельную обработку с помощью функций python:

 rdd2=my_rdd.map(lambda f: f.text.split()) 
for x in rdd2.collect():
  print(x)
 

и это дает мне ожидаемый результат.

Однако, когда я пытаюсь использовать прерыватель предложений spark-NLP или анализатор настроений, я получаю ошибку: Ошибка выбора: Не удалось сериализовать объект: Ошибка типа: не удается рассолить _thread.Объекты RLock

в этой строке: для x в rdd2.collect():

Вот код:

 documenter = DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    
sentencerDL = SentenceDetectorDLModel
  .pretrained("sentence_detector_dl", "en") 
  .setInputCols(["document"]) 
  .setOutputCol("sentences")

sd_pipeline = PipelineModel(stages=[documenter, sentencerDL]) 
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment', 'en')
 

Если я попытаюсь:

 rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))                    
 

или

 rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])
 

Возникает ошибка. Когда я запускаю их без «сопоставления», они функционируют так, как ожидалось.

Кто-нибудь знает, как параллельно выполнять прерыватель предложений spark-NLP или анализатор настроений? Что я делаю неправильно?

Спасибо всем!

Ответ №1:

когда вы применяете конвейер Spark-ML к фрейму данных, в котором данные распределены по разным разделам, по умолчанию вы получаете параллельное выполнение. То же самое относится и к трубопроводу spark-NLP(который также является трубопроводом Spark-ML). Так что вы можете сделать,

конвейер.преобразование(фрейм данных)

И создайте «фрейм данных» таким образом, чтобы данные распределялись по разным узлам. Хороший учебник находится здесь,

https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/

Также для отображения содержимого фрейма данных после преобразования с помощью Spark-NLP вы можете использовать функции в разделе sparknlp.функции, например map_annotations_col, которые позволят вам отобразить содержимое определенного столбца в фрейме данных, содержащем аннотации Spark-NLP. Кстати, это,

rdd2=my_rdd.карта(лямбда f: конвейер.аннотировать(f.текст))

это то, чего вам не следует делать, вы получаете это исключение, потому что Spark пытается сериализовать весь ваш конвейер и отправить его узлам кластера. Это не так, как должно работать, вы передаете данные в конвейер и позволяете конвейеру выбирать, что распространять в кластере.

Комментарии:

1. Большое тебе спасибо, Альберто!