Приложение функций Azure — исключение: ошибка утверждения: несоответствие количества тел и метаданных

#python #azure-functions #azure-eventhub #debezium

Вопрос:

Я разрабатываю функциональное приложение в Azure, которое обрабатывает сообщения, отправленные в Azure EventHub. Сообщения генерируются Debezium (CDC на PostgreSQL). Сообщения в формате JSON. В основном приведенный ниже код работает без проблем, но для определенных сообщений (удаление) Я получаю ошибку, которую я не понимаю…

 [2021-10-22T16:23:46.679Z] Worker process started and initialized.
[2021-10-22T16:23:50.220Z] Host lock lease acquired by instance ID '0000000000000000000000002A92F406'.
[2021-10-22T16:25:33.353Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a)
[2021-10-22T16:25:33.355Z] Trigger Details: PartionId: 0, Offset: 12885023368-12885023368, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.2280000Z, SequenceNumber: 228-228, Count: 1
[2021-10-22T16:25:33.417Z] New event detected
[2021-10-22T16:25:33.420Z] Event message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"departmentid"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"groupname"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"default":0,"field":"modifieddate"}],"optional":true,"name":"Adventureworks.humanresources.department.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"Adventureworks.humanresources.department.Envelope"},"payload":{"before":{"departmentid":186,"name":"","groupname":"","modifieddate":0},"after":null,"source":{"version":"1.6.2.Final","connector":"postgresql","name":"Adventureworks","ts_ms":1634919842294,"snapshot":"false","db":"Adventureworks","sequence":"["1040891432","1040891432"]","schema":"humanresources","table":"department","txId":203668,"lsn":1040927480,"xmin":null},"op":"d","ts_ms":1634919933086,"transaction":null}}
[2021-10-22T16:25:33.439Z] Executed 'Functions.testEventHubTrigger' (Succeeded, Id=ce31cdac-24d4-4ff1-a8d9-3f95bc8b760a, Duration=94ms)
[2021-10-22T16:25:33.613Z] Executing 'Functions.testEventHubTrigger' (Reason='(null)', Id=a693051f-790c-4b90-af21-5792a386c7a1)
[2021-10-22T16:25:33.616Z] Trigger Details: PartionId: 0, Offset: 12885026456-12885032224, EnqueueTimeUtc: 2021-10-22T16:25:33.2280000Z-2021-10-22T16:25:33.3060000Z, SequenceNumber: 229-231, Count: 3
[2021-10-22T16:25:33.636Z] Executed 'Functions.testEventHubTrigger' (Failed, Id=a693051f-790c-4b90-af21-5792a386c7a1, Duration=23ms)
[2021-10-22T16:25:33.639Z] System.Private.CoreLib: Exception while executing function: Functions.testEventHubTrigger. System.Private.CoreLib: Result: Failure
Exception: AssertionError: Number of bodies and metadata mismatched
Stack:   File "C:Program FilesMicrosoftAzure Functions Core Toolsworkerspython3.8/WINDOWS/X64azure_functions_workerdispatcher.py", line 382, in _handle__invocation_request
    args[pb.name] = bindings.from_incoming_proto(
  File "C:Program FilesMicrosoftAzure Functions Core Toolsworkerspython3.8/WINDOWS/X64azure_functions_workerbindingsmeta.py", line 87, in from_incoming_proto
    return binding.decode(datum, trigger_metadata=metadata)
  File "C:Program FilesMicrosoftAzure Functions Core Toolsworkerspython3.8/WINDOWS/X64azurefunctionseventhub.py", line 108, in decode
    return cls.decode_multiple_events(data, trigger_metadata)
  File "C:Program FilesMicrosoftAzure Functions Core Toolsworkerspython3.8/WINDOWS/X64azurefunctionseventhub.py", line 159, in decode_multiple_events
    raise AssertionError('Number of bodies and metadata mismatched')
.
 

Ниже приведен код моей функции:

 from typing import BinaryIO, List
import logging
import json
import psycopg2

import azure.functions as func

def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.debug('New event detected')
        logging.debug('Event message: %s',
                        event.get_body().decode('utf-8'))
        row = json.loads(event.get_body().decode('utf-8'))
 

Ошибка появляется при изменении итератора в цикле for (поскольку он будет пытаться прочитать что-то, чего не существует?)

Пожалуйста, помогите мне отладить это…

РЕДАКТИРОВАТЬ Кажется, что Debezium при выполнении операции удаления в PostgeSQL генерирует два сообщения. Один с полезной нагрузкой, а второй называется сообщением «Tombstone». (https://debezium.io/documentation/reference/1.7/connectors/postgresql.html ) Второе сообщение пустое, что, скорее всего, вызывает указанное выше исключение.

События соединителя PostgreSQL предназначены для работы с уплотнением журнала Kafka. Сжатие журнала позволяет удалять некоторые старые сообщения, если сохраняется хотя бы самое последнее сообщение для каждого ключа. Это позволяет Kafka освободить место для хранения, гарантируя, что раздел содержит полный набор данных и может использоваться для перезагрузки состояния на основе ключа.

События надгробия При удалении строки значение события удаления по-прежнему работает с уплотнением журнала, поскольку Kafka может удалить все предыдущие сообщения с тем же ключом. Однако, чтобы Kafka удалял все сообщения с тем же ключом, значение сообщения должно быть null . Чтобы сделать это возможным, соединитель PostgreSQL следует за событием удаления с помощью специального события tombstone, которое имеет тот же ключ, но значение null.

Кто-нибудь может помочь мне обработать пустое сообщение?

Комментарии:

1. Вы можете проверить, не является ли тело события пустым / пустым, прежде чем вызывать decode для него функцию.

2. Привет, @AnandSowmithiran. Ошибка не в decode. Это итератор. Во время отладки я вижу, что исключение генерируется вторым запуском в строке: for event in events:

3. Вы также пытались задать вопрос на форуме debezium? В документах debezium говорится о сообщениях в тему kafka, не приводятся примеры того, как обрабатывать события tombstone.