Пользователь KafkaConsumer не может «Повторно подписаться»

#apache-kafka #python-3.8 #kafka-python

Вопрос:

Моя задача-написать несколько Python3-скриптов для тестирования системы apache-kafka. Поэтому из каждой пары тема-раздел следует прочитать одно сообщение. Подпрограмма подписывается на тему и переходит к случайному смещению в разделе темы, читает сообщение и отменяет подписку. Например, это означает, что она подписывается, отменяет подписку, а затем «повторно подписывается» на один SAMPLE_TOPIC, чтобы проверить разделы 0 и 1. Кажется, что шаг «повторная подписка» завершается неудачно, но я также столкнулся с тем, что первый шаг подписки завершается неудачно, что я исправил с помощью partitions_for_topic функции -.

Я знаю, что мог бы реализовать его без отмены и «повторной подписки», но, если возможно, я хочу, чтобы он работал так, как указано.

Итак, что же здесь происходит не так?
Я что-то упускаю? Или это связано с дизайном apache-kafka / kafka-python? Или это ошибка в одном из них? Или что-то еще?
Я ценю любую помощь.
Kafka-Python=2.0.2
Apache-Кафка=2.7.0 (Согласно kafka-topics --version )
Python3.8.5

Приведенный ниже код является уменьшенным вариантом оригинала, который выполняется в конвейере gitlab:

 from random import randint
from kafka.consumer.fetcher import ConsumerRecord
from kafka.structs import TopicPartition
from kafka import KafkaConsumer

def output_check(kafka: KafkaConsumer, topic_partitions: list, offsets: list):    
    tp: TopicPartition
    offset_dict: dict
    for tp, offset_dict in zip(topic_partitions, offsets):
        print("nOUTPUT_CHECK TOPIC_PARTITION:", tp)
        offset: int = randint(offset_dict["start"], offset_dict["end"])
        msg: ConsumerRecord = read_single_message(kafka, tp, offset)
        print()
        print(msg)

def read_single_message(kafka: KafkaConsumer, topic_partition: TopicPartition, offset: int):
    print("BEFORE SUBSCRIPTION - TOPIC:", topic_partition.topic)
    kafka.subscribe([(topic_partition.topic])
    print("nAFTER SUBSCRIBE - SUBSCRIPTION:", kafka.subscription())
    print("AFTER SUBSCRIBE - ASSIGNMENT:", kafka.assignment())
    print("AFTER SUBSCRIBE - PARTITIONS FOR TOPIC", kafka.partitions_for_topic(topic_partition.topic))

    assert topic_partition.topic in kafka.subscription(), 
        "The Consumer is not subscribed to the given Topic"   str(topic_partition.topic)   "."
    if <OFFSET CHECK>:
        raise ArithmeticError("Offset {} is not existing in {}".format(offset, topic_partition))
    else:
        print("nAFTER OFFSET CHECK - SUBSCRIPTION:", kafka.subscription())
        print("AFTER OFFSET CHECK - ASSIGNMENT:", kafka.assignment())
        kafka.seek(topic_partition, offset)

    kafka.poll(max_records=1)  # Poll Messages. Might be unnecessary...
    msg = None
    for msg in kafka:
        break
    kafka.unsubscribe()
    return msg
 

Изменено название темы и данные записи пользователя:

 OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=0)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC

AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}

AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: {TopicPartition(topic='SAMPLE_TOPIC', partition=4),
    TopicPartition(topic='SAMPLE_TOPIC', partition=7), TopicPartition(topic='SAMPLE_TOPIC', partition=0),
    TopicPartition(topic='SAMPLE_TOPIC', partition=6), TopicPartition(topic='SAMPLE_TOPIC', partition=3),
    TopicPartition(topic='SAMPLE_TOPIC', partition=2), TopicPartition(topic='SAMPLE_TOPIC', partition=5),
    TopicPartition(topic='SAMPLE_TOPIC', partition=1)}

ConsumerRecord(topic='SAMPLE_TOPIC', partition=0, offset=1, timestamp=163345635146,
    timestamp_type=0, key=b'SOME_KEY', value=b'SOME AVRO ENCODED MESSAGE', headers=[],
    checksum=None, serialized_key_size=12, serialized_value_size=954, serialized_header_size=-1)

OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=1)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC

AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}

AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: set()
Traceback (most recent call last):
  File "test_executor.py", line 88, in <module>
    main(argument_parsing())
  File "test_executor.py", line 58, in main
    BasicKafka.output_check(con_dict)
  File "...", line ..., in output_check
    msg: ConsumerRecord = read_single_message(kafka, tp, offset)
  File "...", line ..., in read_single_message
    kafka.seek(topic_partition, offset)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 818, in seek
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition