Первый цикл MailboxProcessor не может быть запущен, если программа немедленно завершается сбоем

#asynchronous #f# #mailboxprocessor

#асинхронный #f# #mailboxprocessor

Вопрос:

У меня есть команда, периодически выполняющая проверку SFTP и записывающая результат в файл.

 let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    sw.Close()
    sw.Dispose()
0  
  

Цикл выполняется по MailboxProcessor

 let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
        printfn "%s" msg
        // loop to top
        return! messageLoop()  
        }
    // start the loop 
    messageLoop() 
    )
  

который вызывается для записи сообщений в журнал

 let sftpExample local host port username (password:string) =
    async {
        use client = new SftpClient(host, port, username, password)
        client.Connect()
        sprintf "Connected to %snroot dir list" host  |> printerAgent.Post
        do! downloadDir local client ""   
        sprintf "Done, disconnecting now" |> printerAgent.Post
        client.Disconnect()
    } |> Async.RunSynchronously
  

Загрузка файлов происходит асинхронно, как и соответствующие сообщения, но, похоже, все работает хорошо.

Проблема в том, что если по каким-либо причинам sftp-соединение немедленно завершается сбоем, у MailboxProcessor нет времени для регистрации сообщения об исключении.

То, что я пытался сделать — что действительно работает — добавляло printfn "%s" ex.Message перед концом: я просто хотел узнать, предлагает ли кто-нибудь лучшее решение.

К вашему сведению, полный код находится в этой сути.

Ответ №1:

Фактически, вы хотите, чтобы программа подождала, пока MailboxProcessor закончит обработку всей своей очереди сообщений, прежде чем программа завершит работу. Ваш printfn "%s" ex.Message , кажется, работает, но это не гарантировано: если в очереди MailboxProcessor было несколько элементов, поток, выполняющий printfn функцию, может завершиться до того, как поток MailboxProcessor успеет просмотреть все свои сообщения.

Дизайн, который я бы рекомендовал, — изменить входные данные вашего printerAgent на DU, как показано ниже:

 type printerAgentMsg =
    | Message of string
    | Shutdown
  

Затем, когда вы хотите, чтобы агент принтера завершил отправку своих сообщений, используйте MailboxProcessor.PostAndReply (и обратите внимание на пример использования в документах) main функцию и отправьте ей Shutdown сообщение. Помните, что сообщения MailboxProcessor находятся в очереди: к тому времени, когда он получит Shutdown сообщение, он уже обработает остальные сообщения в очереди. Итак, все, что ему нужно сделать для обработки Shutdown сообщения, это вернуть unit ответ и просто не вызывать его цикл снова. И поскольку вы использовали PostAndReply вместо PostAndReplyAsync , основная функция будет блокироваться до тех пор, пока MailboxProcessor не закончит выполнять всю свою работу. (Чтобы избежать какой-либо возможности блокировки навсегда, я бы рекомендовал установить тайм-аут, подобный 10 секундам в вашем PostAndReply вызове; тайм-аут по умолчанию равен -1, что означает ожидание вечно).

РЕДАКТИРОВАТЬ: Вот пример (НЕ тестировался, используйте на свой страх и риск) того, что я имею в виду:

 type printerAgentMsg =
    | Message of string
    | Shutdown of AsyncReplyChannel<unit>

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        match msg with
        | Message text ->
            sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
            printfn "%s" text
            // loop to top
            return! messageLoop()
        | Shutdown replyChannel ->
            replyChannel.Reply()
            // We do NOT do return! messageLoop() here
        }
    // start the loop 
    messageLoop() 
    )

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000)  // Timeout = 10000 ms = 10 seconds
    sw.Close()
    sw.Dispose()
  

Ответ №2:

Самым простым решением было бы использовать обычную (синхронную) функцию для ведения журнала вместо MailboxProcessor или использовать какую-либо платформу ведения журнала и сбросить регистраторы в конце основной функции. Если вы хотите продолжать использовать printingAgent , вы можете реализовать «синхронный» режим следующим образом:

 type Msg =
    | Log of string
    | LogAndWait of string * AsyncReplyChannel<unit>

let printerAgent = MailboxProcessor.Start(fun inbox -> 
    let processLogMessage logMessage =
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), logMessage)
        printfn "%s" logMessage
    let rec messageLoop() = async{        
        let! msg = inbox.Receive()
        match msg with 
        | Log logMessage ->
            processLogMessage logMessage
        | LogAndWait (logMessage, replyChannel) ->
            processLogMessage logMessage
            replyChannel.Reply()
        return! messageLoop()  
        }
    messageLoop() 
    )
  

Который вы затем использовали бы либо асинхронно

 printerAgent.Post(Log "Message")
  

или синхронно

 printerAgent.PostAndReply(fun channel -> LogAndWait("Message", channel))
  

Вам следует использовать синхронную альтернативу при регистрации исключения в функции main.