#python #rest #apache-kafka #confluent-schema-registry
#питон #rest #апачи-кафка #confluent-schema-registry
Вопрос:
Я пытаюсь опубликовать схему kafka с помощью python.
Из CLI я бы использовал такой синтаксис, как:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1 json" --data '{"schema": "{"type":"record","name":"VisualDetections","namespace":"com.namespace.something","fields":[{"name":"vehicle_id","type":"int"},{"name":"source","type":"string"},{"name":"width","type":"int"},{"name":"height","type":"int"},{"name":"annotated_frame","type":["string","null"]},{"name":"version","type":"string"},{"name":"fps","type":"int"},{"name":"mission_id","type":"int"},{"name":"sequence","type":{"type":"array","items":{"type":"record","name":"sequence_record","fields":[{"name":"frame_id","type":"int"},{"name":"timestamp","type":"long"},{"name":"localization","type":{"type":"array","items":{"type":"record","name":"localization_record","fields":[{"name":"latitude","type":"double"},{"name":"longitude","type":"double"},{"name":"class","type":"string"},{"name":"object_id","type":"int"},{"name":"confidence","type":"double"},{"name":"bbox","type":{"type":"record","name":"bbox","fields":[{"name":"x_min","type":"int"},{"name":"y_min","type":"int"},{"name":"x_max","type":"int"},{"name":"y_max","type":"int"}]}}]}}}]}}}]}"}' http://server_ip:8081/subjects/VisualDetections-value/versions/
Когда я попытался перенести эту функцию на python, я попробовал что-то вроде:
import requests
import json
topic = 'VisualDetections'
headers = {'Content-Type': 'application/vnd.schemaregistry.v1 json'}
with open(avro_path) as fp:
data = {'schema': json.load(fp)}
data_json = json.dumps(data)
cmd = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
response = requests.post(cmd, headers=headers, data=data_json)
Приведенное выше возвращает код {"error_code":500,"message":"Internal Server Error"}
. Я пробовал другие варианты, такие как:
with open(avro_path) as fp:
data = json.load(fp)
с кодом ошибки:
"error_code":422,"message":"Unrecognized field: name"
В приведенном выше avro_path
примере просто содержится схема avro в файле json (также может быть загружена, если это полезно).
Я не уверен, как я мог бы точно опубликовать эти данные. Кроме того, я не принял во внимание -H
аргумент post в CLI, поскольку не смог найти эквивалентный аргумент python (хотя и не уверен, что он играет какую-либо роль). Кто-нибудь может предложить решение этой проблемы.
Комментарии:
1.
headers=headers
то же-H
самое, что и … Но вы можете просмотреть журналы сервера, если сможете, чтобы увидеть точную ошибку с данными, которые вы отправляете для ошибки 5002.
-H
Опция вcurl
просто определяет заголовок, который в данном случае являетсяContent-Type: application/vnd.schemaregistry.v1 json
. Вы уже делаете это с.
Ответ №1:
Для второй ошибки полезная нагрузка должна быть {'schema': "schema string"}
Во-первых, я думаю, что это вопрос кодировки; json.load
будет считывать файл в dict, а не просто в строку.
УВЕДОМЛЕНИЕ
>>> import json
>>> schema = {"type":"record"} # example when using json.load() ... other data excluded
>>> json.dumps({'schema': schema})
'{"schema": {"type": "record"}}' # the schema value is not a string
>>> json.dumps({'schema': json.dumps(schema)})
'{"schema": "{\"type\": \"record\"}"}' # here it is
Попробуйте просто прочитать файл
url = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
with open(avro_path) as fp:
data = {'schema': fp.read().strip()}
response = requests.post(cmd, headers=headers, data=json.dumps(data))
В противном случае вы бы json.load
использовали json.dumps
дважды, как показано выше
Вы также можете попробовать json=data
, а не data=json.dumps(data)