Google PubSub не запускает функцию с каждым новым сообщением

#google-cloud-platform #google-cloud-pubsub

#google-облачная платформа #google-cloud-pubsub

Вопрос:

У меня есть поток Google PubSub, в который каждый день добавляется набор адресов. Я хочу, чтобы каждый из этих адресов обрабатывался запущенной облачной функцией Google. Однако я видел, что каждый адрес обрабатывается только один раз, даже если в поток каждый день добавляется новое сообщение.

Мой вопрос в том, будет ли оно обрабатываться как новое сообщение, если в поток каждый день добавляется одно и то же значение? Или это будет рассматриваться как дублирующее сообщение?

Это сценарий, который я вижу. Каждый день запускается locations облачная функция, которая публикует каждое местоположение в locations теме. В большинстве случаев это те же сообщения, что и в предыдущий день. Они меняются только в том случае, если местоположение закрывается или добавляется новое. Однако я вижу, что многие locations сообщения никогда не перехватываются location_metrics облачной функцией.

Поток функций выглядит следующим образом:

Тема запускается каждый день в 2a (вызывается locations_trigger ) > запускает locations облачную функцию > отправляет в locations тему > запускает location_metrics облачную функцию > отправляет в location_metrics тему

Для locations облачной функции она запускается и возвращает все адреса правильно, а затем отправляет их в locations тему. Я не буду приводить здесь всю функцию целиком, потому что с ней нет проблем. Для каждого местоположения, которое он извлекает, в журнале есть сообщение «опубликовать успешно». Вот часть, которая отправляет сведения о местоположении в тему.

         project_id = "project_id"
        topic_name = "locations"
        topic_id = "projects/project_id/topics/locations"

        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(project_id, topic_name)

        try:
            publisher.publish(topic_path, data=location_details.encode('utf-8'))
            print("publish successful: ", location)
        except Exception as exc:
            print(exc)
  

Примером location отправляемой полезной нагрузки является:
{"id": "accounts/123456/locations/123456", "name": "Business 123 Main St Somewhere NM 10010"}

location_metrics Функция выглядит следующим образом:

 def get_metrics(loc):
    request_body = {
      "locationNames": [ loc['id'] ],
      "basicRequest" : {
              "metricRequests": [
                 {
                   "metric": 'ALL',
                   "options": ['AGGREGATED_DAILY']
                 }
              ],
              "timeRange": {
                   "startTime": start_time_range,
                   "endTime": end_time_range,
              },
        }
    }

    request_url = <request url> 
    report_insights_response = http.request(request_url, "POST", body=json.dumps(request_body))

    report_insights_response = report_insights_response[1]
    report_insights_response = report_insights_response.decode().replace('\n','')
    report_insights_json = json.loads(report_insights_response)

            <bunch of logic to parse the right metrics, am not including because this runs in a separate manual script without issue>

            my_data = json.dumps(my_data)

            project_id = "project_id"
            topic_name = "location-metrics"
            topic_id = "projects/project_id/topics/location-metrics"

            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_name)

            print("publisher: ", publisher)
            print("topic_path: ", topic_path)
            try:
                publisher.publish(topic_path, data=gmb_data.encode('utf-8'))
                print("publish successful: ", loc['name'])
            except Exception as exc:
                print("topic publish failed: ", exc)

def retrieve_location(event, context):
    auth_flow()
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json) 
  

Комментарии:

1. Pubsub обрабатывает сообщения как сообщения. В зависимости от дня не происходит удаления дубликатов или других странных вещей. Ваша проблема может возникнуть по другой причине. Может быть в вашем коде. Или метод, который вы используете для проверки обработки сообщений, неверен.

2. Отвечает ли объяснение на страницах документации о выполнении гарантий на ваш вопрос?

3. @Nick_Kh Это не так. После первого раза новые сообщения не обрабатываются. Это как если бы после отправки адреса один раз тот же адрес больше никогда не обрабатывался.

4. @analyticsPierce, можете ли вы объяснить, как вы запускаете конкретную облачную функцию?

5. @Nick_Kh Я добавил некоторые детали, чтобы помочь прояснить ситуацию.