#c# #rabbitmq #masstransit
#c# #rabbitmq #массовый переход
Вопрос:
RabbitMQ, похоже, обладает двумя свойствами, которые очень похожи, и я не совсем понимаю разницу. ConversationId
и CorrelationId
.
Мой вариант использования следующий. У меня есть веб-сайт, который генерирует Guid
. Веб-сайт вызывает API, добавляя этот уникальный идентификатор к HttpRequest
заголовкам. Это, в свою очередь, публикует сообщение в RabbitMQ. Это сообщение обрабатывается первым потребителем и передается в другом месте другому потребителю, и так далее.
В целях ведения журнала я хочу зарегистрировать идентификатор, который связывает первоначальный запрос со всеми последующими действиями. Это должно быть уникальным для этого путешествия по различным частям приложения. Следовательно. При регистрации в чем-то вроде Serilog / ElasticSearch становится легко увидеть, какой запрос вызвал первоначальный запрос, и все записи журнала для этого запроса во всем приложении могут быть сопоставлены друг с другом.
Я создал провайдера, который просматривает входящий HttpRequest
идентификатор. Я назвал это «CorrelationId», но я начинаю задаваться вопросом, действительно ли это следует называть «conversationId». С точки зрения RabbitMQ, идея «conversationId» лучше подходит для этой модели, или «CorrelationId» лучше?
В чем разница между этими двумя концепциями?
С точки зрения кода, я попытался сделать следующее. Сначала зарегистрируйте шину в моем API и настройте SendPublish
на использование CorrelationId
от провайдера.
// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
{
// which one is more appropriate
//sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
}));
});
Для справки, это мой простой интерфейс поставщика
// define the interface
public interface ICorrelationIdProvider
{
Guid GetCorrelationId();
}
И реализация AspNetCore, которая извлекает уникальный идентификатор, установленный вызывающим клиентом (то есть веб-сайтом).
public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
private IHttpContextAccessor _httpContextAccessor;
public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public Guid GetCorrelationId()
{
if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
{
var header = headers.FirstOrDefault();
if (Guid.TryParse(header, out Guid headerCorrelationId))
{
return headerCorrelationId;
}
}
return Guid.NewGuid();
}
}
Finally, my Service hosts are simple windows service applications that sit and consume published messages. They use the following to grab the CorrelationId and might well publish to other consumers as well in other service hosts.
public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
/// <summary>
/// The consume context
/// </summary>
private readonly ConsumeContext _consumeContext;
/// <summary>
/// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
/// </summary>
/// <param name="consumeContext">The consume context.</param>
public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
{
_consumeContext = consumeContext;
}
/// <summary>
/// Gets the correlation identifier.
/// </summary>
/// <returns></returns>
public Guid GetCorrelationId()
{
// correlationid or conversationIs?
if (_consumeContext.CorrelationId.HasValue amp;amp; _consumeContext.CorrelationId != Guid.Empty)
{
return _consumeContext.CorrelationId.Value;
}
return Guid.NewGuid();
}
}
Затем у меня есть регистратор в моем потребителе, который использует этого поставщика для извлечения CorrelationId
:
public async Task Consume(ConsumeContext<IMyEvent> context)
{
var correlationId = _correlationProvider.GetCorrelationId();
_logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");
try
{
await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
}
catch (Exception e)
{
_logger.Exception(e, correlationId, $"Exception:{e}");
throw;
}
_logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}
Читая документы, о «conversationId» говорится следующее:
Диалог создается первым отправленным или опубликованным сообщением, в котором не доступен существующий контекст (например, когда сообщение отправляется или публикуется с помощью IBus.Отправка или IBus.Опубликовать). Если для отправки или публикации сообщения используется существующий контекст, conversationId копируется в новое сообщение, гарантируя, что набор сообщений в рамках одного и того же диалога имеет одинаковый идентификатор.
Теперь я начинаю думать, что я перепутал терминологию, и технически это разговор (хотя «разговор» похож на «телефонную игру»).
Итак, CorrelationId
в этом варианте использования или ConversationId
? Пожалуйста, помогите мне правильно использовать мою терминологию!!
Комментарии:
1. Кажется, conversationId — это массовый переход / NServiceBus? Я не нахожу никаких упоминаний в документах RabbitMQ или AMQP. CorrelationId — это специфическая функция AMQP. В спецификации указано «нет формального поведения, но может содержать имя частной очереди ответов, когда используется в сообщениях запроса», и оно обычно используется для имени очереди ответов при выполнении запроса RPC. Итак, я думаю, что ваш вариант использования — conversationId, а не CorrelationId.
2.
ConversationId
ограничивается одной последовательностью сообщений, когда вы получаете одно сообщение и оно проходит через нескольких потребителей.CorrelationId
является более долговечным, вы можете вести несколько разговоров с одним идентификатором корреляции.3. В дополнение к этому, идентификатор беседы генерируется автоматически, а идентификатор корреляции является произвольным, который вы можете взять из своего домена. Например, в нашей саге о корзинах покупок мы используем идентификатор заказа в качестве идентификатора корреляции.
4. Я исправляю conversationId. Конечно, вы можете добавить свой собственный заголовок, если хотите сохранить CorrelationId для его обычной цели выполнения RPC.
5. Когда я столкнулся с этим, я просто решил использовать термин из моей бизнес-области. В итоге он был вызван просто LineID, и я не использовал заголовок. Я просто использовал свой собственный CorrelationId, который помог, потому что разные клиенты rabbitmq могут решать, что делать с этими заголовками по-разному. Я также мог бы включить несколько идентификаторов строк для каждого сообщения, если бы хотел быть менее разговорчивым.
Ответ №1:
В диалоге сообщений (сигнал, предвещающий музыкальную партитуру) может быть одно сообщение (я сказал вам что-то сделать, или я сказал всем, кто слушает, что что-то произошло) или несколько сообщений (я сказал вам что-то сделать, и вы сказали кому-то другому, или я сказал всем, кто слушает, что что-то произошло, и эти слушатели рассказали своим друзьям, и так далее, и тому подобное).
При правильном использовании MassTransit, от первого сообщения до конечного, каждое из этих сообщений будет иметь одинаковое значение ConversationId
. MassTransit копирует свойство из ConsumeContext
в неизмененном виде в каждое исходящее сообщение во время использования сообщения. Это делает все частью одной и той же трассировки — диалога.
Однако CorrelationId не устанавливается MassTransit по умолчанию. Оно может быть установлено автоматически, если свойство сообщения называется CorrelationId (или commandId, или EventID), или вы также можете добавить свои собственные имена.
Если CorrelationId присутствует в полученном сообщении, у любых исходящих сообщений это свойство CorrelationId будет скопировано в свойство InitiatorId (причина и следствие — полученное сообщение инициировало создание последующих сообщений). Это формирует цепочку (или промежуток, в терминологии трассировки), по которой можно следовать, чтобы показать распространение сообщений от исходного сообщения.
CorrelationId следует рассматривать как идентификатор команды или события, чтобы результаты этой команды можно было видеть во всех системных журналах.
Мне кажется, что ваш ввод из HTTP может быть инициатором, и, таким образом, скопируйте этот идентификатор в InitiatorId и создайте новый CorrelationId для сообщения, или вы можете просто использовать тот же идентификатор для начального CorrelationId и позволить последующим сообщениям использовать его в качестве инициатора.