Пересылка событий с помощью Python Faust

#python #python-3.x #faust

#python #python-3.x #фауст

Вопрос:

Я пытаюсь перенаправлять сообщения на внутренние темы в faust. Как было предложено фаустом в этом примере:

https://faust.readthedocs.io/en/latest/playbooks/vskafka.html

У меня есть следующий код

 in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)

....

@app.agent(in_topic)
async def process(stream):
    async for event in stream:
        event.forward(batching)
        yield
 

Но я всегда получаю следующую ошибку при запуске моего pytest:

  AttributeError: 'str' object has no attribute 'forward'
 

Была ли эта функция удалена, или мне нужно указать тему по-другому, чтобы получить событие, или это даже проблема с pytest?

Ответ №1:

Вы используете синтаксис в обратном направлении, вот почему!

 in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)

....

@app.agent(in_topic)
async def process(stream):
    async for event in stream:
        batching.send(value=event)
        yield
 

Должно действительно сработать.

Редактировать:

Это также единственное, с помощью чего я мог бы правильно использовать pytest с faust.
Метод sink можно использовать только в том случае, если вы удалите все, кроме последнего, sink из mocked agent, что не кажется интуитивно понятным.

Если вы сначала импортируете свой приемник, а затем добавляете этот декоратор в свой тест, все должно работать нормально:

 from my_path import my_sink, my_agent
from unittest.mock import patch, AsyncMock

@patch(__name__   '.my_sink.send', new_callable=AsyncMock)
def test_my_agent(test_app)
    async with my_agent.test_context() as agent:
        payload = 'This_works_fine'
        agent.put(payload)
        my_sink.send.assert_called_with(value=payload)
 

Ответ №2:

попробуйте следующий:

 @app.agent(in_topic, sink=[batching])
async def process(stream):
    async for event in stream:
        yield event
 

Ответ №3:

Ответ от @Florian Hall работает нормально. Я также обнаружил, что метод пересылки реализован только для событий, в моем случае я получил str. Причиной может быть разница между темами и каналами Faust. Другое дело, что использование pytest и Faust test_context() ведет себя странно, например, это заставляет вас уступать, даже если у вас не определен приемник.