#python-3.x #airflow #google-cloud-pubsub #google-cloud-composer
#python-3.x #воздушный поток #google-облако-pubsub #google-cloud-composer
Вопрос:
Я не могу опубликовать, используя PubSubHook
в Airflow с Python 3. С Python 2 все работает отлично, но с Python 3 я получаю эту ошибку {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable
. Похоже, что кодирование сообщения в Python 3 приводит к получению байта, который затем не может быть обработан сериализатором JSON.
Следующее отлично работает в Python 2:
def send_message_to_pubsub(message):
pubsub_message = {'data': b64encode(message)}
hook = PubSubHook(gcp_conn_id='google_cloud_default')
hook.publish('project-name', 'topic-name', [pubsub_message])
Приведенный пример не работает так же хорошо с Python 3.
Обновление 1:
Пробовал со следующим, но получил ошибку:
def send_message_to_pubsub():
message = 'Test message'
pubsub_message = {'data': b64encode(message).decode()}
hook = PubSubHook(gcp_conn_id='google_cloud_default')
hook.publish('project-name', 'topic-name', [pubsub_message])
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test [2019-03-18 17:10:28,903] {models.py:1760} ERROR - a bytes-like object is required, not 'str'
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test Traceback (most recent call last):
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test result = task_copy.execute(context=context)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test return_value = self.execute_callable()
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test return self.python_callable(*self.op_args, **self.op_kwargs)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 31, in send_message_to_pubsub
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test pubsub_message = {'data': b64encode(message).decode()}
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/base64.py", line 58, in b64encode
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test encoded = binascii.b2a_base64(s, newline=False)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test TypeError: a bytes-like object is required, not 'str'
Обновление 2:
Пробовал со следующим, что привело к другой ошибке. На этот раз из JSON serializer:
def send_message_to_pubsub():
message = 'Test message'
pubsub_message = {'data': b64encode(message.encode())}
hook = PubSubHook(gcp_conn_id='google_cloud_default')
hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,845] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test [2019-03-19 10:44:29,841] {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test Traceback (most recent call last):
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test result = task_copy.execute(context=context)
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return_value = self.execute_callable()
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 33, in send_message_to_pubsub
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_pubsub_hook.py", line 75, in publish
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test topic=full_topic, body=body)
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/discovery.py", line 795, in method
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test actual_path_params, actual_query_params, body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 151, in request
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test body_value = self.serialize(body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 260, in serialize
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return json.dumps(body_value)
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return _default_encoder.encode(obj)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test chunks = self.iterencode(o, _one_shot=True)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
[2019-03-19 10:44:29,854] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return _iterencode(o, 0)
[2019-03-19 10:44:29,852] {models.py:1791} INFO - Marking task as FAILED.
Комментарии:
1. Какая строка кода из вашего фрагмента кода выдала это сообщение об ошибке?
2. Оно выдает ошибку в models.py как указано выше. Похоже, что причиной ошибки является способ хранения строк в Python 2 по сравнению с Python 3. Похоже, что Airflow сериализует запрос (к конечной точке REST PubSub), используя сериализатор JSON где-то в models.py . Однако сериализатор JSON работает только со строковыми данными, а кодирование данных в Python 3, как указано выше, создает байтовую версию сообщения.
3. какую версию Airflow вы используете?
4. Версия Airflow 1.10.1 через GCP Cloud Composer, Python версии 3.6
5. Результатом
b64encode(message)
являются байты для Python 3. Измените его наb64encode(message).decode()
.
Ответ №1:
У этого вопроса есть две стороны.
- Согласно документации base64, ваше сообщение должно иметь тип
bytes
, а неstr
. Чтобы убедиться в этом, попробуйтеassert isinstance(message, bytes)
. Это приведет к ошибке.
Решение зависит от того, откуда приходит ваше сообщение.
- Если ваше сообщение представляет собой строку, вам следует закодировать его в байты перед отправкой в base64:
b64encode(message.encode())
- Если ваше сообщение должно иметь тип
bytes
, вам следует изменить способ его чтения в Python.
- Согласно документации модуля JSON в Python,
byte
типы не поддерживаются. Они должны бытьstr
типами. Это означает, что все, что вы отправляете в PubSub API, должно быть в виде строки. Таким образом, вы можете декодировать это в строку следующим образом:
pubsub_message = {'data': b64encode(message.encode()).decode()}
Комментарии:
1. Сообщение представляет собой обычный текст в соответствии с фрагментом кода в обновлении 1. Пытался
b64encode(message.encode())
, но на этот раз получил другую ошибку. Пожалуйста, смотрите Обновление 22. Вы пробовали оба решения?
pubsub_message = {'data': b64encode(message.encode()).decode()}
3. Следовало попробовать в соответствии с вашим вторым предложением. Это работает с
pubsub_message = {'data': b64encode(message.encode()).decode()}
. Большое спасибо за вашу помощь!!!