#asynchronous #f# #mailboxprocessor
#асинхронный #f# #mailboxprocessor
Вопрос:
У меня есть команда, периодически выполняющая проверку SFTP и записывающая результат в файл.
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
sw.Close()
sw.Dispose()
0
Цикл выполняется по MailboxProcessor
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
printfn "%s" msg
// loop to top
return! messageLoop()
}
// start the loop
messageLoop()
)
который вызывается для записи сообщений в журнал
let sftpExample local host port username (password:string) =
async {
use client = new SftpClient(host, port, username, password)
client.Connect()
sprintf "Connected to %snroot dir list" host |> printerAgent.Post
do! downloadDir local client ""
sprintf "Done, disconnecting now" |> printerAgent.Post
client.Disconnect()
} |> Async.RunSynchronously
Загрузка файлов происходит асинхронно, как и соответствующие сообщения, но, похоже, все работает хорошо.
Проблема в том, что если по каким-либо причинам sftp-соединение немедленно завершается сбоем, у MailboxProcessor
нет времени для регистрации сообщения об исключении.
То, что я пытался сделать — что действительно работает — добавляло printfn "%s" ex.Message
перед концом: я просто хотел узнать, предлагает ли кто-нибудь лучшее решение.
К вашему сведению, полный код находится в этой сути.
Ответ №1:
Фактически, вы хотите, чтобы программа подождала, пока MailboxProcessor закончит обработку всей своей очереди сообщений, прежде чем программа завершит работу. Ваш printfn "%s" ex.Message
, кажется, работает, но это не гарантировано: если в очереди MailboxProcessor было несколько элементов, поток, выполняющий printfn
функцию, может завершиться до того, как поток MailboxProcessor успеет просмотреть все свои сообщения.
Дизайн, который я бы рекомендовал, — изменить входные данные вашего printerAgent
на DU, как показано ниже:
type printerAgentMsg =
| Message of string
| Shutdown
Затем, когда вы хотите, чтобы агент принтера завершил отправку своих сообщений, используйте MailboxProcessor.PostAndReply
(и обратите внимание на пример использования в документах) main
функцию и отправьте ей Shutdown
сообщение. Помните, что сообщения MailboxProcessor находятся в очереди: к тому времени, когда он получит Shutdown
сообщение, он уже обработает остальные сообщения в очереди. Итак, все, что ему нужно сделать для обработки Shutdown
сообщения, это вернуть unit
ответ и просто не вызывать его цикл снова. И поскольку вы использовали PostAndReply
вместо PostAndReplyAsync
, основная функция будет блокироваться до тех пор, пока MailboxProcessor не закончит выполнять всю свою работу. (Чтобы избежать какой-либо возможности блокировки навсегда, я бы рекомендовал установить тайм-аут, подобный 10 секундам в вашем PostAndReply
вызове; тайм-аут по умолчанию равен -1, что означает ожидание вечно).
РЕДАКТИРОВАТЬ: Вот пример (НЕ тестировался, используйте на свой страх и риск) того, что я имею в виду:
type printerAgentMsg =
| Message of string
| Shutdown of AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox->
// the message processing function
let rec messageLoop() = async{
// read a message
let! msg = inbox.Receive()
// process a message
match msg with
| Message text ->
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
printfn "%s" text
// loop to top
return! messageLoop()
| Shutdown replyChannel ->
replyChannel.Reply()
// We do NOT do return! messageLoop() here
}
// start the loop
messageLoop()
)
let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv =
try
sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
with
| ex ->
ex.Message |> Message |> printerAgent.Post
printfn "%s" ex.Message // <- NOTICE THIS LINE
printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000) // Timeout = 10000 ms = 10 seconds
sw.Close()
sw.Dispose()
Ответ №2:
Самым простым решением было бы использовать обычную (синхронную) функцию для ведения журнала вместо MailboxProcessor или использовать какую-либо платформу ведения журнала и сбросить регистраторы в конце основной функции. Если вы хотите продолжать использовать printingAgent
, вы можете реализовать «синхронный» режим следующим образом:
type Msg =
| Log of string
| LogAndWait of string * AsyncReplyChannel<unit>
let printerAgent = MailboxProcessor.Start(fun inbox ->
let processLogMessage logMessage =
sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), logMessage)
printfn "%s" logMessage
let rec messageLoop() = async{
let! msg = inbox.Receive()
match msg with
| Log logMessage ->
processLogMessage logMessage
| LogAndWait (logMessage, replyChannel) ->
processLogMessage logMessage
replyChannel.Reply()
return! messageLoop()
}
messageLoop()
)
Который вы затем использовали бы либо асинхронно
printerAgent.Post(Log "Message")
или синхронно
printerAgent.PostAndReply(fun channel -> LogAndWait("Message", channel))
Вам следует использовать синхронную альтернативу при регистрации исключения в функции main.