Использование dask с сервером Stanford CoreNLP в Stanza

#python #python-3.x #pandas #dask #stanford-nlp

#python #python-3.x #панды #dask #stanford-nlp

Вопрос:

Я пытаюсь оптимизировать Stanza's CoreNLPClient dask , пытаясь запустить его параллельно.

Например: рассмотрим следующий пример DF

 import pandas as pd
df = pd.DataFrame(["Eating an apple is good for your health"]*100, columns = ["Sample"])
 

Распараллеливание кода:

 from dask import dataframe as dask_dd
from dask.multiprocessing import get
import stanza
from stanza.server import CoreNLPClient, CorefChain

def parallel_func(text):
    
    with CoreNLPClient(
        annotators=['tokenize','ssplit','pos','lemma','ner', 'parse', 'depparse','coref'],
        timeout=30000,endpoint="http://localhost:9000",
        memory='16G') as client:
        
        ann = client.annotate(text)
    return ann.corefChain

def main_func(data):
    
    data["chain"] = dask_dd.from_pandas(data, npartitions=4).map_partitions(
    lambda t: t.apply(lambda m: parallel_func(m["Sample"]), axis = 1)).compute(scheduler="threads")
 

Ошибка:

Unable to start the CoreNLP server on port 9000 (possible something is already running there)

Попытка 1 исправить эту ошибку:

Поскольку порт занят, я попытался different_partitions запустить different_port его, вручную создав index для DF и randomly select a number , который будет передан в качестве порта, предполагая each_sub_partition , что созданный будет иметь different_slice DF so, выбранный случайным index образом, будет отличаться

Код:

 def parallel_func(text, port):
    
    with CoreNLPClient(
        annotators=['tokenize','ssplit','pos','lemma','ner', 'parse', 'depparse','coref'],
        timeout=30000,endpoint="http://localhost:900" str(port),
        memory='16G') as client:
        
        ann = client.annotate(text)
    return ann.corefChain

def main_func(data):

   data=data.reset_index(inplace=False)
   data["chain"] = dask_dd.from_pandas(data, npartitions=4).map_partitions(
   lambda t: t.apply(lambda m: parallel_func(m["Sample"], m["index]), axis = 
  1)).compute(scheduler="threads")
 

Но то же index ===>1 самое передается port параметру, и я получаю ту же ошибку, что и упомянутая выше. Любые предложения о том, как решить эту проблему, были бы полезны