Automatonymous — Полезная нагрузка не найдена при вызове RaiseEvent с активностью отправки

#masstransit #automatonymous

Вопрос:

Я крутил свои колеса, пытаясь заставить работать компьютерную машину MassTransitStateMachine, и, похоже, я не совсем понимаю, как это должно работать.

Ошибка, которую я получаю (полный код ниже) PayloadNotFoundException , возникает, когда я пытаюсь позвонить stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request) в сочетании с .Send() Действием в государственной машине. Я пытался проследить это до конца, но ничего не добился.

Вот соответствующий код и объяснение намерений:

Я получаю запрос на развертывание приложения через сообщение WebAPI и преобразую запрос в a DeployApplicationRequest .

 public record DeployApplicationRequest
    {
        // State machine properties
        public Guid CorrelationId { get; init; }
        public ChatApplication ChatApplication { get; init; }
        public string ChannelId { get; init; }
        public string UserId { get; init; }


        // Command
        public DeployApplication DeployApplication { get; init; }
    }

    public enum ChatApplication
    {
        Slack,
        Teams
    }
 

Этот запрос инкапсулирует две вещи: состояние, которое необходимо поддерживать API, чтобы он знал, как направлять результаты обратно запрашивающему (свойства конечного автомата), и Command состояние, которое должно быть отправлено на служебную шину.

Команда, DeployApplication , выглядит так:

 public record DeployApplication
    {
        public Guid CorrelationId { get; init;}
        public string InitiatedBy { get; init; }
        public DateTime CreatedDate { get; init; }
        public string Environment { get; init; }
        public string PullRequestId { get; init; }
    }
 

Я создал экземпляр состояния, чтобы сохранить детали запроса (например, поступил ли он через Slack или команды). Я не хочу публиковать эту информацию в автобусе:

 public class DeployApplicationState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }

        public ChatApplication ChatApplication { get; set; }
        public string ChannelId { get; set; }
        public string UserId { get; set; }
    }
 

And I have created a StateMachine to handle this:

 public class DeployApplicationStateMachine : MassTransitStateMachine<DeployApplicationState>
    {
        private readonly ILogger<DeployApplicationStateMachine> logger;
        public DeployApplicationStateMachine(ILogger<DeployApplicationStateMachine> logger)
        {
            this.logger = logger;

            InstanceState(x => x.CurrentState);

            Event(() => DeployApplicationRequest, x => x.CorrelateById(context => context.Message.CorrelationId));

            Initially(
                When(DeployApplicationRequest)
                    .Then(x => {
                        x.Instance.CorrelationId = x.Data.CorrelationId;
                        x.Instance.ChannelId = x.Data.ChannelId;
                        x.Instance.ChatApplication = x.Data.ChatApplication;
                        x.Instance.UserId = x.Data.UserId;
                    })
                    .Send(context => context.Init<DeployApplication>(context.Data.DeployApplication))
                    .TransitionTo(Submitted));
        }

        public Event<DeployApplicationRequest> DeployApplicationRequest { get; private set; }

        public State Submitted { get; private set; }
    }
 

To trigger the initial event (since the request is not coming in via a Consumer but rather through a controller), I have injected the state machine into the MassTransit client, and I’m calling the RaiseEvent method:

 public class MassTransitDeployClient : IDeployClient
    {
        private readonly DeployApplicationStateMachine stateMachine;
        public MassTransitDeployClient(DeployApplicationStateMachine stateMachine)
        {
            this.stateMachine = stateMachine;
        }

        public async Task Send(DeployApplicationRequest request)
        {
            var instance = new DeployApplicationState
            {
                CorrelationId = request.CorrelationId,
                ChannelId = request.ChannelId,
                ChatApplication = request.ChatApplication,
                UserId = request.UserId
            };

            await stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request);
            // This works for sending to the bus, but I lose the state information
            //await sendEndpointProvider.Send(request.DeployApplication);
        }
    }
 

И конфигурация контейнера выглядит следующим образом:

 services.AddMassTransit(x =>
            {
                x.AddSagaStateMachine<DeployApplicationStateMachine, DeployApplicationState>()
                    .InMemoryRepository();

                x.UsingInMemory((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });
                
                
            });

            EndpointConvention.Map<DeployApplication>(new Uri($"queue:{typeof(DeployApplication).FullName}"));

            services.AddMassTransitHostedService();
 

Поднятие события работает просто отлично, и государственная машина переходит в Submitted состояние успешно, если я не включу .Send(...) ее в государственную машину. В ту секунду, когда я представляю, что Activity я получаю PayloadNotFoundException . Я перепробовал около 15 различных способов сделать это за выходные, но безуспешно, и я надеюсь, что кто-нибудь сможет помочь мне увидеть ошибки моих способов.

Спасибо за чтение! Кстати, фантастическая библиотека. Я буду искать способы внести свой вклад в это в будущем, это одна из самых полезных библиотек, с которыми я сталкивался (и, Крис, ваши видео на Youtube превосходны).

Ответ №1:

Государственные машины Saga не предназначены для вызова непосредственно с контроллеров. Вы должны отправить (или опубликовать) сообщение, которое затем будет отправлено в saga через посредника сообщений.

ЕСЛИ вы хотите сделать это таким образом, вы можете использовать вместо этого посредника MassTransit, но вы все равно будете отправлять сообщение через посредника в сагу, которое затем будет обработано MassTransit.

TL;DR — вы не используете RaiseEvent с автоматами состояний saga.

Комментарии:

1. Эй, Крис, спасибо за быстрый ответ. Это была та деталь, которую я упустил. Поэтому вариант 1-это отправить сообщение на автобус, чтобы затем его забрало то же приложение. К сожалению, при таком подходе я не могу синхронно ответить на http-запрос, чтобы уведомить запрашивающего о том, что было отправлено сообщение о развертывании. Там мне придется пересмотреть свой дизайн. Для второго варианта можно ли настроить посредника, чтобы он принимал запрос на развертывание, а затем отправлял приложение развертывания на шину? Похоже, он пытается отправить его на тот же адрес обратной связи.

2. Вам нужно будет добавить зависимость от IBus публикации в шину из посредника. Вы можете посмотреть на контроллер API ForkJoint, он использует запрос/ответ на шину для ответа внутри контроллера.