#apache-kafka #python-3.8 #kafka-python
Вопрос:
Моя задача-написать несколько Python3-скриптов для тестирования системы apache-kafka. Поэтому из каждой пары тема-раздел следует прочитать одно сообщение. Подпрограмма подписывается на тему и переходит к случайному смещению в разделе темы, читает сообщение и отменяет подписку. Например, это означает, что она подписывается, отменяет подписку, а затем «повторно подписывается» на один SAMPLE_TOPIC, чтобы проверить разделы 0 и 1. Кажется, что шаг «повторная подписка» завершается неудачно, но я также столкнулся с тем, что первый шаг подписки завершается неудачно, что я исправил с помощью partitions_for_topic
функции -.
Я знаю, что мог бы реализовать его без отмены и «повторной подписки», но, если возможно, я хочу, чтобы он работал так, как указано.
Итак, что же здесь происходит не так?
Я что-то упускаю? Или это связано с дизайном apache-kafka / kafka-python? Или это ошибка в одном из них? Или что-то еще?
Я ценю любую помощь.
Kafka-Python=2.0.2
Apache-Кафка=2.7.0 (Согласно kafka-topics --version
)
Python3.8.5
Приведенный ниже код является уменьшенным вариантом оригинала, который выполняется в конвейере gitlab:
from random import randint
from kafka.consumer.fetcher import ConsumerRecord
from kafka.structs import TopicPartition
from kafka import KafkaConsumer
def output_check(kafka: KafkaConsumer, topic_partitions: list, offsets: list):
tp: TopicPartition
offset_dict: dict
for tp, offset_dict in zip(topic_partitions, offsets):
print("nOUTPUT_CHECK TOPIC_PARTITION:", tp)
offset: int = randint(offset_dict["start"], offset_dict["end"])
msg: ConsumerRecord = read_single_message(kafka, tp, offset)
print()
print(msg)
def read_single_message(kafka: KafkaConsumer, topic_partition: TopicPartition, offset: int):
print("BEFORE SUBSCRIPTION - TOPIC:", topic_partition.topic)
kafka.subscribe([(topic_partition.topic])
print("nAFTER SUBSCRIBE - SUBSCRIPTION:", kafka.subscription())
print("AFTER SUBSCRIBE - ASSIGNMENT:", kafka.assignment())
print("AFTER SUBSCRIBE - PARTITIONS FOR TOPIC", kafka.partitions_for_topic(topic_partition.topic))
assert topic_partition.topic in kafka.subscription(),
"The Consumer is not subscribed to the given Topic" str(topic_partition.topic) "."
if <OFFSET CHECK>:
raise ArithmeticError("Offset {} is not existing in {}".format(offset, topic_partition))
else:
print("nAFTER OFFSET CHECK - SUBSCRIPTION:", kafka.subscription())
print("AFTER OFFSET CHECK - ASSIGNMENT:", kafka.assignment())
kafka.seek(topic_partition, offset)
kafka.poll(max_records=1) # Poll Messages. Might be unnecessary...
msg = None
for msg in kafka:
break
kafka.unsubscribe()
return msg
Изменено название темы и данные записи пользователя:
OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=0)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC
AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}
AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: {TopicPartition(topic='SAMPLE_TOPIC', partition=4),
TopicPartition(topic='SAMPLE_TOPIC', partition=7), TopicPartition(topic='SAMPLE_TOPIC', partition=0),
TopicPartition(topic='SAMPLE_TOPIC', partition=6), TopicPartition(topic='SAMPLE_TOPIC', partition=3),
TopicPartition(topic='SAMPLE_TOPIC', partition=2), TopicPartition(topic='SAMPLE_TOPIC', partition=5),
TopicPartition(topic='SAMPLE_TOPIC', partition=1)}
ConsumerRecord(topic='SAMPLE_TOPIC', partition=0, offset=1, timestamp=163345635146,
timestamp_type=0, key=b'SOME_KEY', value=b'SOME AVRO ENCODED MESSAGE', headers=[],
checksum=None, serialized_key_size=12, serialized_value_size=954, serialized_header_size=-1)
OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=1)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC
AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}
AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: set()
Traceback (most recent call last):
File "test_executor.py", line 88, in <module>
main(argument_parsing())
File "test_executor.py", line 58, in main
BasicKafka.output_check(con_dict)
File "...", line ..., in output_check
msg: ConsumerRecord = read_single_message(kafka, tp, offset)
File "...", line ..., in read_single_message
kafka.seek(topic_partition, offset)
File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 818, in seek
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition