Dask не запускает рабочие программы

#dask #worker

#dask #рабочий

Вопрос:

Я пытаюсь использовать Dask для выполнения операции groupby над фреймом данных. Приведенный ниже код не работает, но кажется, что если я инициализирую клиент с другой консоли, код работает, хотя я ничего не вижу на панели мониторинга ( http://localhost:8787/status ): Я имею в виду, что есть панель мониторинга, но все цифры выглядят пустыми. Я нахожусь на macOS. Код:

 from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd

client = Client()
# open http://localhost:8787/status

csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'


df = dd.read_csv(csv_path,
     dtype = {
        'timestamp': str,
         'node_id': str,
         'subsystem': str,
         'sensor': str,
         'parameter': str, 
         'value_raw': str, 
         'value_hrf': str,
     },
     parse_dates=['timestamp'],
     date_parser=lambda x: pd.datetime.strptime(x, '%Y/%m/%d %H:%M:%S')
)


#%%
if not os.path.exists(dir_destination):
        os.makedirs(dir_destination)

def create_node_csv(df_node):
    # test function
    return len(df_node)

res = df.groupby('node_id').apply(create_node_csv, meta=int)
  

CSV-файл состоит просто из столбцов строки. Моя цель — сгруппировать все строки, содержащие определенное значение в столбце, а затем сохранить их как отдельный файл, используя create_node_csv(df_node) (хотя прямо сейчас это фиктивная функция). Приветствуется любой другой способ сделать это, но я хотел бы понять, что здесь происходит.

Когда я запускаю его, консоль несколько раз выводит следующие ошибки: tornado.application — ОШИБКА — Множественные исключения в обратном выводе списка (последний вызов last):

  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
    response = yield self.instantiate()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
    self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 370, in start
    yield self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 35, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 184, in _start
    process.start()
  File "/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
  

И:

 distributed.nanny - WARNING - Worker process 1844 exited with status 1
distributed.nanny - WARNING - Restarting worker
  

И:

 Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset   size])
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header   buf)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
    raise gen.TimeoutError("Worker failed to start")
tornado.util.TimeoutError: Worker failed to start
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
  

Редактировать:
На основе ответа:
— Как мне предотвратить создание нового клиента при повторном запуске программы?
— Как я могу сделать следующее?

 def create_node_csv(df_node):
    return len(df_node)
  

Это возвращает мне следующую ошибку, связано ли это с параметром meta?

 ValueError: cannot reindex from a duplicate axis
  

Ответ №1:

При запуске скрипта Client() создаются новые рабочие элементы Dask, которые также получают копии переменных из исходного основного процесса. В некоторых случаях это включает повторный импорт скрипта в каждый worker’ов, каждый из которых, конечно, затем пытается создать Client новый набор процессов.

Лучший ответ, как и вообще со всем, что выполняется в процессах, — использовать функции и защищать основное выполнение. Ниже приведен способ сделать это без изменения структуры вашего одного скрипта:

 from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd

csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'

def run():
    client = Client()

    df = dd.read_csv(csv_path, ...)
    if not os.path.exists(dir_destination):
            os.makedirs(dir_destination)

    def create_node_csv(df_node):
        # test function
        return len(df_node)

    res = df.groupby('node_id').apply(create_node_csv, meta=int)
    print(res.compute())

if __name__ == "__main__":
    run()
  

Как мне предотвратить создание нового клиента при повторном запуске программы?

В вызове Client() вы можете указать адрес существующего кластера, если знаете, что это будет. Кроме того, некоторые конкретные типы развертываний (их несколько) могут иметь понятие «текущий кластер».

Комментарии:

1. Спасибо! Я отредактировал приведенный выше код, добавив еще два коротких вопроса.