#google-cloud-platform #google-cloud-pubsub
#google-облачная платформа #google-cloud-pubsub
Вопрос:
У меня есть поток Google PubSub, в который каждый день добавляется набор адресов. Я хочу, чтобы каждый из этих адресов обрабатывался запущенной облачной функцией Google. Однако я видел, что каждый адрес обрабатывается только один раз, даже если в поток каждый день добавляется новое сообщение.
Мой вопрос в том, будет ли оно обрабатываться как новое сообщение, если в поток каждый день добавляется одно и то же значение? Или это будет рассматриваться как дублирующее сообщение?
Это сценарий, который я вижу. Каждый день запускается locations
облачная функция, которая публикует каждое местоположение в locations
теме. В большинстве случаев это те же сообщения, что и в предыдущий день. Они меняются только в том случае, если местоположение закрывается или добавляется новое. Однако я вижу, что многие locations
сообщения никогда не перехватываются location_metrics
облачной функцией.
Поток функций выглядит следующим образом:
Тема запускается каждый день в 2a (вызывается locations_trigger
) > запускает locations
облачную функцию > отправляет в locations
тему > запускает location_metrics
облачную функцию > отправляет в location_metrics
тему
Для locations
облачной функции она запускается и возвращает все адреса правильно, а затем отправляет их в locations
тему. Я не буду приводить здесь всю функцию целиком, потому что с ней нет проблем. Для каждого местоположения, которое он извлекает, в журнале есть сообщение «опубликовать успешно». Вот часть, которая отправляет сведения о местоположении в тему.
project_id = "project_id"
topic_name = "locations"
topic_id = "projects/project_id/topics/locations"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=location_details.encode('utf-8'))
print("publish successful: ", location)
except Exception as exc:
print(exc)
Примером location
отправляемой полезной нагрузки является:
{"id": "accounts/123456/locations/123456", "name": "Business 123 Main St Somewhere NM 10010"}
location_metrics
Функция выглядит следующим образом:
def get_metrics(loc):
request_body = {
"locationNames": [ loc['id'] ],
"basicRequest" : {
"metricRequests": [
{
"metric": 'ALL',
"options": ['AGGREGATED_DAILY']
}
],
"timeRange": {
"startTime": start_time_range,
"endTime": end_time_range,
},
}
}
request_url = <request url>
report_insights_response = http.request(request_url, "POST", body=json.dumps(request_body))
report_insights_response = report_insights_response[1]
report_insights_response = report_insights_response.decode().replace('\n','')
report_insights_json = json.loads(report_insights_response)
<bunch of logic to parse the right metrics, am not including because this runs in a separate manual script without issue>
my_data = json.dumps(my_data)
project_id = "project_id"
topic_name = "location-metrics"
topic_id = "projects/project_id/topics/location-metrics"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
print("publisher: ", publisher)
print("topic_path: ", topic_path)
try:
publisher.publish(topic_path, data=gmb_data.encode('utf-8'))
print("publish successful: ", loc['name'])
except Exception as exc:
print("topic publish failed: ", exc)
def retrieve_location(event, context):
auth_flow()
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Комментарии:
1. Pubsub обрабатывает сообщения как сообщения. В зависимости от дня не происходит удаления дубликатов или других странных вещей. Ваша проблема может возникнуть по другой причине. Может быть в вашем коде. Или метод, который вы используете для проверки обработки сообщений, неверен.
2. Отвечает ли объяснение на страницах документации о выполнении гарантий на ваш вопрос?
3. @Nick_Kh Это не так. После первого раза новые сообщения не обрабатываются. Это как если бы после отправки адреса один раз тот же адрес больше никогда не обрабатывался.
4. @analyticsPierce, можете ли вы объяснить, как вы запускаете конкретную облачную функцию?
5. @Nick_Kh Я добавил некоторые детали, чтобы помочь прояснить ситуацию.