Dask: автоматическая повторная проверка и block_size_limit

#python #arrays #numpy #dask

Вопрос:

Мне нужна помощь в понимании того , как rechunk это делается, auto и block_size_limit в работе.

Я пытался использовать Dask для вычисления некоторых больших тензорных внешних продуктов, но у меня возникли проблемы, потому что процессы требовали много памяти. Читая документы, я нашел это. Итак, чтобы посмотреть, как это работает, я открыл блокнот Jupyter и написал:

 import dask.array as da
import numpy as np

a = da.random.random((30, 40, 30))
z = da.blockwise(np.multiply.outer, 'ijklmn', a, 'ijk', a, 'lmn', dtype='f8')
 

Я вижу, что z изначально 1 был кусок размером 10.37GB .

Важно отметить , что 1e8 соответствует 100MB , 1e9 соответствует 1GB и 1e10 соответствует 10GB .

Поэтому я решил перепроверить его и посмотреть, что произошло. Я выполнил эти команды по порядку:

 z = z.rechunk(chunks='auto', block_size_limit='1e8') # --> 256 chunks of 40.50MB.
z = z.rechunk(chunks='auto', block_size_limit='1e9') # --> 256 chunks of 40.50MB.
z = z.rechunk(chunks='auto', block_size_limit='1e10') # --> 2 chunks of 7.78 GB.
 

Затем я снова создал массив и выполнил следующее:

 z = da.blockwise(np.multiply.outer, 'ijklmn', a, 'ijk', a, 'lmn', dtype='f8')
z = z.rechunk(chunks='auto', block_size_limit='1e8') # --> 256 chunks of 40.50MB.
z = da.blockwise(np.multiply.outer, 'ijklmn', a, 'ijk', a, 'lmn', dtype='f8')
z = z.rechunk(chunks='auto', block_size_limit='1e9') # --> 64 chunks of 162.00 MB
z = da.blockwise(np.multiply.outer, 'ijklmn', a, 'ijk', a, 'lmn', dtype='f8')
z = z.rechunk(chunks='auto', block_size_limit='1e10') # --> 64 chunks of 162.00 MB
 

В моем коде у меня есть список тензоров, и я вызываю внешний продукт в цикле:

 # This is a simplification of my code.

def get_blockwise_product(array):
    return da.blockwise(np.multiply.outer, 'ijklmn', array, 'ijk', array, 'lmn', dtype='f8')

result = da.zeros(output_size)
for array in arrays:
    outer = get_blockwise_product(array)
    # Here I rechunk outer because I can get some large results
    outer = outer.rechunk(chunks='auto', block_size_limit=1e9)
    result  = outer
 

Так что мой случай был бы похож на второй блок, где вы получаете больше кусков меньшего размера. Проблема, с которой я столкнулся сейчас, заключается в том, что я получаю очень большие графики задач ( ~200,000 tasks ), и через некоторое время программа выходит из строя (даже если я установил большой timeout ).

Единственная разница в том, что во втором блоке команд я снова вызываю блок перед повторной проверкой. Почему результаты так сильно отличаются? Является ли это ожидаемым поведением?

Я также попытался установить это значение dask.config.set({"array.chunk-size": '1GB'}) , и я мог видеть, как оно обновляется dask.config.config . Однако поведение не изменилось.