#dask #worker
#dask #рабочий
Вопрос:
Я пытаюсь использовать Dask для выполнения операции groupby над фреймом данных. Приведенный ниже код не работает, но кажется, что если я инициализирую клиент с другой консоли, код работает, хотя я ничего не вижу на панели мониторинга ( http://localhost:8787/status ): Я имею в виду, что есть панель мониторинга, но все цифры выглядят пустыми. Я нахожусь на macOS. Код:
from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd
client = Client()
# open http://localhost:8787/status
csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'
df = dd.read_csv(csv_path,
dtype = {
'timestamp': str,
'node_id': str,
'subsystem': str,
'sensor': str,
'parameter': str,
'value_raw': str,
'value_hrf': str,
},
parse_dates=['timestamp'],
date_parser=lambda x: pd.datetime.strptime(x, '%Y/%m/%d %H:%M:%S')
)
#%%
if not os.path.exists(dir_destination):
os.makedirs(dir_destination)
def create_node_csv(df_node):
# test function
return len(df_node)
res = df.groupby('node_id').apply(create_node_csv, meta=int)
CSV-файл состоит просто из столбцов строки. Моя цель — сгруппировать все строки, содержащие определенное значение в столбце, а затем сохранить их как отдельный файл, используя create_node_csv(df_node) (хотя прямо сейчас это фиктивная функция). Приветствуется любой другой способ сделать это, но я хотел бы понять, что здесь происходит.
Когда я запускаю его, консоль несколько раз выводит следующие ошибки: tornado.application — ОШИБКА — Множественные исключения в обратном выводе списка (последний вызов last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
yield w._start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
response = yield self.instantiate()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
self.process.start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 370, in start
yield self.process.start()
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 35, in _call_and_set_future
res = func(*args, **kwargs)
File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 184, in _start
process.start()
File "/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
return Popen(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
super().__init__(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
И:
distributed.nanny - WARNING - Worker process 1844 exited with status 1
distributed.nanny - WARNING - Restarting worker
И:
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
send_bytes(obj)
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset size])
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header buf)
File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
raise gen.TimeoutError("Worker failed to start")
tornado.util.TimeoutError: Worker failed to start
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
result_list.append(f.result())
File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
yielded = self.gen.send(value)
File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
Редактировать:
На основе ответа:
— Как мне предотвратить создание нового клиента при повторном запуске программы?
— Как я могу сделать следующее?
def create_node_csv(df_node):
return len(df_node)
Это возвращает мне следующую ошибку, связано ли это с параметром meta?
ValueError: cannot reindex from a duplicate axis
Ответ №1:
При запуске скрипта Client()
создаются новые рабочие элементы Dask, которые также получают копии переменных из исходного основного процесса. В некоторых случаях это включает повторный импорт скрипта в каждый worker’ов, каждый из которых, конечно, затем пытается создать Client
новый набор процессов.
Лучший ответ, как и вообще со всем, что выполняется в процессах, — использовать функции и защищать основное выполнение. Ниже приведен способ сделать это без изменения структуры вашего одного скрипта:
from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd
csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'
def run():
client = Client()
df = dd.read_csv(csv_path, ...)
if not os.path.exists(dir_destination):
os.makedirs(dir_destination)
def create_node_csv(df_node):
# test function
return len(df_node)
res = df.groupby('node_id').apply(create_node_csv, meta=int)
print(res.compute())
if __name__ == "__main__":
run()
Как мне предотвратить создание нового клиента при повторном запуске программы?
В вызове Client()
вы можете указать адрес существующего кластера, если знаете, что это будет. Кроме того, некоторые конкретные типы развертываний (их несколько) могут иметь понятие «текущий кластер».
Комментарии:
1. Спасибо! Я отредактировал приведенный выше код, добавив еще два коротких вопроса.