#python #apache-kafka #faust
Вопрос:
у меня возникли некоторые проблемы с выполнением простой задачи с фаустом python, пожалуйста, взгляните на проблему и посмотрите, можете ли вы мне помочь.
Шаги по воспроизведению
я использовал этот код:
import faust
from settings import KAFKA_SERVER
app = faust.App('streams', broker=KAFKA_SERVER, producer_acks=0, store='rocksdb://')
class ProjetoRateio(faust.Record):
codigoProjeto: str
combinacao: str
grade: str
quantidade: int
projeto_rateio_topic = app.topic(
'gua-poc-sent-rateio',
# key_type=str,
value_type=ProjetoRateio,
# value_serializer='raw',
)
grade_total = app.Table('grade_total', default=int,
partitions=1)
@app.agent(projeto_rateio_topic)
async def projeto_rateio(rateios):
async for rateio in rateios:
# grade_total[f'{rateio.codigoProjeto}.{rateio.combinacao}.{rateio.grade}'] = rateio.quantidade
grade_total[rateio.codigoProjeto] = rateio.quantidade
и получил ошибку, описанную в названии
Ожидаемое поведение
Таблица кафки, заполненная
Фактическое поведение
Exception in callback Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)
handle: <Handle Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)>
Traceback (most recent call last):
File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/topics.py", line 474, in _on_published
message.message.callback(message)
File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/tables/base.py", line 353, in _on_changelog_sent
self.data.set_persisted_offset(res.topic_partition, res.offset)
AttributeError: 'NoneType' object has no attribute 'topic_partition'
Версии
- Python версии 3.8
- Фауст-потоковая версия 0.6.4
- Операционная система windows 10, Ubuntu 20.04 (через wsl)
- Кафка версия 2.0.0 (msk)
- Версия RocksDB: 5.0
Я делаю что-то не так?
Ответ №1:
Имея аналогичную проблему, проблемная конфигурация заключается producer_acks=0
в том , что попробуйте установить ее либо 1
или -1
в зависимости от желаемого поведения:
Количество подтверждений, которые производитель требует, чтобы руководитель получил, прежде чем считать запрос завершенным. Это контролирует долговечность отправляемых записей. Общими являются следующие настройки:
0
: Производитель вообще не будет ждать никаких подтверждений от сервера. Сообщение будет немедленно считаться отправленным (не рекомендуется).1
: Лидер брокера запишет запись в свой локальный журнал, но ответит, не дожидаясь полного подтверждения от всех подписчиков. В этом случае, если лидер потерпит неудачу сразу после подтверждения записи, но до того, как последователи ее воспроизведут, запись будет потеряна.-1
: Руководитель брокера будет ждать, пока полный набор синхронизированных реплик подтвердит запись. Это гарантирует, что запись не будет потеряна до тех пор, пока по крайней мере одна синхронизированная реплика остается живой. Это самая сильная из доступных гарантий.
Этот блок был взят с github, транслирующего фауста, здесь.