#c# #asp.net-core #entity-framework-core #rabbitmq #asp.net-core-hosted-services
#c# #asp.net-ядро #сущность-фреймворк-ядро #rabbitmq #asp.net-core-hosted-services
Вопрос:
У меня есть WebAPI, который также должен получать сообщения от RabbitMQ. Я использовал этот учебник, потому что знаю, что иногда IIS любит убивать длительные задачи (хотя еще не тестировал его на сервере, возможно, это не сработает). У меня есть служба, которая обрабатывает сообщения, полученные через RabbitMQ. Первая проблема, с которой я столкнулся — я не мог внедрить его в BackgroundService
класс, поэтому я использовал IServiceScopeFactory
. Теперь мне нужно получать сообщения из двух очередей, и, как я понял, лучше всего использовать для этого два канала. Но обработка выполняется в одном сервисе. BackgroundService:
public class ConsumeRabbitMQHostedService : BackgroundService
{
private IConnection _connection;
private IModel _firstChannel;
private IModel _secondChannel;
private RabbitConfigSection _rabbitConfig;
public IServiceScopeFactory _serviceScopeFactory;
public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
{
_rabbitConfig = rabbitConfig.Value;
_serviceScopeFactory = serviceScopeFactory;
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };
_connection = factory.CreateConnection();
_firstChannel = _connection.CreateModel();
_firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
_firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_firstChannel.BasicQos(0, 1, false);
_secondChannel = _connection.CreateModel();
_secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
_secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_secondChannel.BasicQos(0, 1, false);
_connection.ConnectionShutdown = RabbitMQ_ConnectionShutdown;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var firstConsumer = new EventingBasicConsumer(_firstChannel);
var secondConsumer = new EventingBasicConsumer(_secondChannel);
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
firstConsumer.Received = (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleFirstMessage(content, scoped);
_firstChannel.BasicAck(ea.DeliveryTag, false);
};
firstConsumer.Shutdown = OnConsumerShutdown;
firstConsumer.Registered = OnConsumerRegistered;
firstConsumer.Unregistered = OnConsumerUnregistered;
firstConsumer.ConsumerCancelled = OnConsumerConsumerCancelled;
_firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
}
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
secondConsumer.Received = (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleSecondMessage(content, scoped);
_secondChannel.BasicAck(ea.DeliveryTag, false);
};
secondConsumer.Shutdown = OnConsumerShutdown;
secondConsumer.Registered = OnConsumerRegistered;
secondConsumer.Unregistered = OnConsumerUnregistered;
secondConsumer.ConsumerCancelled = OnConsumerConsumerCancelled;
_secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
}
return Task.CompletedTask;
}
private void HandleFirstMessage(string content, IIntegrationService integrationService)
{
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
private void HandleSecondMessage(string content, IIntegrationService integrationService)
{
List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
integrationService.ImportSecond(importData);
}
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
public override void Dispose()
{
_firstChannel.Close();
_connection.Close();
base.Dispose();
}
}
В сервисе я получаю
Система.Исключение ObjectDisposedException: «Не удается получить доступ к удаленному экземпляру контекста. Распространенной причиной этой ошибки является удаление экземпляра контекста, который был разрешен при внедрении зависимостей, а затем последующая попытка использовать тот же экземпляр контекста в другом месте вашего приложения. Это может произойти, если вы вызываете ‘Dispose’ для экземпляра контекста или оборачиваете его в оператор using. Если вы используете внедрение зависимостей, вы должны позволить контейнеру внедрения зависимостей позаботиться об удалении экземпляров контекста. Имя объекта: ‘IntegrationDbContext’.’
DbContext
вводится в IIntegrationService
. Если я понимаю, что происходит, два экземпляра службы (или даже один) совместно DbContext
используются, и когда один из них завершает работу, он удаляется DbContext
. Я пытался не создавать два экземпляра (весь код внутри одного using
), пытался сделать IIntegrationService
переходный процесс, пытался делать все асинхронно (это была начальная версия, сделал ее синхронной для тестирования) — все та же ошибка. Что мне здесь делать? И это правильный подход?
Обновление 1. ConfigureServices
в Startup
:
public void ConfigureServices(IServiceCollection services)
{
var rabbitConfigSection =
Configuration.GetSection("Rabbit");
services.Configure<RabbitConfigSection>(rabbitConfigSection);
services.AddDbContext<SUNDbContext>(options =>
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));
services.AddCors();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Title = "My API",
Version = "v1"
});
});
services.AddRabbit(Configuration);
services.AddHostedService<ConsumeRabbitMQHostedService>();
services.AddControllers();
services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
}
Комментарии:
1. Вероятно, это ни на что не влияет, но только потому, что я заметил: вы закрываете первый канал при Dispose, но не второй, а затем закрываете общее соединение.
2. Можете ли вы также опубликовать свой код запуска, в котором вы создаете / регистрируете DbContext и IIntegrationService?
3. Итак, следуя этой логике, если у вас есть постоянная область, которая разрешается вне контекста отдельного сообщения, что произойдет, если область будет удалена до обработки сообщения? Другими словами, вам необходимо разрешить отдельную область для каждого сообщения, которое обрабатывается внутри обработчика сообщений.
4. Обычно вы были бы правы, но в этом случае вы создаете свою собственную область видимости, которая изменяет время жизни. Я добавил ответ. Рад, что это помогло!
5. Он не может быть удален до завершения использования блока. Однако в вашем исходном коде блок using завершится до того, как ваше сообщение будет обработано.
Ответ №1:
Проблема вызвана тем фактом, что внешний scope
файл, созданный с помощью _serviceScopeFactory.CreateScope()
, удаляется после каждого оператора using, в то время как каждое сообщение все еще пытается полагаться на удаленную область и присоединенный контекст для обработки сообщения.
Решение состоит в том, чтобы создать новую область для каждого сообщения в ваших обработчиках сообщений:
private void HandleFirstMessage(string content)
{
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
}