#python #python-3.x #faust
#python #python-3.x #фауст
Вопрос:
Я пытаюсь перенаправлять сообщения на внутренние темы в faust. Как было предложено фаустом в этом примере:
https://faust.readthedocs.io/en/latest/playbooks/vskafka.html
У меня есть следующий код
in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)
....
@app.agent(in_topic)
async def process(stream):
async for event in stream:
event.forward(batching)
yield
Но я всегда получаю следующую ошибку при запуске моего pytest:
AttributeError: 'str' object has no attribute 'forward'
Была ли эта функция удалена, или мне нужно указать тему по-другому, чтобы получить событие, или это даже проблема с pytest?
Ответ №1:
Вы используете синтаксис в обратном направлении, вот почему!
in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)
....
@app.agent(in_topic)
async def process(stream):
async for event in stream:
batching.send(value=event)
yield
Должно действительно сработать.
Редактировать:
Это также единственное, с помощью чего я мог бы правильно использовать pytest с faust.
Метод sink можно использовать только в том случае, если вы удалите все, кроме последнего, sink из mocked agent, что не кажется интуитивно понятным.
Если вы сначала импортируете свой приемник, а затем добавляете этот декоратор в свой тест, все должно работать нормально:
from my_path import my_sink, my_agent
from unittest.mock import patch, AsyncMock
@patch(__name__ '.my_sink.send', new_callable=AsyncMock)
def test_my_agent(test_app)
async with my_agent.test_context() as agent:
payload = 'This_works_fine'
agent.put(payload)
my_sink.send.assert_called_with(value=payload)
Ответ №2:
попробуйте следующий:
@app.agent(in_topic, sink=[batching])
async def process(stream):
async for event in stream:
yield event
Ответ №3:
Ответ от @Florian Hall
работает нормально. Я также обнаружил, что метод пересылки реализован только для событий, в моем случае я получил str. Причиной может быть разница между темами и каналами Faust. Другое дело, что использование pytest и Faust test_context()
ведет себя странно, например, это заставляет вас уступать, даже если у вас не определен приемник.