#apache-spark #pyspark #rdd #johnsnowlabs-spark-nlp
Вопрос:
У меня есть RDD следующей структуры:
my_rdd = [Row(text='Hello World. This is bad.'), Row(text='This is good.'), ...]
Я могу выполнять параллельную обработку с помощью функций python:
rdd2=my_rdd.map(lambda f: f.text.split())
for x in rdd2.collect():
print(x)
и это дает мне ожидаемый результат.
Однако, когда я пытаюсь использовать прерыватель предложений spark-NLP или анализатор настроений, я получаю ошибку: Ошибка выбора: Не удалось сериализовать объект: Ошибка типа: не удается рассолить _thread.Объекты RLock
в этой строке: для x в rdd2.collect():
Вот код:
documenter = DocumentAssembler()
.setInputCol("text")
.setOutputCol("document")
sentencerDL = SentenceDetectorDLModel
.pretrained("sentence_detector_dl", "en")
.setInputCols(["document"])
.setOutputCol("sentences")
sd_pipeline = PipelineModel(stages=[documenter, sentencerDL])
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment', 'en')
Если я попытаюсь:
rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))
или
rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])
Возникает ошибка. Когда я запускаю их без «сопоставления», они функционируют так, как ожидалось.
Кто-нибудь знает, как параллельно выполнять прерыватель предложений spark-NLP или анализатор настроений? Что я делаю неправильно?
Спасибо всем!
Ответ №1:
когда вы применяете конвейер Spark-ML к фрейму данных, в котором данные распределены по разным разделам, по умолчанию вы получаете параллельное выполнение. То же самое относится и к трубопроводу spark-NLP(который также является трубопроводом Spark-ML). Так что вы можете сделать,
конвейер.преобразование(фрейм данных)
И создайте «фрейм данных» таким образом, чтобы данные распределялись по разным узлам. Хороший учебник находится здесь,
https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/
Также для отображения содержимого фрейма данных после преобразования с помощью Spark-NLP вы можете использовать функции в разделе sparknlp.функции, например map_annotations_col, которые позволят вам отобразить содержимое определенного столбца в фрейме данных, содержащем аннотации Spark-NLP. Кстати, это,
rdd2=my_rdd.карта(лямбда f: конвейер.аннотировать(f.текст))
это то, чего вам не следует делать, вы получаете это исключение, потому что Spark пытается сериализовать весь ваш конвейер и отправить его узлам кластера. Это не так, как должно работать, вы передаете данные в конвейер и позволяете конвейеру выбирать, что распространять в кластере.
Комментарии:
1. Большое тебе спасибо, Альберто!