удаление определенных сообщений из очереди ActiveMQ с помощью селектора не позволяет удалить все, что должно соответствовать селектору

#activemq #message-queue #messaging #solace #solace-mq

Вопрос:

Я написал утилиту C# для использования/удаления определенных сообщений в очереди ActiveMQ. Этой утилите не удается удалить все сообщения, соответствующие селектору. Есть ли проблема с ActiveMQ, или (что более вероятно) Я что-то делаю неправильно? Аналогичная утилита, работающая в Solace, не обнаруживает той же проблемы.

Производитель сообщений добавляет идентифицирующее свойство в msg; утилита будет использовать сообщения, в которых свойство имеет определенное значение, с помощью селектора. Удаляемые сообщения относятся к объектам приложения, которые пользователи решили отменить.

Когда количество сообщений в очереди невелико, например 100, утилита очистки очереди работает по плану. Когда количество сообщений велико, например 10000, утилита удалит/удалит первые 50% тех сообщений, которые следует удалить, оставив большое количество сообщений, которые должны были быть удалены, но не были.

Два приведенных ниже примера кода взяты из сценариев linqpad, которые демонстрируют проблему. Первый создает 10000 сообщений, помеченных случайным образом одной из 10 строк (от «аааааааа» до «jjjjjjjj»). Второй блок кода пытается удалить эти сообщения с пометкой «ааааааааа». Первый блок сгенерирует около 1000 сообщений «ааааааааа», но второй блок сгенерирует только около 50.

(Я использую версию ActiveMQ: 5.16.3 в Windows, сценарии linqpad ссылаются на Apache.NMS.ActiveMQ.NETCore версии 1.7.3, работающую на .Net 5.0)

продюсер.linq

 string queueName = "queue.1";

var rnd = new System.Random();
var tags = new[] {
    "aaaaaaaa", "bbbbbbbb", "cccccccc", "dddddddd", "eeeeeeee", 
    "ffffffff", "gggggggg", "hhhhhhhh", "iiiiiiii", "jjjjjjjj"};

Uri uri = new Uri("activemq:tcp://localhost:61616");

var connectionFactory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = connectionFactory.CreateConnection();
using ISession session = connection.CreateSession( AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);

connection.Start();

using IMessageProducer producer = session.CreateProducer(destination);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producer.Priority = MsgPriority.Normal;
producer.RequestTimeout = TimeSpan.FromSeconds(1000.0);

int numDrainTargets = 0;
int numMsgsToSend = 10000;

for( int ctr = 0; ctr < numMsgsToSend;   ctr)
{
    var msg = producer.CreateTextMessage();
    msg.Text = $"msg: {ctr,4}";
    int tagIndex = rnd.Next(tags.Length);
    if (tagIndex == 0)
          numDrainTargets;
    msg.Properties.SetString("MyKey", tags[tagIndex]);
    producer.Send(msg);
}   

numDrainTargets.Dump("numDrainTargets");

"exiting producer".Dump();
 

дренер.linq

 
string queueName = "queue.1";
Uri uri = new Uri("activemq:tcp://localhost:61616");
IConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(uri);
using IConnection connection = factory.CreateConnection();
using ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
using IDestination destination = session.GetQueue(queueName);
connection.Start();

string selector = $"MyKey = 'aaaaaaaa'";
using IMessageConsumer consumer = session.CreateConsumer(destination, selector);

var waitTime = TimeSpan.FromSeconds(0.1);
int drainedCtr = 0;
IMessage msg;
while((msg = consumer.Receive(waitTime)) != null)
{
      drainedCtr;
}

drainedCtr.Dump("num drained");