#f# #netmq
Вопрос:
Я создал простую систему связи на основе NetMQ для распределенного приложения. У меня есть Hub
несколько Node
, через которые можно передавать сообщения, и Bus
несколько BusNode
, которые можно использовать для публикации/подписки.
У меня была проблема, которая всплывала очень редко, и я надеюсь, что сегодня я ее исправил. Приложения просто взрывались несколько раз в неделю, и в журнале событий Windows было опубликовано исключение. По-видимому, у меня нет возможности выяснить, какая часть моего источника вызывает это. До сих пор мне не удавалось воспроизвести эту проблему на своих машинах разработки. Извините, у меня есть только этот скриншот, без текста.
Я предполагаю, что решение заключается в использовании NetMQQueue, потому что оказалось, что сообщения отправлялись из нескольких потоков, а я думал, что нет. Поэтому я реализовал использование NetMQQueue в Node
и BusNode
, где это было просто и очевидно.
Однако есть еще два места, где я не уверен, является ли материал потокобезопасным, или мне нужно что-то исправить. Это входит Hub
и входит Bus
. Вот как выглядят эти двое.
Мой вопрос в следующем: эти двое в здравом уме, или проблема с потоковой передачей также представляет опасность здесь?
Хаб
type Hub(connectionStrings: string list) =
let poller = new NetMQPoller()
let router = new RouterSocket()
do
router.ReceiveReady.Add (fun x ->
let message = router.ReceiveMultipartMessage()
if message.FrameCount = 4 then
// incoming is 0:source 1:destination 2:timestamp 3:msg
let m = NetMQMessage()
m.Append message.[1] // destination (for routing - will be removed by router)
m.Append message.[0] // source
m.Append message.[1] // destination
m.Append message.[2] // timestamp
m.Append message.[3] // msg
router.SendMultipartMessage m
else
log.Warning "Unexpected frame size in Hub."
)
connectionStrings |> List.iter router.Bind
poller.Add router
poller.RunAsync()
member _.Stop() =
poller.Stop()
interface IDisposable with
member _.Dispose() =
if not poller.IsDisposed then
poller.Stop()
poller.RemoveAndDispose router
dispose poller
Bus
type Bus(xpubs: string list, xsubs: string list) =
let forward (inSocket: NetMQSocket) (outSocket: NetMQSocket) =
let mutable more = inSocket.HasIn
while more do
let mutable msg = new Msg()
msg.InitEmpty()
inSocket.Receive amp;msg
more <- msg.HasMore
outSocket.Send (amp;msg, more)
let poller = new NetMQPoller()
let xpub = new XPublisherSocket()
let xsub = new XSubscriberSocket()
do
xpubs |> List.iter xpub.Bind
xsubs |> List.iter xsub.Bind
xpub.ReceiveReady.Add (fun x -> forward xpub xsub)
xsub.ReceiveReady.Add (fun x -> forward xsub xpub)
poller.Add xpub
poller.Add xsub
poller.RunAsync()
member _.Stop() =
poller.Stop()
interface IDisposable with
member _.Dispose() =
if not poller.IsDisposed then
poller.Stop()
poller.RemoveAndDispose xpub
poller.RemoveAndDispose xsub
dispose poller
Just for completeness, I am also showing you Node and BusNode, which now uses NetMQQueue that I assume will fix the problem at least here.
Node
type Node(nodeId: NodeId, connectionString: string) =
let receiveEvent = new Event<NodeReceiveEventArgs>()
let queue = new NetMQQueue<NetMQMessage>(0)
let dealer = new DealerSocket()
let poller = new NetMQPoller()
do
queue.ReceiveReady.Add (fun x ->
let mutable isDone = false
while not isDone do
let mutable m: NetMQMessage = null
if x.Queue.TryDequeue(amp;m, TimeSpan.Zero) then
dealer.SendMultipartMessage m
else
isDone <- true
)
dealer.Options.Identity <- NodeId.crack nodeId
dealer.ReceiveReady.Add (fun x ->
let m = dealer.ReceiveMultipartMessage()
match messageToArgs m with
| Ok args ->
// log.Debug $"Rcv ({args.Source.asText} → {nodeId.asText}) {Misc.getDuCaseName args.Message}"
receiveEvent.Trigger args
| Error s -> log.Warning $"Rcv : {s}"
)
dealer.Connect connectionString
poller.Add queue
poller.Add dealer
poller.RunAsync()
interface IDisposable with
member _.Dispose() =
if not poller.IsDisposed then
poller.Stop()
poller.RemoveAndDispose dealer
poller.RemoveAndDispose queue
dispose poller
member _.OnReceive = receiveEvent.Publish
member _.Send(destination: NodeId, message: IMessage) =
// log.Debug $"Snd ({nodeId.asText} → {destination.asText}) {Misc.getDuCaseName message}"
let m = createMqMessage destination DateTime.Now message
queue.Enqueue m
Узел шины
type BusNode(nodeId: NodeId, xpub: string, xsub: string) =
let receiveEvent = new Event<BusNodeReceiveEventArgs>()
let queue = new NetMQQueue<NetMQMessage>(0)
let pub = new PublisherSocket()
let sub = new SubscriberSocket()
let poller = new NetMQPoller()
do
queue.ReceiveReady.Add (fun x ->
let mutable isDone = false
while not isDone do
let mutable m: NetMQMessage = null
if x.Queue.TryDequeue(amp;m, TimeSpan.Zero) then
pub.SendMultipartMessage m
else
isDone <- true
)
pub.Options.Identity <- NodeId.crack nodeId
pub.Connect xsub
sub.ReceiveReady.Add (fun x ->
let m = sub.ReceiveMultipartMessage()
if m.FrameCount = 4 then
let topic = m.[0].ConvertToString(Text.Encoding.ASCII) |> Topic.create
let source: NodeId = m.[1].ToByteArray() |> NodeId.pack
let timeStamp: DateTime = m.[2].ToByteArray() |> ZeroSerialization.deserializeDateTime
let broadcast = m.[3].ToByteArray() |> ZeroSerialization.deserializeBroadcast
let args = BusNodeReceiveEventArgs(topic, source, nodeId, timeStamp, broadcast)
log.Debug $"Bus ({args.Source.asText} → {args.Destination.asText}) [{topic.asText}] {Misc.getDuCaseName broadcast}"
receiveEvent.Trigger args
else
log.Warning $"Bus ({nodeId.asText}) : Unexpected frame size."
)
sub.Connect xpub
poller.Add queue
poller.Add sub
poller.RunAsync()
interface IDisposable with
member _.Dispose() =
if not poller.IsDisposed then
poller.Stop()
poller.RemoveAndDispose sub
poller.RemoveAndDispose pub
poller.RemoveAndDispose queue
dispose poller
()
interface IBusNode with
member _.OnReceive = receiveEvent.Publish
member _.Publish(topic: Topic, broadcast: IBroadcast) =
let m = NetMQMessage()
m.Append (Topic.crack topic)
m.Append (NodeId.crack nodeId)
m.Append (ZeroSerialization.serializeDateTime DateTime.Now)
m.Append (ZeroSerialization.serializeBroadcast broadcast)
log.Debug $"Bus ({nodeId.asText} →) {Misc.getDuCaseName broadcast}"
queue.Enqueue m
()
member _.Subscribe (topic: Topic) =
log.Debug $"Bus ({nodeId.asText}) subscribe [{topic.asText}]"
Topic.crack topic |> sub.Subscribe
member _.SubscribeAll () =
log.Debug $"Bus ({nodeId.asText}) subscribe all"
sub.SubscribeToAnyTopic ()
Комментарии:
1. Описанная проблема не возникла после того, как я опубликовал этот вопрос. Протокол используется с высокой нагрузкой и обладает высокой производительностью.