Read_json() dask параллелен?

#python #bigdata #dask

#python #bigdata #dask

Вопрос:

У меня есть приведенный ниже код. Он использует распределенный dask для чтения 100 файлов json: (Рабочие: 5 ядер: 5 Память: 50,00 ГБ)

   from dask.distributed import Client
  import dask.dataframe as dd

  client = Client('xxxxxxxx:8786')
  df = dd.read_json('gs://xxxxxx/2018-04-18/data-*.json')
  df = client.persist(df)
 

Когда я запускаю код, я вижу, что только один рабочий выполняет задачу read_json(), а затем я получил ошибку памяти и получил ошибку WorkerKilled.

Должен ли я вручную читать каждый файл и объединять их? или предполагается, что dask делает это под капотом?

Комментарии:

1. @MRocklin очень признателен, если у вас есть какие-либо идеи!

Ответ №1:

Возможно, вы захотите использовать dask.bag вместо dask.dataframe

 import json
import dask.bag as db
mybag = db.read_text('gs://xxxxxx/2018-04-18/data-*.json').map(json.loads)
 

После этого вы можете преобразовать пакет в фрейм данных dask с помощью

 mybag.to_dataframe()
 

Для правильного построения структуры может потребоваться некоторое дополнительное использование dask.map.

Если ваши данные представляют собой json в стиле hadoop (он же один объект на строку), трюк с сумкой все равно будет работать, но вам может потребоваться работать с отдельными строками.

Комментарии:

1. пакет dask работает довольно быстро! но я попробовал проверить, насколько быстро можно сохранить пакет обратно в gcs с помощью mybag.to_textfiles(‘gs://xxxxxx/2018-04-18/output/data* .json.gz ‘) но я сразу же получил отмененную ошибку! это ошибка dask?