spark streaming другой декодер значений для каждой темы kafka

#python #apache-spark #apache-kafka #avro

#питон #apache-spark #apache-kafka #avro #python

Вопрос:

Мне нужно создать потоковую передачу Spark, которая считывает данные из нескольких тем и использует разные декодеры для каждой темы (каждая тема содержит разные объекты с кодировкой avro):

 def decode_avro(message):
    schem = avro.schema.parse(open("error_list.avsc").read())
    bytes_reader = io.BytesIO(message)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schem)
    return reader.read(decoder)

ssc = StreamingContext(sc, 2)
kvs = KafkaUtils.createDirectStream(ssc, [topic, topic2], {
    "metadata.broker.list": brokers}, valueDecoder = decode_avro)
  

Я не хочу знать, можно ли указать разные обратные вызовы декодера для каждой темы или можно ли узнать название темы в функции декодера (таким образом, я мог бы использовать название темы для файла схемы avro и декодировать все сообщения в одной функции)

Спасибо

Комментарии:

1. Я сталкиваюсь с той же проблемой. Я вижу, что этому вопросу более 1 года. Как вы обошли это препятствие?

2. Мы, наконец, не использовали этот подход (даже не используя Kafka вообще на данный момент). Я думаю о системе try / catch, которая переходит к следующему декодеру, если возникает одно исключение. Это уродливое решение, но я не нашел лучшего!

3. Хорошо, спасибо за обновление. Я нашел правильное решение для этого, поэтому я добавлю его в качестве ответа здесь.

Ответ №1:

У нас также есть случай, когда мы читаем из разных тем с разными форматами сообщений, а затем обрабатываем каждую тему и сохраняем выходные данные в выделенном хранилище для каждой исходной темы. Правильный путь здесь — создать несколько потоков. Поток для каждой темы, в том же приложении, с тем же контекстом Spark. Каждый поток получит соответствующий ValueDecoder, и вы все равно можете читать из нескольких тем, если они имеют один и тот же формат.

Комментарии:

1. Спасибо за ваш ответ. Я проголосую за это, и когда у меня будет время, я его протестирую.