Как опубликовать схему kafka с помощью python

#python #rest #apache-kafka #confluent-schema-registry

#питон #rest #апачи-кафка #confluent-schema-registry

Вопрос:

Я пытаюсь опубликовать схему kafka с помощью python.

Из CLI я бы использовал такой синтаксис, как:

 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1 json" --data '{"schema": "{"type":"record","name":"VisualDetections","namespace":"com.namespace.something","fields":[{"name":"vehicle_id","type":"int"},{"name":"source","type":"string"},{"name":"width","type":"int"},{"name":"height","type":"int"},{"name":"annotated_frame","type":["string","null"]},{"name":"version","type":"string"},{"name":"fps","type":"int"},{"name":"mission_id","type":"int"},{"name":"sequence","type":{"type":"array","items":{"type":"record","name":"sequence_record","fields":[{"name":"frame_id","type":"int"},{"name":"timestamp","type":"long"},{"name":"localization","type":{"type":"array","items":{"type":"record","name":"localization_record","fields":[{"name":"latitude","type":"double"},{"name":"longitude","type":"double"},{"name":"class","type":"string"},{"name":"object_id","type":"int"},{"name":"confidence","type":"double"},{"name":"bbox","type":{"type":"record","name":"bbox","fields":[{"name":"x_min","type":"int"},{"name":"y_min","type":"int"},{"name":"x_max","type":"int"},{"name":"y_max","type":"int"}]}}]}}}]}}}]}"}' http://server_ip:8081/subjects/VisualDetections-value/versions/
 

Когда я попытался перенести эту функцию на python, я попробовал что-то вроде:

 import requests
import json

topic = 'VisualDetections'
headers = {'Content-Type':  'application/vnd.schemaregistry.v1 json'}
with open(avro_path) as fp:
     data = {'schema': json.load(fp)}
data_json = json.dumps(data)
cmd = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
response = requests.post(cmd, headers=headers, data=data_json)
 

Приведенное выше возвращает код {"error_code":500,"message":"Internal Server Error"} . Я пробовал другие варианты, такие как:

 with open(avro_path) as fp:
    data = json.load(fp)
 

с кодом ошибки:

 "error_code":422,"message":"Unrecognized field: name"
    
 

В приведенном выше avro_path примере просто содержится схема avro в файле json (также может быть загружена, если это полезно).

Я не уверен, как я мог бы точно опубликовать эти данные. Кроме того, я не принял во внимание -H аргумент post в CLI, поскольку не смог найти эквивалентный аргумент python (хотя и не уверен, что он играет какую-либо роль). Кто-нибудь может предложить решение этой проблемы.

Комментарии:

1. headers=headers то же -H самое, что и … Но вы можете просмотреть журналы сервера, если сможете, чтобы увидеть точную ошибку с данными, которые вы отправляете для ошибки 500

2. -H Опция в curl просто определяет заголовок, который в данном случае является Content-Type: application/vnd.schemaregistry.v1 json . Вы уже делаете это с.

Ответ №1:

Для второй ошибки полезная нагрузка должна быть {'schema': "schema string"}

Во-первых, я думаю, что это вопрос кодировки; json.load будет считывать файл в dict, а не просто в строку.

УВЕДОМЛЕНИЕ

 >>> import json
>>> schema = {"type":"record"}  # example when using json.load() ... other data excluded
>>> json.dumps({'schema': schema})
'{"schema": {"type": "record"}}'  # the schema value is not a string
>>> json.dumps({'schema': json.dumps(schema)})
'{"schema": "{\"type\": \"record\"}"}'  # here it is
 

Попробуйте просто прочитать файл

 url = 'http://server_ip:8081/subjects/{}-value/versions/'.format(topic)
with open(avro_path) as fp:
     data = {'schema': fp.read().strip()}
     response = requests.post(cmd, headers=headers, data=json.dumps(data))
 

В противном случае вы бы json.load использовали json.dumps дважды, как показано выше

Вы также можете попробовать json=data , а не data=json.dumps(data)