Ошибка MSMQ ReceiveById

#c# #msmq

#c# #msmq

Вопрос:

У меня есть следующий код

 public class MsmqQueueProvider : IQueueProvider
{
    public void VeryfyIsAvailable(string name)
    {
        var queueAddress = string.Format(@" .private${0}", name);
        var message = "There was a problem while starting the NEasyMessaging.";

        if (MessageQueue.Exists(queueAddress))
        {
            using (var queue = new MessageQueue(queueAddress))
            {
                if (queue.CanWrite amp;amp; queue.CanRead) return;

                if (queue.CanRead == false)
                {
                    message  = string.Format("Queue {0} is reachable but not readable", queueAddress);

                    throw new QueueProviderProviderException(message);
                }

                message  = string.Format("Queue {0} is reachable but not writable", queueAddress);

                throw new QueueProviderProviderException(message);
            }
        }

        message  = string.Format("Queue {0} cannot be found", queueAddress);

        throw new QueueProviderProviderException(message);
    }

    public QueueMessage Peek(string queueName)
    {
        var queue = new MessageQueue(string.Format(@" .private${0}", queueName), QueueAccessMode.Peek);
        var message = queue.Peek();

        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }

    public QueueMessage Receive(string queueName)
    {
        var queue = new MessageQueue(string.Format(@" .private${0}", queueName), QueueAccessMode.Receive);
        var message = queue.Receive(MessageQueueTransactionType.Automatic);

        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }

    public QueueMessage ReceiveById(string queueName, string messageId)
    {
        var queue = new MessageQueue(string.Format(@" .private${0}", queueName), QueueAccessMode.Receive);
        var message = queue.ReceiveById(messageId, MessageQueueTransactionType.Automatic);

        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }

    public void QueueMessage(string messageContent, string messageName, string queueName)
    {
        var queueAddress = string.Format(@" .private${0}", queueName);

        using (var streamReader = new StringReader(messageContent))
        {
            var message = new Message
            {
                TimeToBeReceived = Message.InfiniteTimeout,
                TimeToReachQueue = Message.InfiniteTimeout,
                Label = messageName,
                UseAuthentication = false,
                Recoverable = true
            };

            using (var queue = new MessageQueue(queueAddress, QueueAccessMode.Send))
            {
                using (var streamWriter = new StreamWriter(message.BodyStream))
                {
                    streamWriter.Write(streamReader.ReadToEnd());
                    streamWriter.Flush();

                    queue.Send(message, MessageQueueTransactionType.Automatic);
                }

                queue.Close();
            }
        }
    }
}


public class UnitOfWork : IUnitOfWork
{
    private TransactionScope _transaction;


    public void Start()
    {
        var transactionOptions = new TransactionOptions
        {
            Timeout = TransactionManager.MaximumTimeout
        };

        _transaction = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions);
    }

    public void CompletedWithSuccess()
    {
        if (Transaction.Current.TransactionInformation.Status == TransactionStatus.Active)
        {
            _transaction.Complete();
        }

        _transaction.Dispose();
    }

    public void CompletedWithFail()
    {
        _transaction.Dispose();
    }
}


public sealed partial class Service : ServiceBase
{
    private readonly ILog _log = LogManager.GetLogger(typeof(Service));
    private readonly ManualResetEvent _shutdownEvent = new ManualResetEvent(false);
    private Thread _workerThread;
    private IQueueProvider _queueProvider;
    private IEndpointConfiguration _configuration;
    private IContainer _container;

    public Service()
    {
        InitializeComponent();

        ServiceName = "";
        EventLog.Log = "";
    }


    public void Init()
    {
        var endpointBootstrap = new EndpointBootstrap();

        endpointBootstrap.Initialize();

        _container = endpointBootstrap.IocContainer;
        _queueProvider = _container.Resolve<IQueueProvider>();
        _configuration = _container.Resolve<IEndpointConfiguration>();

        _workerThread = new Thread(DoWork) { Name = "Worker Thread", IsBackground = true };

        _workerThread.Start();
    }

    protected override void OnStart(string[] args)
    {
        Init();
    }

    protected override void OnStop()
    {
        _shutdownEvent.Set();

        if (!_workerThread.Join(3000))
        {
            _workerThread.Abort();
        }
    }

    private void DoWork()
    {
        while (!_shutdownEvent.WaitOne(0))
        {
            var queueMessage = _queueProvider.Peek(_configuration.QueueName);

            try
            {
                ProcessMessage(queueMessage);
            }
            catch (Exception ex)
            {
                _log.Error(ex);

                MoveMessageToErrorQueue(queueMessage.Id);
            }
        }
    }

    private void ProcessMessage(QueueMessage message)
    {
        using (var dependencyScope = _container.BeginLifetimeScope())
        {
            var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();

            unitOfWork.Start();

            var messageProcessor = new MessageProcessor(dependencyScope);

            try
            {
                messageProcessor.HandleMessage(message);

                _queueProvider.ReceiveById(_configuration.QueueName, message.Id);
            }
            catch (Exception ex)
            {
                _log.Error(ex);

                unitOfWork.CompletedWithFail();

                throw;
            }

            unitOfWork.CompletedWithSuccess();
        }
    }

    private void MoveMessageToErrorQueue(string messageId)
    {
        try
        {
            using (var dependencyScope = _container.BeginLifetimeScope())
            {
                var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();

                unitOfWork.Start();

                var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);

                try
                {
                    _queueProvider.QueueMessage(message.Body, message.Name, _configuration.QueueErrorName);

                    unitOfWork.CompletedWithSuccess();
                }
                catch
                {
                    unitOfWork.CompletedWithFail();

                    throw;
                }
            }
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}
 

В принципе, моя идея проста, по крайней мере, на бумаге. Сообщения отлично берутся из очереди, и на компьютере разработчика все работает нормально, проблема в том, что мы развертываем код на нашем сервере (Windows 2008). Если сообщение обрабатывается неправильно, мы удаляем сообщение из очереди и помещаем его в очередь ошибок, проблема в том, что метод GetById не может найти сообщение:

 private void MoveMessageToErrorQueue(string messageId)

var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
 

Он отлично работает на блоках разработки, но мы просто можем найти способ исправить это.

Любая помощь приветствуется.

Спасибо

Обновить

Следуя комментарию Пола:

Привет, Пол, спасибо за помощь. К сожалению, если я не понял что-то неправильно, даже не начну получать. Прямо сейчас я выбираю сообщение из очереди, и поскольку из этой очереди читает только один поток, если позже получит сообщение по идентификатору, кажется логичным, чтобы сообщение все еще было там. Итак, почему я думаю, что мне нужно делать то, что я делаю. Я просматриваю сообщение, затем создаю область транзакции и выполняю любую обработку, и, конечно, любые сеансы sql server, созданные во время выполнения, зарегистрируют эту транзакцию. Если что-то пойдет не так во время обработки сообщения, мне нужно откатить изменения, внесенные в базу данных, и для этого я откатываю транзакцию, но мне также нужно поместить сообщение об ошибке в очередь ошибок. Я не могу выполнить только одну транзакцию, удалить сообщение из очереди, попытаться обработать сообщение и, если оно завершится неудачей, поместить его в очередь ошибок, потому что мне все еще нужно откатить изменения базы данных.

Комментарии:

1. итак, вы получаете сообщение на рабочей машине, но не можете найти его позже? или он вообще не поступает?

2. Привет по умолчанию. Сообщение есть, оно должно быть. Если во время обработки сообщения генерируется исключение, область транзакции откатывается и создается новая транзакция для перемещения этого сообщения из одной очереди в другую. Идентификатор сообщения — это идентификатор сообщения, которое было передано непосредственно перед обработкой, поэтому сообщение должно быть там, или, по крайней мере, это было то, что я думал, в конце концов, peek просто извлекает копию сообщения.

Ответ №1:

Вместо того, чтобы просматривать, а затем пытаться получить сообщение по идентификатору и т. Д., Почему бы просто не использовать BeginReceive / Receive или аналогичный — таким образом, у вас уже есть сообщение.

Просто убедитесь, что вы настроили свойства очереди так, чтобы они включали тело сообщения (не могу вспомнить, используется ли это по умолчанию для получения и т. Д.)

MSDN — MessageQueue.Метод BeginReceive
MSDN — MessageQueue.Метод получения

(Редактировать …)

Если вы хотите улучшить видимость своих сообщений и т. Д., Попробуйте загрузить MSMQ Inspector (и да, это мой инструмент, я создал его для подобных сценариев, т.Е. Что происходит ?!)

Если вы используете его, включите «режим постоянного подглядывания», и вы должны увидеть, как идут сеансы массажа. Я бы также включил журнал для очередей, которые вы используете, чтобы при обработке сообщений вы видели их в журнале и т. Д. Возможно, Дважды проверьте, что идентификаторы сообщений не изменяются в те моменты, когда вы не ожидаете и т. Д. Кроме того, разное время выполнения и настройки ОС и MSMQ могут вызывать разное поведение… Трудно сказать, не запустив код (который, судя по тому, что вы опубликовали, мы, вероятно, могли бы !!)

PK 🙂

Комментарии:

1. Привет, Пол, спасибо за вашу помощь, я обновил свой вопрос.