5 Stimmen

Parallelism auf dem Divide & Conquer Algorithmus

Ich habe Probleme, meinen Code parallel auszuführen. Es handelt sich um einen 3D-Delaunay-Generator, der einen Divide-und-Conquer-Algorithmus namens DeWall verwendet.

Die Hauptfunktion ist:

deWall::[SimplexPointer] -> SetSimplexFace -> Box -> StateT DeWallSets IO ([Simplex], [Edge])
deWall p afl box = do
   ...
   ...
   get >>= recursion box1 box2 p1 p2 sigma edges
   ...
   ...

Sie ruft die "recursion"-Funktion auf, die die Dewall-Funktion wieder aufrufen könnte. Hier bietet sich die Möglichkeit zur Parallelisierung. Der folgende Code zeigt die sequentielle Lösung.

recursion::Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> [Edge] -> DeWallSets -> StateT DeWallSets IO ([Simplex], [Edge])    
recursion box1 box2 p1 p2 sigma edges deWallSet
        | null afl1 && null afl2 = return (sigma, edges)
        | (null) afl1 = do
            (s, e) <- deWall p2 afl2 box2
            return (s ++ sigma, e ++ edges)
        | (null) afl2 = do
            (s,e) <- deWall p1 afl1 box1
            return (s ++ sigma, e ++ edges)
        | otherwise   = do
            x <- get
            liftIO $ do
                (s1, e1) <- evalStateT (deWall p1 afl1 box1) x
                (s2, e2) <- evalStateT (deWall p2 afl2 box2) x
                return (s1 ++ s2 ++ sigma, e1 ++ e2 ++ edges)

        where   afl1 = aflBox1 deWallSet
                afl2 = aflBox2 deWallSet

State und IO Monads werden verwendet, um den Zustand zu übermitteln und eine UID für jeden Tetraeder zu generieren, der MVar's verwendet. Mein erster Versuch war es, ein forkIO hinzuzufügen, aber es funktioniert nicht. Es gibt einen falschen Output aufgrund eines Mangels an Kontrolle während des Zusammenführungsteils, der nicht auf das Beenden beider Threads wartet. Ich weiß nicht, wie ich machen kann, dass es auf sie wartet.

            liftIO $ do
                let 
                    s1 = evalStateT (deWall p1 afl1 box1) x
                    s2 = evalStateT (deWall p2 afl2 box2) x
                    concatThread var (a1, b1) = takeMVar var >>= \(a2, b2) -> putMVar var (a1 ++ a2, b1 ++ b2)
                mv <- newMVar ([],[])
                forkIO (s1 >>= concatThread mv)
                forkIO (s2 >>= concatThread mv)
                takeMVar mv >>= \(s, e) -> return (s ++ sigma, e ++ edges)

Also war mein nächster Versuch, eine bessere parallele Strategie "par" und "pseq" zu verwenden, die das richtige Ergebnis liefert, aber laut threadScope keine parallele Ausführung bietet.

        liftIO $ do
            let
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
                conc = liftM2 (\(a1, b1) (a2, b2) -> (a1 ++ a2, b1 ++ b2))
            (stotal, etotal) = s1 `par` (s2 `pseq` (s1 `conc` s2))
            return (stotal ++ sigma, etotal ++ edges)

Was mache ich falsch?

UPDATE: Irgendwie scheint dieses Problem mit der Präsenz von IO Monaden zusammenhängen. In einer anderen (alten) Version ohne IO-Monade, nur mit der State-Monade, wird die parallele Ausführung mit 'par' und 'pseq' durchgeführt. Der GHC -sstderr gibt SPARKS: 1160 (69 converted, 1069 pruned) aus.

recursion::Box -> Box -> [SimplexPointer] -> [SimplexPointer] -> [Simplex] -> [Edge] -> DeWallSets -> State DeWallSets ([Simplex], [Edge])  
recursion p1 p2 sigma deWallSet
     | null afl1 && null afl2 = return sigma
     | (null) afl1 = do
         s <- deWall p2 afl2 box2
         return (s ++ sigma)
     | (null) afl2 = do
         s <- deWall p1 afl1 box1
         return (s ++ sigma)
     | otherwise   = do
                     x <- get
                     let s1 = evalState (deWall p1 afl1 box1) x
                     let s2 = evalState (deWall p2 afl2 box2) x
                     return $ s1 `par` (s2 `pseq` (s1 ++ s2 ++ sigma))
     where   afl1 = aflBox1 deWallSet
             afl2 = aflBox2 deWallSet

Kann mir jemand das erklären?

3voto

nominolo Punkte 5025

Die Verwendung von par und pseq sollte auf dem "Ausführungspfad" erfolgen, d. h. nicht innerhalb eines lokalen let. Versuchen Sie es damit (ändern Sie Ihren letzten Abschnitt)

let s1 = ...
    s2 = ...
    conc = ...
case s1 `par` (s2 `pseq` (s1 `conc` s2)) of
  (stotal, etotal) ->
     return (stotal ++ sigma, etotal ++ edges)

Ein case zwingt zur Auswertung seines Arguments bis zu einem schwachen Normalform (WHNF), bevor er in einem seiner Zweige fortsetzt. WHNF bedeutet, dass das Argument ausgewertet wird, bis der äußerste Konstruktor sichtbar ist. Felder können noch unevaluiert sein.

Um die vollständige Auswertung eines Arguments zu erzwingen, verwenden Sie deepseq. Seien Sie jedoch vorsichtig damit, weil deepseq manchmal Dinge langsamer machen kann, indem es zu viel Arbeit erledigt.

Ein leichtgewichtigerer Ansatz zur Hinzufügung von Strenge besteht darin, Felder streng zu machen:

data Foo = Foo !Int String

Jetzt, wenn ein Wert vom Typ Foo bis WHNF ausgewertet wird, wird auch sein erster Argument ausgewertet (aber nicht das zweite).

0 Stimmen

Du solltest ein {-# LANGUAGE BangPatterns #-} Pragma hinzufügen, bevor du ! verwendest, um Felder streng zu machen, vorausgesetzt du verwendest GHC.

2 Stimmen

@drvitek: Nein, BangPatterns werden nur für strenge Musterübereinstimmungen benötigt, nicht für Strenghinweisungen bei Datentypen.

0 Stimmen

Danke Jungs für die Kommentare. Ich habe versucht, Strenge in meinen Code einzufügen, aber ohne Erfolg (der GHC -sstderr gibt SPARKS: 1080 (0 konvertiert, 0 gestutzt)). Es scheint mit der Anwesenheit des IO-Monaden zusammenzuhängen. Siehe das Update in meiner Frage.

2voto

Axman6 Punkte 788

Der einfachste Weg, dies zum Laufen zu bringen, wäre etwas Ähnliches wie:

liftIO $ do
            let 
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
            mv1 <- newMVar ([],[])
            mv2 <- newMVar ([],[])
            forkIO (s1 >>= putMVar mv1)
            forkIO (s2 >>= putMVar mv2)
            (a1,b1) <- takeMVar mv1
            (a2,b2) <- takeMVar mv2
            return (a1++a2++sigma, b1++b2++edges)

Dies funktioniert, aber es gibt etwas unnötigen Overhead. Eine bessere Lösung ist:

liftIO $ do
            let 
                s1 = evalStateT (deWall p1 afl1 box1) x
                s2 = evalStateT (deWall p2 afl2 box2) x
            mv <- newMVar ([],[])
            forkIO (s2 >>= putMVar mv2)
            (a1,b1) <- s1
            (a2,b2) <- takeMVar mv2
             return (a1++a2++sigma, b1++b2++edges)

Oder möglicherweise dies, wenn die Ergebnisse nicht dort ausgewertet werden, wo Sie es gerne hätten:

liftIO $ do
        let 
            s1 = evalStateT (deWall p1 afl1 box1) x
            s2 = evalStateT (deWall p2 afl2 box2) x
        mv <- newMVar ([],[])
        forkIO (s2 >>= evaluate >>= putMVar mv2)
        (a1,b1) <- s1
        (a2,b2) <- takeMVar mv2
         return (a1++a2++sigma, b1++b2++edges)

(Das sind Antworten, die ich dem Poster in #haskell gegeben habe und die ich dachte, hier auch nützlich wären)

Bearbeitet: unnötiges evaluate entfernt.

0 Stimmen

Das hat mein Problem gelöst. Ich habe nur eine kleine Korrektur vorgenommen, indem ich mv2 <- newEmptyMVar anstelle von mv <- newMVar ([],[]) verwendet habe. Vielen Dank, Axman6.

1voto

sclv Punkte 38177

Wenn Sie bei expliziten Threads bleiben möchten, anstatt pseq, wie Sie bemerkt haben, benötigen Sie eine Möglichkeit, auf das Ende der Worker-Threads zu warten. Das ist ein großartiger Anwendungsfall für ein Mengen-Semaphor. Teilen Sie nach der Aufteilung der zu erledigenden Arbeit die Arbeit auf, damit jeder Worker-Thread beim Beenden den Semaphor signalisiert, wie viel Arbeit er erledigt hat.

Dann warten Sie darauf, dass alle Arbeitseinheiten abgeschlossen sind.

http://www.haskell.org/ghc/docs/6.8.3/html/libraries/base/Control-Concurrent-QSemN.html

Bearbeitung: etwas Pseudocode, um das Konzept zu erklären

do
 let workchunks :: [(WorkChunk, Size)]
     workchunks = dividework work

  let totalsize = sum $ map snd workchunks

 sem <- newQSem 0

 let forkworkThread (workchunk, size) = do
        executeWorkChunk workchunk
        signalQSem size

 mapM_ forkWorkThread workchunks
 waitQSem totalsize

 -- jetzt ist all Ihre Arbeit erledigt.

0 Stimmen

Leider habe ich nicht herausgefunden, wie man die QSenN-Semaphoren verwendet. Könnten Sie eine Referenz empfehlen?

0 Stimmen

Das klassische Papier zu diesem Thema ist "Concurrent Haskell" citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.47.7494 - es beschreibt jedoch die Implementierung von QSems, anstatt wie man sie verwendet. Andererseits sollte ihre Verwendung irgendwie unkompliziert sein.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X