#python #generator
#python #генератор
Вопрос:
Я пишу код обработки данных, используя сопрограммы на основе генератора, где данные непрерывно считываются с датчиков или файлов (источник), затем преобразуются с использованием одной или нескольких функций (фильтров) и, наконец, записываются куда-либо (приемник). Вот пример использования некоторых фиктивных функций:
def coroutine(func):
def primed(*args, **kwargs):
coro = func(*args, **kwargs)
next(coro)
return coro
return primed
def source(name):
for x in range(1,4):
yield f'{name}{x}'
config = {
'filter1': '_F1',
'filter2': '_F2',
'filter3': '_F3',
}
@coroutine
def filter1(**config):
"""Coroutine that consumes AND produces"""
data = ''
while True:
data = yield data config['filter1']
@coroutine
def filter2(**config):
"""Coroutine that consumes AND produces"""
data = ''
while True:
data = yield data config['filter2']
@coroutine
def filter3(**config):
"""Coroutine that consumes AND produces"""
data = ''
while True:
data = yield data config['filter3']
@coroutine
def writer(where):
while True:
data = yield
print(f'writing to {where}: {data}')
def pipeline(source, transforms, sinks):
for data in source:
for f in transforms:
transformed = f(**config).send(data)
for sink in sinks:
sink.send(transformed)
pipeline(source('data'),
transforms=[
filter1,
filter2,
filter3,
],
sinks=[
writer('console'),
writer('file'),
])
Как правило, не рекомендуется смешивать поведение потребителя / производителя в сопрограммах (см. here. Однако этот подход позволяет мне писать pipeline
функцию без жесткого кодирования отдельных функций преобразования ( filter
). Если бы мне нужно было придерживаться «потребительского» поведения сопрограмм, это то, что я смог придумать:
def coroutine(func):
def primed(*args, **kwargs):
coro = func(*args, **kwargs)
next(coro)
return coro
return primed
def source(name, target):
for x in range(1,4):
target.send(f'{name}{x}')
config = {
'filter1': '_F1',
'filter2': '_F2',
'filter3': '_F3',
}
@coroutine
def filter1(target, **config):
"""This coroutine only consumes"""
while True:
data = yield
target.send(data config['filter1'])
@coroutine
def filter2(target, **config):
"""This coroutine only consumes"""
while True:
data = yield
target.send(data config['filter2'])
@coroutine
def filter3(target, **config):
"""This coroutine only consumes"""
while True:
data = yield
target.send(data config['filter3'])
@coroutine
def writer(where):
while True:
data = yield
print(f'writing to {where}: {data}')
def pipeline():
f3 = filter3(writer('console'), **config)
f2 = filter2(f3, **config)
f1 = filter1(f2, **config)
source('data', f1)
pipeline()
Теперь мой вопрос: является ли первая реализация необходимой плохой идеей, и если да, что может пойти не так? Мне это нравится больше, чем второй подход, хотя я понимаю, что я смешиваю поведение генератора / сопрограммы…
Комментарии:
1. Вы понимаете, что этому документу ~ десять лет? Мистер Бизли продолжил изучать эти концепции и адаптации с помощью с
asyncio
— у него есть несколько видеороликов по pyvideo.org2. Когда вы говорите, что вам нравится первая реализация, что вы имеете в виду под этим, это звучит довольно субъективно. И почему вам не нравится вторая версия? Пожалуйста, объясните, отредактировав свой вопрос.
3. @wwii Я знаю, что это довольно старая. Честно говоря, у меня возникают трудности с развитием интуиции относительно того, как
asyncio
я хочу написать код. Насколько я понимаю, вsend
нетasyncio
аналога, поэтому я предполагаю, что вам нужно использовать какой-тоasyncio.Queue
(???).4. @wwii Мне не нравится второе, потому что, как я упоминал, мне нужно жестко запрограммировать фильтры в
pipeline
функции (я не могу просто просмотреть список фильтров, как в первой версии). Более того, мне нужно начать с последнего фильтра, потому что этоtarget
для предыдущего фильтра.