#python #json #apache-spark #deserialization #spark-streaming-kafka
#python #json #apache-spark #десериализация #spark-streaming-кафка
Вопрос:
У меня есть приложение pyspark, которое использует сообщения из раздела Kafka, эти сообщения сериализуются с помощью org.apache.kafka.connect.json.JsonConverter
. Для этого я использую соединительный разъем Kafka JDBC
Проблема в том, что когда я использую сообщения, столбец ID появляется в каком-то закодированном тексте, таком как «ARM =», когда это должен быть тип number.
Вот код, который у меня есть сейчас
spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)
kafka_params = {
"bootstrap.servers": "kafkahost:9092",
"group.id": "Deserialize"
}
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))
ssc.start()
ssc.awaitTermination()
Я знаю, что createDirectStream имеет параметр valueDecoder, который я могу установить, проблема в том, что я не знаю, как использовать это для декодирования. Я также заранее ознакомлен со схемой, поэтому смогу создать ее, если потребуется.
Для справки, это JSON, который я получаю при распечатке rdd.foreach
{
"schema": {
"type": "struct",
"fields": [
{
"type": "bytes",
"optional": False,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0"
},
"field": "ID"
},
{
"type": "string",
"optional": True,
"field": "COLUMN1"
}
],
"optional": False
},
"payload": {
"ID": "AOo=",
"COLUMN1": "some string"
}
}
Комментарии:
1.
"optional": False,
недопустимый JSON, по крайней мере, согласно jsonlint.com ‘T’ вTrue
и ‘F’ вFalse
должны быть в нижнем регистре (см. json-schema.org/understanding-json-schema/reference /… )
Ответ №1:
Итак, как упоминал cricket_007, в вашей конфигурации confluent Kafka вы должны установить параметр следующим образом value.converter.schema.enable=false
. Это избавит вас от поля схемы и оставит вам только полезную нагрузку json. Теперь по какой-то причине у меня возникла проблема, из-за которой все мои числовые столбцы кодировались в этом странном формате AOo=
. Теперь при использовании Json для сериализации ваших данных confluent преобразует ваши числовые столбцы с использованием base64, но реальная проблема возникла еще до этого. По какой-то причине все мои числовые столбцы преобразовывались в байты. Не уверен точно, почему он это делает, но это как-то связано с тем, как confluent обрабатывает базы данных Oracle. В любом случае, способ исправить это — установить декодер значений в вашем createDirectStream
, такой как
kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)
и в вашем методе декодирования вам нужно декодировать ваше сообщение из UTF-8, проанализировать json, а затем декодировать ваш столбец number из base64, а затем из байтов примерно так
def decoder(s):
if s is None:
return None
loaded_json = json.loads(s.decode('utf-8'))
loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
return loaded_json
Комментарии:
1. Примечание:
json = json.loads
переопределяет импортированное определение модуля. Если бы вы попытались использовать модуль снова, вы бы получили сообщение об ошибке2. Ах, спасибо! В моем коде этого нет, но я изменю это в ответе, чтобы никто не запутался
Ответ №2:
В вашей конфигурации подключения вы можете установить value.converter.schema.enable=false
, и тогда вы получите только данные «полезной нагрузки» этой записи JSON.
Я предполагаю, что оттуда вы сможете обработать сообщение в соответствии с любым другим примером чтения потокового JSON в PySpark.
В противном случае, поскольку вы не используете структурированную потоковую передачу, вам не нужно определять схему. Скорее вам пришлось бы, по крайней мере, сделать что-то вроде so, чтобы просто проанализировать записи
rdd.map(lambda x: json.loads(x))
.map(lambda x: x['payload'])
.foreach(lambda x: print(x))
Комментарии:
1. Спасибо вам за это, я действительно понял
value.converter.schema.enable=false
не так давно, но реальная проблема была с целочисленной кодировкой. Я опубликую свой ответ немного позже.