Многопоточность: путаница между потоком издателя и потоком подписчика

#c# #mqtt #opc

#c# #mqtt #opc

Вопрос:

Я создал приложение Windows forms, которое может публиковать теги OPC в брокере MQTT, теперь я пытаюсь сделать обратное, записать теги MQTT на OPC-сервер. когда я запустил как агент (издатель), так и передачу (подписчик), два потока выполняют ту же работу, что и публикация, я не знаю, в чем проблема. Метод запуска приведен ниже :

 public byte Start()
    {
        try
        {
            byte connectResu<
            if (IsLWT == false)
            {


                connectResult = this._MqttClient.Connect(ClientID, Username, Password, IsCleanSession, KeepAlivePeriode);
            }
            else
            {             
                   
                connectResult = this._MqttClient.Connect(ClientID, Username, Password, willRetain, willQos, true, willTopic, willMessage, IsCleanSession, KeepAlivePeriode);

            }

            // 0 means that the connection suceeded
            if (connectResult == 0)
            {
                this.Rate = GetRateFromOPCGroups();
                this._publisherThread = new Thread(() => Publish());
                this._publisherThread.IsBackground = true;
                this._publisherThread.Start();
                IsStarted = true;
            }

            if (connectResult == 0)
            {
              //this.Rate = GetRateFromOPCGroups();
                this._SubscriberThread = new Thread(() => Subscribe(topics));
                this._SubscriberThread.IsBackground = true;
                this._SubscriberThread.Start();
                IsStarted = true;
            }

            return connectResu<

        }
        catch (IntegrationObjects.Networking.M2Mqtt.Exceptions.MqttClientException ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error, MQTTServiceMessages.startAgentFailed(this.Name,ex.Message));
            return 11;
        }
        
        catch (IntegrationObjects.Networking.M2Mqtt.Exceptions.MqttCommunicationException ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error, MQTTServiceMessages.startAgentFailed(this.Name, ex.Message));
            return 11;
        }
        catch (Exception ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error, MQTTServiceMessages.startAgentFailed(this.Name, ex.Message));
            return 11;
        }
    }
  

1. Это код публикации:
private void Publish()
{

         while (true)
        {
            if (IsStarted)
            {
                try
                {
                    if (_MqttClient.IsConnected)
                    {
                        isConnected = true;
                        if (this.OPCItems.Count != 0)
                        {

                            JsonMQTTMessage JsonMessage = new JsonMQTTMessage();
                            JsonMessage.Timestamp = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss.fff");
                            JsonMessage.ListofValues = new List<UpdatedOPCItem>();

                          
                        
                            lock (lockOPCItems)
                            {
                                foreach (OPCItem Item in OPCItems.ToList())
                                {
                                    if (Item != null)
                                    {
                                        UpdatedOPCItem upItem = new UpdatedOPCItem();
                                        upItem.ID = Item.ItemID;
                                        upItem.value = Item.ItemCurrentValue;
                                        upItem.quality = Item.ItemQuality;
                                        upItem.timestamp = Item.ItemTimeStamp.ToString("yyyy/MM/dd HH:mm:ss.fff");
                                        upItem.DataType = Item.ItemDataType;
                                        JsonMessage.ListofValues.Add(upItem);
                                    }

                                }
                            }
                           
                            var messageTopublish = Newtonsoft.Json.JsonConvert.SerializeObject(JsonMessage);
                            ushort res = _MqttClient.Publish(Topic, Encoding.UTF8.GetBytes(messageTopublish), Qos, Retain);
                            ResetValues();
                            Thread.Sleep(Rate);
  
  1. Это код подписки:

public void Subscribe (Список тем) {

         while (true)
        {
            if (IsStarted)
            {
                try
                {
                    if (_MqttClient.IsConnected)
                    {
                        isConnected = true;
                        foreach (string topic in topics)
                        {
                            ushort msggId = _MqttClient.Subscribe(new string[] { $"{ topic }" },

                           new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
                        }
                        Thread.Sleep(Rate);

                    }
                    else
                   
  

Комментарии:

1. Во-первых: это намеренно, что у вас есть только один IsStarted для обоих? Второе: пожалуйста, добавьте Publish() и Subscribe(topics) реализации.

2. Что означает «IsLWT»?

3. while (true) { if (IsStarted) — это плохая идея, но, вероятно, не ваша проблема. Как вы подтверждаете, что они оба «выполняют одну и ту же работу»?

4. Когда я нажал на кнопку запуска подписчика (Передача) и проверил другим клиентом MQTT, я увидел, что новые сообщения публикуются в брокере MQTT. то же самое происходит, когда я нажимаю на кнопку запуска издателя (агента).

5. Хорошо, я попытаюсь решить проблему таким образом. Большое спасибо.