Обязательно ли плохая идея смешивать поведение потребления / создания сопрограмм генератора?

#python #generator

#python #генератор

Вопрос:

Я пишу код обработки данных, используя сопрограммы на основе генератора, где данные непрерывно считываются с датчиков или файлов (источник), затем преобразуются с использованием одной или нескольких функций (фильтров) и, наконец, записываются куда-либо (приемник). Вот пример использования некоторых фиктивных функций:

 def coroutine(func):
    def primed(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed

def source(name):
    for x in range(1,4):
        yield f'{name}{x}'

config = {
    'filter1': '_F1',
    'filter2': '_F2',
    'filter3': '_F3',
}

@coroutine
def filter1(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data   config['filter1']

@coroutine
def filter2(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data   config['filter2']

@coroutine
def filter3(**config):
    """Coroutine that consumes AND produces"""
    data = ''
    while True:
        data = yield data   config['filter3']

@coroutine
def writer(where):
    while True:
        data = yield
        print(f'writing to {where}: {data}')

def pipeline(source, transforms, sinks):
    for data in source:
        for f in transforms:
            transformed = f(**config).send(data)
        for sink in sinks:
            sink.send(transformed)


pipeline(source('data'), 
         transforms=[
             filter1,
             filter2,
             filter3,
         ], 
         sinks=[
             writer('console'),
             writer('file'),
         ])
  

Как правило, не рекомендуется смешивать поведение потребителя / производителя в сопрограммах (см. here. Однако этот подход позволяет мне писать pipeline функцию без жесткого кодирования отдельных функций преобразования ( filter ). Если бы мне нужно было придерживаться «потребительского» поведения сопрограмм, это то, что я смог придумать:

 def coroutine(func):
    def primed(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed

def source(name, target):
    for x in range(1,4):
        target.send(f'{name}{x}')

config = {
    'filter1': '_F1',
    'filter2': '_F2',
    'filter3': '_F3',
}

@coroutine
def filter1(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data   config['filter1'])

@coroutine
def filter2(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data   config['filter2'])

@coroutine
def filter3(target, **config):
    """This coroutine only consumes"""
    while True:
        data = yield 
        target.send(data   config['filter3'])

@coroutine
def writer(where):
    while True:
        data = yield
        print(f'writing to {where}: {data}')

def pipeline():
    f3 = filter3(writer('console'), **config)
    f2 = filter2(f3, **config)        
    f1 = filter1(f2, **config)

    source('data', f1)

pipeline()
  

Теперь мой вопрос: является ли первая реализация необходимой плохой идеей, и если да, что может пойти не так? Мне это нравится больше, чем второй подход, хотя я понимаю, что я смешиваю поведение генератора / сопрограммы…

Комментарии:

1. Вы понимаете, что этому документу ~ десять лет? Мистер Бизли продолжил изучать эти концепции и адаптации с помощью с asyncio — у него есть несколько видеороликов по pyvideo.org

2. Когда вы говорите, что вам нравится первая реализация, что вы имеете в виду под этим, это звучит довольно субъективно. И почему вам не нравится вторая версия? Пожалуйста, объясните, отредактировав свой вопрос.

3. @wwii Я знаю, что это довольно старая. Честно говоря, у меня возникают трудности с развитием интуиции относительно того, как asyncio я хочу написать код. Насколько я понимаю, в send нет asyncio аналога, поэтому я предполагаю, что вам нужно использовать какой-то asyncio.Queue (???).

4. @wwii Мне не нравится второе, потому что, как я упоминал, мне нужно жестко запрограммировать фильтры в pipeline функции (я не могу просто просмотреть список фильтров, как в первой версии). Более того, мне нужно начать с последнего фильтра, потому что это target для предыдущего фильтра.