#masstransit #automatonymous
Вопрос:
Я крутил свои колеса, пытаясь заставить работать компьютерную машину MassTransitStateMachine, и, похоже, я не совсем понимаю, как это должно работать.
Ошибка, которую я получаю (полный код ниже) PayloadNotFoundException
, возникает, когда я пытаюсь позвонить stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request)
в сочетании с .Send()
Действием в государственной машине. Я пытался проследить это до конца, но ничего не добился.
Вот соответствующий код и объяснение намерений:
Я получаю запрос на развертывание приложения через сообщение WebAPI и преобразую запрос в a DeployApplicationRequest
.
public record DeployApplicationRequest
{
// State machine properties
public Guid CorrelationId { get; init; }
public ChatApplication ChatApplication { get; init; }
public string ChannelId { get; init; }
public string UserId { get; init; }
// Command
public DeployApplication DeployApplication { get; init; }
}
public enum ChatApplication
{
Slack,
Teams
}
Этот запрос инкапсулирует две вещи: состояние, которое необходимо поддерживать API, чтобы он знал, как направлять результаты обратно запрашивающему (свойства конечного автомата), и Command
состояние, которое должно быть отправлено на служебную шину.
Команда, DeployApplication
, выглядит так:
public record DeployApplication
{
public Guid CorrelationId { get; init;}
public string InitiatedBy { get; init; }
public DateTime CreatedDate { get; init; }
public string Environment { get; init; }
public string PullRequestId { get; init; }
}
Я создал экземпляр состояния, чтобы сохранить детали запроса (например, поступил ли он через Slack или команды). Я не хочу публиковать эту информацию в автобусе:
public class DeployApplicationState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public ChatApplication ChatApplication { get; set; }
public string ChannelId { get; set; }
public string UserId { get; set; }
}
And I have created a StateMachine
to handle this:
public class DeployApplicationStateMachine : MassTransitStateMachine<DeployApplicationState>
{
private readonly ILogger<DeployApplicationStateMachine> logger;
public DeployApplicationStateMachine(ILogger<DeployApplicationStateMachine> logger)
{
this.logger = logger;
InstanceState(x => x.CurrentState);
Event(() => DeployApplicationRequest, x => x.CorrelateById(context => context.Message.CorrelationId));
Initially(
When(DeployApplicationRequest)
.Then(x => {
x.Instance.CorrelationId = x.Data.CorrelationId;
x.Instance.ChannelId = x.Data.ChannelId;
x.Instance.ChatApplication = x.Data.ChatApplication;
x.Instance.UserId = x.Data.UserId;
})
.Send(context => context.Init<DeployApplication>(context.Data.DeployApplication))
.TransitionTo(Submitted));
}
public Event<DeployApplicationRequest> DeployApplicationRequest { get; private set; }
public State Submitted { get; private set; }
}
To trigger the initial event (since the request is not coming in via a Consumer but rather through a controller), I have injected the state machine into the MassTransit client, and I’m calling the RaiseEvent
method:
public class MassTransitDeployClient : IDeployClient
{
private readonly DeployApplicationStateMachine stateMachine;
public MassTransitDeployClient(DeployApplicationStateMachine stateMachine)
{
this.stateMachine = stateMachine;
}
public async Task Send(DeployApplicationRequest request)
{
var instance = new DeployApplicationState
{
CorrelationId = request.CorrelationId,
ChannelId = request.ChannelId,
ChatApplication = request.ChatApplication,
UserId = request.UserId
};
await stateMachine.RaiseEvent(instance, stateMachine.DeployApplicationRequest, request);
// This works for sending to the bus, but I lose the state information
//await sendEndpointProvider.Send(request.DeployApplication);
}
}
И конфигурация контейнера выглядит следующим образом:
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<DeployApplicationStateMachine, DeployApplicationState>()
.InMemoryRepository();
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
EndpointConvention.Map<DeployApplication>(new Uri($"queue:{typeof(DeployApplication).FullName}"));
services.AddMassTransitHostedService();
Поднятие события работает просто отлично, и государственная машина переходит в Submitted
состояние успешно, если я не включу .Send(...)
ее в государственную машину. В ту секунду, когда я представляю, что Activity
я получаю PayloadNotFoundException
. Я перепробовал около 15 различных способов сделать это за выходные, но безуспешно, и я надеюсь, что кто-нибудь сможет помочь мне увидеть ошибки моих способов.
Спасибо за чтение! Кстати, фантастическая библиотека. Я буду искать способы внести свой вклад в это в будущем, это одна из самых полезных библиотек, с которыми я сталкивался (и, Крис, ваши видео на Youtube превосходны).
Ответ №1:
Государственные машины Saga не предназначены для вызова непосредственно с контроллеров. Вы должны отправить (или опубликовать) сообщение, которое затем будет отправлено в saga через посредника сообщений.
ЕСЛИ вы хотите сделать это таким образом, вы можете использовать вместо этого посредника MassTransit, но вы все равно будете отправлять сообщение через посредника в сагу, которое затем будет обработано MassTransit.
TL;DR — вы не используете
RaiseEvent
с автоматами состояний saga.
Комментарии:
1. Эй, Крис, спасибо за быстрый ответ. Это была та деталь, которую я упустил. Поэтому вариант 1-это отправить сообщение на автобус, чтобы затем его забрало то же приложение. К сожалению, при таком подходе я не могу синхронно ответить на http-запрос, чтобы уведомить запрашивающего о том, что было отправлено сообщение о развертывании. Там мне придется пересмотреть свой дизайн. Для второго варианта можно ли настроить посредника, чтобы он принимал запрос на развертывание, а затем отправлял приложение развертывания на шину? Похоже, он пытается отправить его на тот же адрес обратной связи.
2. Вам нужно будет добавить зависимость от
IBus
публикации в шину из посредника. Вы можете посмотреть на контроллер API ForkJoint, он использует запрос/ответ на шину для ответа внутри контроллера.