Symfony Messenger: можно ли не создавать исключение при последней повторной попытке?

#php #symfony #symfony-messenger

Вопрос:

Мы используем мессенджер Symfony, и у нас есть эти транспорты:

 framework:
    messenger:
        failure_transport: failed

        transports:
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                options:
                    table_name: 'MessengerMessages'
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
                    delay: 5000
                    multiplier: 2
                    max_delay: 0
            asyncLowPriority:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
                retry_strategy:
                    max_retries: 5
                    delay: 3600000
                    multiplier: 2
                    max_delay: 0
            sync: 'sync://'
 

Когда мы отправляем сообщение в async очередь, и последняя попытка завершается неудачей с исключением, исключение регистрируется в MessengerMessages таблице, и исключение всплывает (в нашем случае отправляется в Sentry). Это то, чего мы хотим.

Однако, когда мы отправляем сообщение в asyncLowPriority очередь, мы хотели бы, чтобы неудачные сообщения:

  • не добрался до failed транспорта
  • не делайте исключение пузыриться

В принципе, исключение должно быть удалено.

Возможно ли это и как?

Причина в том, что мы используем эту очередь для асинхронной загрузки изображений, и мы уже регистрируем каждый сбой в специальной таблице базы данных в обработчике команд.

Ответ №1:

Мне удалось сделать это с помощью промежуточного программного обеспечения:

 
use SymfonyComponentMessengerEnvelope;
use SymfonyComponentMessengerExceptionHandlerFailedException;
use SymfonyComponentMessengerExceptionUnrecoverableMessageHandlingException;
use SymfonyComponentMessengerMiddlewareMiddlewareInterface;
use SymfonyComponentMessengerMiddlewareStackInterface;
use SymfonyComponentMessengerStampReceivedStamp;
use SymfonyComponentMessengerStampRedeliveryStamp;
use SymfonyComponentMessengerStampSentToFailureTransportStamp;
use Throwable;

final class BypassFailureTransportMiddleware implements MiddlewareInterface
{
    public function __construct(
        private string $transportName,
        private int $maxRetries,
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            return $stack->next()->handle($envelope, $stack);
        } catch (HandlerFailedException $exception) {
            $nestedException = $this->getNestedException($exception);

            if ($nestedException === null) {
                throw $exception;
            }

            /** @var ReceivedStamp|null $receivedStamp */
            $receivedStamp = $envelope->last(ReceivedStamp::class);

            if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) {
                throw $exception;
            }

            if (!$this->isLastRetry($envelope, $nestedException)) {
                throw $exception;
            }

            return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
        }
    }

    private function getNestedException(HandlerFailedException $exception): ?Throwable
    {
        $nestedExceptions = $exception->getNestedExceptions();

        if (count($nestedExceptions) === 1) {
            return $nestedExceptions[0];
        }

        return null;
    }

    private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
    {
        if ($nestedException instanceof UnrecoverableMessageHandlingException) {
            return true;
        }

        /** @var RedeliveryStamp|null $redeliveryStamp */
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

        if ($redeliveryStamp === null) {
            return false;
        }

        return $redeliveryStamp->getRetryCount() === $this->maxRetries;
    }
}
 

Он должен быть настроен с указанием имени транспорта и конфигурации max_retries этого транспорта:

 parameters:
    async_allow_failure_transport_name: 'asyncAllowFailure'
    async_allow_failure_max_retries: 5

services:
  command.bus.bypass_failure_transport_middleware:
    class: AppInfrastructureCommandBusMiddlewareBypassFailureTransportMiddleware
    arguments:
      $transportName: '%async_allow_failure_transport_name%'
      $maxRetries: '%async_allow_failure_max_retries%'

framework:
    messenger:
        transports:
            - name: '%async_allow_failure_transport_name%'
              dsn: '...'
              retry_strategy:
                  max_retries: '%async_allow_failure_max_retries%'
                  delay: 1000
                  multiplier: 2
                  max_delay: 0

        buses:
            command.bus:
                middleware:
                  - 'command.bus.bypass_failure_transport_middleware'