#google-oauth #google-cloud-pubsub
#google-oauth #google-cloud-pubsub
Вопрос:
У нас есть конвейер данных, встроенный в Google Cloud Dataflow, который использует сообщения из темы pubsub и передает их в BigQuery. Чтобы проверить, что это работает успешно, у нас есть несколько тестов, которые выполняются в конвейере CI, эти тесты отправляют сообщения в тему pubsub и проверяют, что сообщения успешно записаны в BigQuery.
Это код, который публикуется в теме pubsub:
from google.cloud import pubsub_v1
def post_messages(project_id, topic_id, rows)
futures = dict()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
project_id, topic_id
)
def get_callback(f, data):
def callback(f):
try:
futures.pop(data)
except:
print("Please handle {} for {}.".format(f.exception(), data))
return callback
for row in rows:
# When you publish a message, the client returns a future. Data must be a bytestring
# ...
# construct a message in var json_data
# ...
message = json.dumps(json_data).encode("utf-8")
future = publisher.publish(
topic_path,
message
)
futures_key = str(message)
futures[futures_key] = future
future.add_done_callback(get_callback(future, futures_key))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(1)
Когда мы запускаем этот тест в нашем конвейере CI, он начал периодически завершаться ошибкой
21:38:55: AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f5247407220>" raised exception!
Traceback (most recent call last):
File "/opt/conda/envs/py3/lib/python3.8/site-packages/grpc/_plugin_wrapping.py", line 89, in __call__
self._metadata_plugin(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 101, in __call__
callback(self._get_authorization_headers(context), None)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 87, in _get_authorization_headers
self._credentials.before_request(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 134, in before_request
self.apply(headers)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 110, in apply
_helpers.from_bytes(token or self.token)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/_helpers.py", line 130, in from_bytes
raise ValueError("***0!r*** could not be converted to unicode".format(value))
ValueError: None could not be converted to unicode
Error: The operation was canceled.
К сожалению, это не удается только в нашем конвейере CI, и даже тогда он периодически выходит из строя (сбой происходит только при небольшом проценте всех запусков конвейера CI). Если я запускаю один и тот же тест локально, он выполняется успешно каждый раз. При запуске в конвейере CI код аутентифицируется как учетная запись службы, тогда как при локальном запуске он аутентифицируется как я
Из сообщения об ошибке я знаю, что в этом коде происходит сбой:
if isinstance(result, six.text_type):
return result
else:
raise ValueError("{0!r} could not be converted to unicode".format(value))
который находится в библиотеке python от Google, которую мы устанавливаем с помощью pip.
Очевидно, что выражение:
isinstance(result, six.text_type)
выполняется оценка False
. Я поставил точку останова для этого кода, когда запускал его локально, и Обнаружил, что при нормальных обстоятельствах (т. Е. Когда Он работает) Значение result
примерно такое:
Это похоже на какой-то токен аутентификации.
Учитывая сообщение об ошибке:
Ошибка ValueError: None не удалось преобразовать в unicode
кажется, что какие бы действия ни предпринимались библиотеками аутентификации Google, они передаются None
в код, показанный выше.
Здесь я нахожусь на пределе своих знаний. Учитывая, что это происходит только в конвейере CI, у меня нет возможности установить точку останова в моем коде и отладить его. Учитывая стек вызовов в сообщении об ошибке, это как-то связано с аутентификацией.
Я надеюсь, что кто-нибудь может посоветовать, как действовать.
Может ли кто-нибудь объяснить способ, с помощью которого я могу узнать, почему None
передается коду, который вызывает ошибку?
Комментарии:
1. Есть ли у вас специальная кодировка по умолчанию в вашем CI?
2. Насколько мне известно, нет.
3. Если тексты, которые вы кодируете в json, находятся в юникоде . вы можете попробовать
ensure_ascii=False
. В вашем коде это будет выглядеть так:message = json.dumps(json_data, ensure_ascii=False).encode("utf-8")
.4. У меня такая же проблема. предложение @syrkuit по фильтрации журналов (см.: github.com/googleapis/google-auth-library-python/issues /… ) помогает с беспорядочным выводом. Надеюсь, скоро будет исправлено.
5. Мы не видели, чтобы это происходило уже много недель. Я подозреваю (но не могу подтвердить), что это было решено путем обновления зависимостей (т. Е. Библиотек Google Cloud python). Скрестив пальцы, это исправлено навсегда.
Ответ №1:
У нас была та же ошибка. Наконец-то решил эту проблему, используя веб-токен JSON для аутентификации в Quckstart от Google. Вот так:
import json
from google.cloud import pubsub_v1
from google.auth import jwt
def post_messages(credentials_path, topic, list_of_messages):
credentials_dict = json.load(open(credentials_path,'r'))
audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_ob = jwt.Credentials.from_service_account_info(
credentials_dict, audience=audience
)
publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)
for message_dict in list_of_message_dicts:
message = json.dumps(message_dict, default=str).encode("utf-8")
future = publisher.publish(topic, message)
Мы также обновили нашу среду, но это не исправило ValueError
, пока мы не изменили jwt
. В любом случае, вот среда:
google-api-core==2.4.0
google-api-python-client==2.36.0
google-auth==2.3.2
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.1.0
google-cloud-pubsub==2.9.0