#python #dask #dask-delayed
Вопрос:
У меня есть список элементов, которые мне нужно отфильтровать на основе некоторых условий. Мне интересно, сможет ли Dask выполнять эту фильтрацию параллельно, так как список очень длинный (несколько десятков миллионов записей).
В принципе, то, что мне нужно сделать, это:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
С Даском я добился следующего::
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
if item_is_valid:
return item
results = []
item = []
for item in items:
delayed = dask.delayed(item_is_valid)(item)
results.append(delayed)
results = dask.compute(*results)
Однако результат, который я получаю, содержит несколько None
значений, которые затем нужно как-то отфильтровать непараллельным способом.
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
Комментарии:
1.Вы пробовали проверить, есть ли значение,
None
прежде чем добавлять его в список? Есть ли способ сделать это?2. Элементы лениво добавляются через Dask, и один раз во время вычисления все происходит внутри Dask, поэтому я не думаю, что вы можете выбирать, какие элементы добавляются. Если это возможно, я не знаю, как бы вы это сделали.
3. Как быстро вам нужно быть, и что именно находится в вашем состоянии ? фильтрация 48 миллионов записей с таким условием занимает всего 6,7 секунды (что кажется довольно быстрым). Если ваше условие заключается в многократном доступе к одним и тем же ключам словаря, вы можете ускорить его, поместив значения в переменные. Вы также можете получить небольшой прирост скорости, используя фильтр вместо понимания списка:
result = [*filter(item_is_valid,items)]
Ответ №1:
Возможно bag
, API вас устроит, это грубый псевдокод:
import dask.bag as db
bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)
Чтобы проверить, работает ли это, проверьте результат result.take(5)
и если это удовлетворительно:
computed_result = result.compute()
Комментарии:
1. Именно то, что мне было нужно, спасибо!! Я верю
bag.from_iterable
, что его не существует. Кстати, я верю , что это такbag.from_sequence
, не стесняйтесь менять это, чтобы другие люди не путались. Но еще раз спасибо за решение!2. Ha! Память играет со мной злую шутку, спасибо, я обновлю ответ.