2 Stimmen

MailboxProcessor - Sagen, wann man aufhören soll?

Ich spiele im Moment mit dem MailboxProcessor . Deshalb habe ich mir ein paar Agenten ausgedacht, die ein Verzeichnis auf dem Computer und alle Unterverzeichnisse durchsuchen können - und dann die Dateien in jedem Verzeichnis ausgeben:

let fileCollector =
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! file = self.Receive()
              printfn "%s" file
              return! loop() }
    loop()) 

let folderCollector = 
  MailboxProcessor.Start(fun self -> 
    let rec loop() =
      async { let! dir = self.Receive()
              do! Async.StartChild(
                    async { let! files = Directory.AsyncGetFiles dir
                            for z in files do fileCollector.Post z }) |> Async.Ignore
              return! loop() }
    loop())

let crawler =
  MailboxProcessor.Start(fun self ->
    let rec loop() =
      async { let! dir = self.Receive()
              folderCollector.Post dir
              do! Async.StartChild(
                    async { let! dirs = Directory.AsyncGetDirectories dir
                            for z in dirs do self.Post z }) |> Async.Ignore
              return! loop() }
    loop())

crawler.Post @"C:\Projects"

printfn "Done" // Message getting fired right away, due to the async stuff.

Woran würde ich nun erkennen, dass die folderCollector , fileCollector y crawler durchgeführt werden, so dass die printfn Anweisung am Ende, würde aufgerufen werden, NACHDEM der Crawler erfolgreich alle Unterverzeichnisse durchforstet und alle Dateien ausgedruckt hat?

Aktualisierung: Durch die Anwendung der von Tomas Petricek in http://tomasp.net/blog/parallel-extra-image-pipeline.aspx habe ich mir folgenden Code ausgedacht:

let folders = new BlockingQueueAgent<string>(100)
let files = new BlockingQueueAgent<string>(100)

let rec folderCollector path =
  async { do! folders.AsyncAdd(path)
          do! Async.StartChild(
                  async { let! dirs = Directory.AsyncGetDirectories path
                          for z in dirs do
                            do! folderCollector z }) |> Async.Ignore }

let fileCollector =
  async { while true do
            let! dir = folders.AsyncGet()
            do! Async.StartChild(
                    async { let! fs = Directory.AsyncGetFiles dir
                            for z in fs do
                              do! files.AsyncAdd z }) |> Async.Ignore }

let rec printFiles() =
  async { let! file = files.AsyncTryGet(75)
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () }

let cts = new CancellationTokenSource()
Async.Start(folderCollector @"C:\Projects", cts.Token)
Async.Start(fileCollector, cts.Token)
Async.RunSynchronously(printFiles(), cancellationToken = cts.Token)

printfn "DONE!"

Aktualisierung: Update: In Ordnung, ich habe folgenden Code durcheinander gebracht:

let folders = new BlockingQueueAgent<string option>(10)
let files = new BlockingQueueAgent<string option>(10)

let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 125
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] |> Async.Parallel |> Async.Ignore // <-- Fails silence if files are full
            do! fileCollector() // <-- unreachable
          | None -> printfn "Done!"; ()}

Das sieht doch gut aus, oder? Aus irgendeinem Grund ist der do! fileCollector() Zeile in der fileCollector() Funktion, wird nicht ausgeführt wenn die files BlockingQueueAgent ist voll. Stattdessen scheitert es an der Stille.

Aber wenn ich es tue:

let folderCollector path =
  async { let rec loop path = 
            async { do! folders.AsyncAdd(Some path)
                    let! dirs = Directory.AsyncGetDirectories path
                    do! [ for z in dirs -> loop z ] |> Async.Parallel |> Async.Ignore } 
          do! loop path 
          do! folders.AsyncAdd(None) }

let rec fileCollector() =
  async { let! dir = folders.AsyncGet 75
          match dir with
          | Some s -> 
            let fs = Directory.GetFiles s
            do! Async.StartChild(async { do! [ for z in fs -> printfn "%s" z; files.AsyncAdd(Some z) ] 
                                             |> Async.Parallel |> Async.Ignore } ) |> Async.Ignore
            do! fileCollector()
          | None -> printfn "Done!"; ()}

Es funktioniert einwandfrei. Allerdings kann ich jetzt nicht mehr verfolgen, wann die fileCollector fertig ist, da es eine Reihe von asynchronen Berechnungen durchführt und daher selbst bei "None" in der Warteschlange noch etwas zu tun haben könnte. Was ist hier los?


Aktualisierung: Ich habe die fileCollector zum gleichen "Stil" wie folderCollector aber das Problem bleibt bestehen. Die geänderte Version:

let fileCollector() =
  async { let rec loop() = 
            async { let! dir = folders.AsyncGet 750
                    match dir with
                    | Some s -> 
                      let! fs = Directory.AsyncGetFiles s
                      do! [ for z in fs -> printfn "%A" z; files.AsyncAdd(Some z) ] 
                            |> Async.Parallel |> Async.Ignore 
                      return! loop()
                    | None -> printfn "Done!"; () }
          do! loop()
          printfn "after" // Never gets this far... 
          do! files.AsyncAdd(None) }

3voto

Tomas Petricek Punkte 233658

Um Ihre zweite Frage (aus dem Kommentar) bezüglich der aktualisierten Version auf der Grundlage von Pipelines zu beantworten - ich denke, Sie könnten Folgendes verwenden BlockingQueueAgent<option<string>> und verwenden Sie den Wert None wenn Sie alle Dateien erzeugt haben (die None Wert würde sich dann durch die Pipeline fortpflanzen und Sie könnten alle Arbeitsabläufe beenden, wenn sie den None ).

Zu diesem Zweck müssen Sie Folgendes ändern folderCollector um tatsächlich zu erkennen, wann die Iteration beendet ist. Es ist nicht getestet, aber das Folgende sollte funktionieren (der Punkt ist, dass Sie auf den Abschluss des rekursiven Aufrufs warten müssen):

let rec folderCollector path =
  let rec loop path = 
    async { do! folders.AsyncAdd(Some path)
            let! dirs = Directory.AsyncGetDirectories path
            do! [ for z in dirs do -> folderCollector z ] 
                |> Async.Parallel |> Async.Ignore }
  async { do! loop path
          do! folders.AsyncAdd(None) }

Alle Arbeitsabläufe würden potenziell None als Ergebnis von AsyncGet . Wenn dies geschieht, sollten sie die None an den nächsten Arbeiter in der Pipeline. Der letzte Worker kann sich beenden, wenn er None :

let rec printFiles() =
  async { let! file = files.AsyncGet(75) // Note - now we use just AsyncGet
          match file with
          | Some s -> 
            printfn "%s" s
            return! displayFiles()
          | None -> () } // Completed processing all files

2voto

Tomas Petricek Punkte 233658

Es gibt keine integrierte Unterstützung für die Benachrichtigung, wenn ein F#-Agent abgeschlossen ist. Es ist sogar ziemlich schwierig zu erkennen. Ein Agent, auch mit einer leeren Warteschlange, ist nicht abgeschlossen, da er immer noch Nachrichten von anderen Agenten empfangen und seine Arbeit wieder aufnehmen kann.

In Ihrem Beispiel ist die Arbeit getan, wenn die Warteschlangen aller drei Bearbeiter leer sind. Dies kann überprüft werden mit CurrentQueueLength . Das ist keine sehr schöne Lösung, aber sie wird funktionieren:

crawler.Post @"C:\Temp"
// Busy waiting until all queues are empty
while crawler.CurrentQueueLength <> 0 || folderCollector.CurrentQueueLength <> 0 ||
      fileCollector.CurrentQueueLength <> 0 do
    System.Threading.Thread.Sleep(10)
printfn "Done"

Ich denke, ein besserer Ansatz wäre, Ihren Code anders zu strukturieren - Sie brauchen nicht wirklich einen Agenten für die rekursive Verarbeitung eines Verzeichnisbaums zu verwenden. In Ihrer Version wird das Durchlaufen von Verzeichnissen ( crawler Agent) wird parallel zum Auffinden von Dateien in Ordnern durchgeführt ( folderCollector ) und die Verarbeitung der Ergebnisse ( fileCollector ), so dass Sie im Wesentlichen eine dreistufige Pipeline implementieren.

Sie können Pipelines einfacher implementieren, indem Sie nur async mit einer blockierenden Warteschlange, in der die unmittelbaren Ergebnisse der Verarbeitung gespeichert werden. Diese Artikel zeigt ein Beispiel mit Bildverarbeitung . Ich denke, der gleiche Ansatz würde auch bei Ihnen funktionieren. Die Erkennung des Endes einer Pipeline-Verarbeitung sollte einfacher sein (nach dem Senden aller Eingaben könnten Sie eine spezielle Nachricht senden, die den Abschluss anzeigt, und wenn die Nachricht am Ende der Pipeline ankommt, sind Sie fertig).

Eine andere Alternative wäre die Verwendung von asynchrone Abläufe , das ein gutes Muster für diese Art von Problem sein könnte (aber es gibt im Moment keine guten Beispiele im Internet).

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X