#python #bigdata #dask
#python #bigdata #dask
Вопрос:
У меня есть приведенный ниже код. Он использует распределенный dask для чтения 100 файлов json: (Рабочие: 5 ядер: 5 Память: 50,00 ГБ)
from dask.distributed import Client
import dask.dataframe as dd
client = Client('xxxxxxxx:8786')
df = dd.read_json('gs://xxxxxx/2018-04-18/data-*.json')
df = client.persist(df)
Когда я запускаю код, я вижу, что только один рабочий выполняет задачу read_json(), а затем я получил ошибку памяти и получил ошибку WorkerKilled.
Должен ли я вручную читать каждый файл и объединять их? или предполагается, что dask делает это под капотом?
Комментарии:
1. @MRocklin очень признателен, если у вас есть какие-либо идеи!
Ответ №1:
Возможно, вы захотите использовать dask.bag вместо dask.dataframe
import json
import dask.bag as db
mybag = db.read_text('gs://xxxxxx/2018-04-18/data-*.json').map(json.loads)
После этого вы можете преобразовать пакет в фрейм данных dask с помощью
mybag.to_dataframe()
Для правильного построения структуры может потребоваться некоторое дополнительное использование dask.map.
Если ваши данные представляют собой json в стиле hadoop (он же один объект на строку), трюк с сумкой все равно будет работать, но вам может потребоваться работать с отдельными строками.
Комментарии:
1. пакет dask работает довольно быстро! но я попробовал проверить, насколько быстро можно сохранить пакет обратно в gcs с помощью mybag.to_textfiles(‘gs://xxxxxx/2018-04-18/output/data* .json.gz ‘) но я сразу же получил отмененную ошибку! это ошибка dask?