Десериализовать сообщение Kafka json с помощью потоковой передачи PySpark

#python #json #apache-spark #deserialization #spark-streaming-kafka

#python #json #apache-spark #десериализация #spark-streaming-кафка

Вопрос:

У меня есть приложение pyspark, которое использует сообщения из раздела Kafka, эти сообщения сериализуются с помощью org.apache.kafka.connect.json.JsonConverter . Для этого я использую соединительный разъем Kafka JDBC

Проблема в том, что когда я использую сообщения, столбец ID появляется в каком-то закодированном тексте, таком как «ARM =», когда это должен быть тип number.

Вот код, который у меня есть сейчас

 spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))

ssc.start()
ssc.awaitTermination()
  

Я знаю, что createDirectStream имеет параметр valueDecoder, который я могу установить, проблема в том, что я не знаю, как использовать это для декодирования. Я также заранее ознакомлен со схемой, поэтому смогу создать ее, если потребуется.

Для справки, это JSON, который я получаю при распечатке rdd.foreach

 {
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": True,
        "field": "COLUMN1"
      }
    ],
    "optional": False
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}
  

Комментарии:

1. "optional": False, недопустимый JSON, по крайней мере, согласно jsonlint.com ‘T’ в True и ‘F’ в False должны быть в нижнем регистре (см. json-schema.org/understanding-json-schema/reference /… )

Ответ №1:

Итак, как упоминал cricket_007, в вашей конфигурации confluent Kafka вы должны установить параметр следующим образом value.converter.schema.enable=false . Это избавит вас от поля схемы и оставит вам только полезную нагрузку json. Теперь по какой-то причине у меня возникла проблема, из-за которой все мои числовые столбцы кодировались в этом странном формате AOo= . Теперь при использовании Json для сериализации ваших данных confluent преобразует ваши числовые столбцы с использованием base64, но реальная проблема возникла еще до этого. По какой-то причине все мои числовые столбцы преобразовывались в байты. Не уверен точно, почему он это делает, но это как-то связано с тем, как confluent обрабатывает базы данных Oracle. В любом случае, способ исправить это — установить декодер значений в вашем createDirectStream , такой как

 kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)
  

и в вашем методе декодирования вам нужно декодировать ваше сообщение из UTF-8, проанализировать json, а затем декодировать ваш столбец number из base64, а затем из байтов примерно так

 def decoder(s):
    if s is None:
        return None

    loaded_json = json.loads(s.decode('utf-8'))
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json
  

Комментарии:

1. Примечание: json = json.loads переопределяет импортированное определение модуля. Если бы вы попытались использовать модуль снова, вы бы получили сообщение об ошибке

2. Ах, спасибо! В моем коде этого нет, но я изменю это в ответе, чтобы никто не запутался

Ответ №2:

В вашей конфигурации подключения вы можете установить value.converter.schema.enable=false , и тогда вы получите только данные «полезной нагрузки» этой записи JSON.

Я предполагаю, что оттуда вы сможете обработать сообщение в соответствии с любым другим примером чтения потокового JSON в PySpark.

В противном случае, поскольку вы не используете структурированную потоковую передачу, вам не нужно определять схему. Скорее вам пришлось бы, по крайней мере, сделать что-то вроде so, чтобы просто проанализировать записи

 rdd.map(lambda x: json.loads(x))
    .map(lambda x: x['payload'])
    .foreach(lambda x: print(x))
  

Комментарии:

1. Спасибо вам за это, я действительно понял value.converter.schema.enable=false не так давно, но реальная проблема была с целочисленной кодировкой. Я опубликую свой ответ немного позже.