Могут ли части этого кода вызвать исключение SocketException, создаваемое NetMQ?

#f# #netmq

Вопрос:

Я создал простую систему связи на основе NetMQ для распределенного приложения. У меня есть Hub несколько Node , через которые можно передавать сообщения, и Bus несколько BusNode , которые можно использовать для публикации/подписки.

У меня была проблема, которая всплывала очень редко, и я надеюсь, что сегодня я ее исправил. Приложения просто взрывались несколько раз в неделю, и в журнале событий Windows было опубликовано исключение. По-видимому, у меня нет возможности выяснить, какая часть моего источника вызывает это. До сих пор мне не удавалось воспроизвести эту проблему на своих машинах разработки. Извините, у меня есть только этот скриншот, без текста.

Журнал сбоев

Я предполагаю, что решение заключается в использовании NetMQQueue, потому что оказалось, что сообщения отправлялись из нескольких потоков, а я думал, что нет. Поэтому я реализовал использование NetMQQueue в Node и BusNode , где это было просто и очевидно.

Однако есть еще два места, где я не уверен, является ли материал потокобезопасным, или мне нужно что-то исправить. Это входит Hub и входит Bus . Вот как выглядят эти двое.

Мой вопрос в следующем: эти двое в здравом уме, или проблема с потоковой передачей также представляет опасность здесь?

Хаб

 type Hub(connectionStrings: string list) =
    let poller = new NetMQPoller()
    let router = new RouterSocket()
    do
        router.ReceiveReady.Add (fun x ->
            let message = router.ReceiveMultipartMessage()
            if message.FrameCount = 4 then
                // incoming is 0:source 1:destination 2:timestamp 3:msg
                let m = NetMQMessage()
                m.Append message.[1] // destination (for routing - will be removed by router)
                m.Append message.[0] // source
                m.Append message.[1] // destination
                m.Append message.[2] // timestamp
                m.Append message.[3] // msg
                router.SendMultipartMessage m
            else
                log.Warning "Unexpected frame size in Hub."
        )
        connectionStrings |> List.iter router.Bind
        poller.Add router
        poller.RunAsync()
    member _.Stop() =
        poller.Stop()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose router
                dispose poller
 

Bus

 type Bus(xpubs: string list, xsubs: string list) =
    let forward (inSocket: NetMQSocket) (outSocket: NetMQSocket) =
        let mutable more = inSocket.HasIn
        while more do
            let mutable msg = new Msg()
            msg.InitEmpty()
            inSocket.Receive amp;msg
            more <- msg.HasMore
            outSocket.Send (amp;msg, more)
    let poller = new NetMQPoller()
    let xpub = new XPublisherSocket()
    let xsub = new XSubscriberSocket()
    do
        xpubs |> List.iter xpub.Bind
        xsubs |> List.iter xsub.Bind
        xpub.ReceiveReady.Add (fun x -> forward xpub xsub)
        xsub.ReceiveReady.Add (fun x -> forward xsub xpub)
        poller.Add xpub
        poller.Add xsub
        poller.RunAsync()
    member _.Stop() =
        poller.Stop()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose xpub
                poller.RemoveAndDispose xsub
                dispose poller
 

Just for completeness, I am also showing you Node and BusNode, which now uses NetMQQueue that I assume will fix the problem at least here.

Node

 type Node(nodeId: NodeId, connectionString: string) =
    let receiveEvent = new Event<NodeReceiveEventArgs>()
    let queue = new NetMQQueue<NetMQMessage>(0)
    let dealer = new DealerSocket()
    let poller = new NetMQPoller()
    do
        queue.ReceiveReady.Add (fun x ->
            let mutable isDone = false
            while not isDone do
                let mutable m: NetMQMessage = null
                if x.Queue.TryDequeue(amp;m, TimeSpan.Zero) then
                    dealer.SendMultipartMessage m
                else
                    isDone <- true
        )
        dealer.Options.Identity <- NodeId.crack nodeId
        dealer.ReceiveReady.Add (fun x ->
            let m = dealer.ReceiveMultipartMessage()
            match messageToArgs m with
            | Ok args ->
                // log.Debug $"Rcv ({args.Source.asText} → {nodeId.asText}) {Misc.getDuCaseName args.Message}"
                receiveEvent.Trigger args
            | Error s -> log.Warning $"Rcv : {s}"
        )
        dealer.Connect connectionString
        poller.Add queue
        poller.Add dealer
        poller.RunAsync()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose dealer
                poller.RemoveAndDispose queue
                dispose poller
    member _.OnReceive = receiveEvent.Publish
    member _.Send(destination: NodeId, message: IMessage) =
        // log.Debug $"Snd ({nodeId.asText} → {destination.asText}) {Misc.getDuCaseName message}"
        let m = createMqMessage destination DateTime.Now message
        queue.Enqueue m
 

Узел шины

 type BusNode(nodeId: NodeId, xpub: string, xsub: string) =
    let receiveEvent = new Event<BusNodeReceiveEventArgs>()
    let queue = new NetMQQueue<NetMQMessage>(0)
    let pub = new PublisherSocket()
    let sub = new SubscriberSocket()
    let poller = new NetMQPoller()
    do
        queue.ReceiveReady.Add (fun x ->
            let mutable isDone = false
            while not isDone do
                let mutable m: NetMQMessage = null
                if x.Queue.TryDequeue(amp;m, TimeSpan.Zero) then
                    pub.SendMultipartMessage m
                else
                    isDone <- true
        )
        pub.Options.Identity <- NodeId.crack nodeId
        pub.Connect xsub
        sub.ReceiveReady.Add (fun x ->
            let m = sub.ReceiveMultipartMessage()
            if m.FrameCount = 4 then
                let topic = m.[0].ConvertToString(Text.Encoding.ASCII) |> Topic.create
                let source: NodeId = m.[1].ToByteArray() |> NodeId.pack
                let timeStamp: DateTime = m.[2].ToByteArray() |> ZeroSerialization.deserializeDateTime
                let broadcast = m.[3].ToByteArray() |> ZeroSerialization.deserializeBroadcast
                let args = BusNodeReceiveEventArgs(topic, source, nodeId, timeStamp, broadcast)
                log.Debug $"Bus ({args.Source.asText} → {args.Destination.asText}) [{topic.asText}] {Misc.getDuCaseName broadcast}"
                receiveEvent.Trigger args
            else
                log.Warning $"Bus ({nodeId.asText}) : Unexpected frame size."
        )
        sub.Connect xpub
        poller.Add queue
        poller.Add sub
        poller.RunAsync()
    interface IDisposable with
        member _.Dispose() =
            if not poller.IsDisposed then
                poller.Stop()
                poller.RemoveAndDispose sub
                poller.RemoveAndDispose pub
                poller.RemoveAndDispose queue
                dispose poller
            ()
    interface IBusNode with
        member _.OnReceive = receiveEvent.Publish
        member _.Publish(topic: Topic, broadcast: IBroadcast) =
            let m = NetMQMessage()
            m.Append (Topic.crack topic)
            m.Append (NodeId.crack nodeId)
            m.Append (ZeroSerialization.serializeDateTime DateTime.Now)
            m.Append (ZeroSerialization.serializeBroadcast broadcast)
            log.Debug $"Bus ({nodeId.asText} →) {Misc.getDuCaseName broadcast}"
            queue.Enqueue m
            ()
        member _.Subscribe (topic: Topic) =
            log.Debug $"Bus ({nodeId.asText}) subscribe [{topic.asText}]"
            Topic.crack topic |> sub.Subscribe
        member _.SubscribeAll () =
            log.Debug $"Bus ({nodeId.asText}) subscribe all"
            sub.SubscribeToAnyTopic ()
 

Комментарии:

1. Описанная проблема не возникла после того, как я опубликовал этот вопрос. Протокол используется с высокой нагрузкой и обладает высокой производительностью.