Ошибка прерывистой аутентификации при публикации в теме pubsub

#google-oauth #google-cloud-pubsub

#google-oauth #google-cloud-pubsub

Вопрос:

У нас есть конвейер данных, встроенный в Google Cloud Dataflow, который использует сообщения из темы pubsub и передает их в BigQuery. Чтобы проверить, что это работает успешно, у нас есть несколько тестов, которые выполняются в конвейере CI, эти тесты отправляют сообщения в тему pubsub и проверяют, что сообщения успешно записаны в BigQuery.

Это код, который публикуется в теме pubsub:

 from google.cloud import pubsub_v1
def post_messages(project_id, topic_id, rows)
    futures = dict()
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(
        project_id, topic_id
    )

    def get_callback(f, data):
        def callback(f):
            try:
                futures.pop(data)
            except:
                print("Please handle {} for {}.".format(f.exception(), data))

        return callback

    for row in rows:
        # When you publish a message, the client returns a future. Data must be a bytestring
        # ...
        # construct a message in var json_data
        # ...
        message = json.dumps(json_data).encode("utf-8")
        future = publisher.publish(
            topic_path,
            message
        )
        futures_key = str(message)
        futures[futures_key] = future
        future.add_done_callback(get_callback(future, futures_key))
    # Wait for all the publish futures to resolve before exiting.
    while futures:
        time.sleep(1)
 

Когда мы запускаем этот тест в нашем конвейере CI, он начал периодически завершаться ошибкой

 21:38:55: AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f5247407220>" raised exception!
Traceback (most recent call last):
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/grpc/_plugin_wrapping.py", line 89, in __call__
    self._metadata_plugin(
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 101, in __call__
    callback(self._get_authorization_headers(context), None)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 87, in _get_authorization_headers
    self._credentials.before_request(
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 134, in before_request
    self.apply(headers)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 110, in apply
    _helpers.from_bytes(token or self.token)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/_helpers.py", line 130, in from_bytes
    raise ValueError("***0!r*** could not be converted to unicode".format(value))
ValueError: None could not be converted to unicode
Error: The operation was canceled.
 

К сожалению, это не удается только в нашем конвейере CI, и даже тогда он периодически выходит из строя (сбой происходит только при небольшом проценте всех запусков конвейера CI). Если я запускаю один и тот же тест локально, он выполняется успешно каждый раз. При запуске в конвейере CI код аутентифицируется как учетная запись службы, тогда как при локальном запуске он аутентифицируется как я

Из сообщения об ошибке я знаю, что в этом коде происходит сбой:

 if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError("{0!r} could not be converted to unicode".format(value))
 

https://github.com/googleapis/google-auth-library-python/blob/3c3fbf40b07e090f2be7fac5b304dbf438b5cd6c/google/auth/_helpers.py#L127-L130

который находится в библиотеке python от Google, которую мы устанавливаем с помощью pip.

Очевидно, что выражение:

 isinstance(result, six.text_type)
 

выполняется оценка False . Я поставил точку останова для этого кода, когда запускал его локально, и Обнаружил, что при нормальных обстоятельствах (т. Е. Когда Он работает) Значение result примерно такое:

скриншот отладчика, отображающего значение переменной result

Это похоже на какой-то токен аутентификации.

Учитывая сообщение об ошибке:

Ошибка ValueError: None не удалось преобразовать в unicode

кажется, что какие бы действия ни предпринимались библиотеками аутентификации Google, они передаются None в код, показанный выше.

Здесь я нахожусь на пределе своих знаний. Учитывая, что это происходит только в конвейере CI, у меня нет возможности установить точку останова в моем коде и отладить его. Учитывая стек вызовов в сообщении об ошибке, это как-то связано с аутентификацией.

Я надеюсь, что кто-нибудь может посоветовать, как действовать.

Может ли кто-нибудь объяснить способ, с помощью которого я могу узнать, почему None передается коду, который вызывает ошибку?

Комментарии:

1. Есть ли у вас специальная кодировка по умолчанию в вашем CI?

2. Насколько мне известно, нет.

3. Если тексты, которые вы кодируете в json, находятся в юникоде . вы можете попробовать ensure_ascii=False . В вашем коде это будет выглядеть так: message = json.dumps(json_data, ensure_ascii=False).encode("utf-8") .

4. У меня такая же проблема. предложение @syrkuit по фильтрации журналов (см.: github.com/googleapis/google-auth-library-python/issues /… ) помогает с беспорядочным выводом. Надеюсь, скоро будет исправлено.

5. Мы не видели, чтобы это происходило уже много недель. Я подозреваю (но не могу подтвердить), что это было решено путем обновления зависимостей (т. Е. Библиотек Google Cloud python). Скрестив пальцы, это исправлено навсегда.

Ответ №1:

У нас была та же ошибка. Наконец-то решил эту проблему, используя веб-токен JSON для аутентификации в Quckstart от Google. Вот так:

 import json
from google.cloud import pubsub_v1
from google.auth import jwt

def post_messages(credentials_path, topic, list_of_messages):

    credentials_dict = json.load(open(credentials_path,'r'))

    audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
    credentials_ob = jwt.Credentials.from_service_account_info(
        credentials_dict, audience=audience
    )

    publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)

    for message_dict in list_of_message_dicts:
    
        message = json.dumps(message_dict, default=str).encode("utf-8")
    
        future = publisher.publish(topic, message)
 

Мы также обновили нашу среду, но это не исправило ValueError , пока мы не изменили jwt . В любом случае, вот среда:

 google-api-core==2.4.0
google-api-python-client==2.36.0
google-auth==2.3.2
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.1.0
google-cloud-pubsub==2.9.0