Как обработать ошибки или повторить попытку публикации в pubsub сообщений Google с помощью python

#python #google-cloud-pubsub

# #python #google-облако-pubsub

Вопрос:

Я отправляю сообщения в тему в pubsub с помощью этого скрипта:

 publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

while True:
    msg = b"some message"
    future = publisher.publish(topic_path, msg)

    try:
        result = future.result()
    except Exception as ex:
        # handle exception
        print(ex)

    time.sleep(3)
 

Моя проблема в том, как заставить меня print(ex) печатать, когда сообщение не опубликовано в pubsub, или определить, когда сообщение не было отправлено, и попытаться опубликовать его снова.

Комментарии:

1. Я не понимаю вопроса. Я не знаю, какие исключения возникают, но у вас может быть несколько except предложений, чтобы перехватывать много разных исключений и делать разные вещи с каждым. Таким образом, я думаю, вы можете определить, не было ли сообщение опубликовано, или повторить попытку или что-то в этом роде. Проблема в том, что нам как бы не хватает контекста, я действительно не понимаю, чего вы хотите достичь или в чем проблема.

Ответ №1:

Вы можете проверить эту статью, поскольку в ней реализована обработка ошибок при публикации. Был создан метод для получения обратных вызовов. Он принимает будущий объект и строку данных. Если есть исключение, он вызовет exception(), чтобы вернуть созданное исключение.

 def get_callback(f, data):
    def callback(f):
        try:
            print(f.result())
            futures.pop(data)
        except:  # noqa
            print("Please handle {} for {}.".format(f.exception(), data))

    return callback
 

Этот метод использовался для реализации get_callback путем передачи параметров «future» и «data». Где future — это объект, а data — строка.

 for i in range(10):
    data = str(i)
    futures.update({data: None})
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data.encode("utf-8"))
    futures[data] = future
    # Publish failures shall be handled in the callback function.
    future.add_done_callback(get_callback(future, data))
# Wait for all the publish futures to resolve before exiting.
while futures:
    time.sleep(5)