Распараллеливание фильтрации списков

#python #dask #dask-delayed

Вопрос:

У меня есть список элементов, которые мне нужно отфильтровать на основе некоторых условий. Мне интересно, сможет ли Dask выполнять эту фильтрацию параллельно, так как список очень длинный (несколько десятков миллионов записей).

В принципе, то, что мне нужно сделать, это:

 items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

 

С Даском я добился следующего::

 def item_is_valid_v2(item):
    """Return the whole item if valid."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions
    
    if item_is_valid:
        return item

results = []
item = []
for item in items:
    delayed = dask.delayed(item_is_valid)(item)
    results.append(delayed)

results = dask.compute(*results)
 

Однако результат, который я получаю, содержит несколько None значений, которые затем нужно как-то отфильтровать непараллельным способом.

 ({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
 

Комментарии:

1.Вы пробовали проверить, есть ли значение, None прежде чем добавлять его в список? Есть ли способ сделать это?

2. Элементы лениво добавляются через Dask, и один раз во время вычисления все происходит внутри Dask, поэтому я не думаю, что вы можете выбирать, какие элементы добавляются. Если это возможно, я не знаю, как бы вы это сделали.

3. Как быстро вам нужно быть, и что именно находится в вашем состоянии ? фильтрация 48 миллионов записей с таким условием занимает всего 6,7 секунды (что кажется довольно быстрым). Если ваше условие заключается в многократном доступе к одним и тем же ключам словаря, вы можете ускорить его, поместив значения в переменные. Вы также можете получить небольшой прирост скорости, используя фильтр вместо понимания списка: result = [*filter(item_is_valid,items)]

Ответ №1:

Возможно bag , API вас устроит, это грубый псевдокод:

 import dask.bag as db

bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)
 

Чтобы проверить, работает ли это, проверьте результат result.take(5) и если это удовлетворительно:

 computed_result = result.compute()
 

Комментарии:

1. Именно то, что мне было нужно, спасибо!! Я верю bag.from_iterable , что его не существует. Кстати, я верю , что это так bag.from_sequence , не стесняйтесь менять это, чтобы другие люди не путались. Но еще раз спасибо за решение!

2. Ha! Память играет со мной злую шутку, спасибо, я обновлю ответ.