Не удается опубликовать сообщение Pubsub в Airflow Python 3

#python-3.x #airflow #google-cloud-pubsub #google-cloud-composer

#python-3.x #воздушный поток #google-облако-pubsub #google-cloud-composer

Вопрос:

Я не могу опубликовать, используя PubSubHook в Airflow с Python 3. С Python 2 все работает отлично, но с Python 3 я получаю эту ошибку {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable . Похоже, что кодирование сообщения в Python 3 приводит к получению байта, который затем не может быть обработан сериализатором JSON.

Следующее отлично работает в Python 2:

 def send_message_to_pubsub(message):
    pubsub_message = {'data': b64encode(message)}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])
  

Приведенный пример не работает так же хорошо с Python 3.

Обновление 1:

Пробовал со следующим, но получил ошибку:

 def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = {'data': b64encode(message).decode()}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])

{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test [2019-03-18 17:10:28,903] {models.py:1760} ERROR - a bytes-like object is required, not 'str'
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test Traceback (most recent call last):
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test     result = task_copy.execute(context=context)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test     return_value = self.execute_callable()
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test     return self.python_callable(*self.op_args, **self.op_kwargs)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test   File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 31, in send_message_to_pubsub
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test     pubsub_message = {'data': b64encode(message).decode()}
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/base64.py", line 58, in b64encode
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test     encoded = binascii.b2a_base64(s, newline=False)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test TypeError: a bytes-like object is required, not 'str'
  

Обновление 2:

Пробовал со следующим, что привело к другой ошибке. На этот раз из JSON serializer:

 def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = {'data': b64encode(message.encode())}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project', 'topic', [pubsub_message]) 

[2019-03-19 10:44:29,845] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test [2019-03-19 10:44:29,841] {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test Traceback (most recent call last):
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     result = task_copy.execute(context=context)
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     return_value = self.execute_callable()
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 33, in send_message_to_pubsub
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_pubsub_hook.py", line 75, in publish
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     topic=full_topic, body=body)
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/discovery.py", line 795, in method
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     actual_path_params, actual_query_params, body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 151, in request
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     body_value = self.serialize(body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 260, in serialize
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     return json.dumps(body_value)
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     return _default_encoder.encode(obj)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     chunks = self.iterencode(o, _one_shot=True)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
[2019-03-19 10:44:29,854] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test     return _iterencode(o, 0)
[2019-03-19 10:44:29,852] {models.py:1791} INFO - Marking task as FAILED.
  

Комментарии:

1. Какая строка кода из вашего фрагмента кода выдала это сообщение об ошибке?

2. Оно выдает ошибку в models.py как указано выше. Похоже, что причиной ошибки является способ хранения строк в Python 2 по сравнению с Python 3. Похоже, что Airflow сериализует запрос (к конечной точке REST PubSub), используя сериализатор JSON где-то в models.py . Однако сериализатор JSON работает только со строковыми данными, а кодирование данных в Python 3, как указано выше, создает байтовую версию сообщения.

3. какую версию Airflow вы используете?

4. Версия Airflow 1.10.1 через GCP Cloud Composer, Python версии 3.6

5. Результатом b64encode(message) являются байты для Python 3. Измените его на b64encode(message).decode() .

Ответ №1:

У этого вопроса есть две стороны.

  1. Согласно документации base64, ваше сообщение должно иметь тип bytes , а не str . Чтобы убедиться в этом, попробуйте assert isinstance(message, bytes) . Это приведет к ошибке.

Решение зависит от того, откуда приходит ваше сообщение.

  • Если ваше сообщение представляет собой строку, вам следует закодировать его в байты перед отправкой в base64:
 b64encode(message.encode())
  
  • Если ваше сообщение должно иметь тип bytes , вам следует изменить способ его чтения в Python.

  1. Согласно документации модуля JSON в Python, byte типы не поддерживаются. Они должны быть str типами. Это означает, что все, что вы отправляете в PubSub API, должно быть в виде строки. Таким образом, вы можете декодировать это в строку следующим образом:
 pubsub_message = {'data': b64encode(message.encode()).decode()}
  

Комментарии:

1. Сообщение представляет собой обычный текст в соответствии с фрагментом кода в обновлении 1. Пытался b64encode(message.encode()) , но на этот раз получил другую ошибку. Пожалуйста, смотрите Обновление 2

2. Вы пробовали оба решения? pubsub_message = {'data': b64encode(message.encode()).decode()}

3. Следовало попробовать в соответствии с вашим вторым предложением. Это работает с pubsub_message = {'data': b64encode(message.encode()).decode()} . Большое спасибо за вашу помощь!!!