10 Stimmen

Warum schlägt meine Mapreduce-Implementierung (real world haskell) mit iteratee IO ebenfalls mit "Too many open files" fehl?

Ich implementiere ein Haskell-Programm, das jede Zeile einer Datei mit jeder anderen Zeile in der Datei vergleicht. Das kann implementiert werden mit einem Gewinde wie folgt

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

Dies läuft in O(n^2)-Zeit und muss die komplette Liste der ganzen Zahlen die ganze Zeit im Speicher halten. In meinem eigentlichen Programm enthält die Zeile mehr Zahlen, aus denen ich einen etwas komplexeren Datentyp als Int konstruiere. Dies führte bei den zu verarbeitenden Daten zu Fehlern, weil der Speicher voll war.

Es gibt also zwei Verbesserungen, die an der oben erwähnten Single-Thread-Lösung vorgenommen werden müssen. Erstens, die tatsächliche Laufzeit zu beschleunigen. Zweitens muss ein Weg gefunden werden, um nicht die ganze Liste die ganze Zeit im Speicher zu halten. Ich weiß, dass dazu die gesamte Datei n-mal geparst werden muss. Es werden also O(n^2) Vergleiche durchgeführt und O(n^2) Zeilen geparst. Das ist für mich in Ordnung, denn ich habe lieber ein langsames, erfolgreiches Programm als ein fehlerhaftes Programm. Wenn die Eingabedatei klein genug ist, kann ich jederzeit auf eine einfachere Version zurückgreifen.

Um mehrere Prozessorkerne zu nutzen, habe ich die Mapreduce-Implementierung aus Real World Haskell (Kapitel 24, verfügbar aquí ).

Ich habe die Chunking-Funktion aus dem Buch dahingehend geändert, dass sie, anstatt die gesamte Datei in Chunks zu unterteilen, so viele Chunks wie Zeilen zurückgibt, wobei jeder Chunk ein Element von

tails . lines . readFile

Da ich möchte, dass das Programm auch in der Dateigröße skalierbar ist, habe ich zunächst träges IO . Dies schlägt jedoch mit "Zu viele offene Dateien" fehl, worüber ich in einem vorherige Frage (die Datei-Handles wurden von der GC zu spät entsorgt). Die vollständige Lazy-IO-Version ist dort zu finden.

Wie die akzeptierte Antwort erklärt, strenges IO könnte das Problem lösen. Das löst zwar das Problem "Zu viele offene Dateien" für 2k-Zeilen-Dateien, schlägt aber bei einer 50k-Datei mit "out of memory" fehl.

Beachten Sie, dass die erste mit einem Gewinde Implementierung (ohne mapreduce) ist in der Lage, eine 50k-Datei zu verarbeiten.

Die alternative Lösung, die mir auch am meisten zusagt, ist die Verwendung von IO-Wiederholung . Ich hatte erwartet, dass dies sowohl das Problem des Dateihandles als auch der Erschöpfung der Speicherressourcen lösen würde. Meine Implementierung schlägt jedoch immer noch mit einem "Too many open files"-Fehler bei einer 2k-Zeilen-Datei fehl.

Die iterative IO-Version hat die gleiche mapReduce Funktion wie im Buch, hat aber eine geänderte chunkedFileEnum damit es mit einer Zähler .

Daher ist meine Frage; was ist falsch mit der folgenden Iteratee IO-Basis-Implementierung? Wo ist die Faulheit?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence

--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0

Übrigens, ich bin dabei HaskellPlatform 2011.2.0 auf Mac OS X 10.6.7 (Schneeleopard)
mit den folgenden Paketen:
bytestring 0.9.1.10
parallel 3.1.0.1
Enumerator 0.4.8 , mit einem Handbuch aquí

4voto

gerben Punkte 657

Wie die Fehlermeldung besagt, sind zu viele Dateien geöffnet. Ich habe erwartet, dass Haskell den größten Teil des Programms sequentiell ausführt, aber einige "Funken" parallel. Allerdings, wie sclv erwähnt, Haskell immer Funken die Auswertungen.

In einem rein funktionalen Programm ist dies normalerweise kein Problem, wohl aber beim Umgang mit IO (Ressourcen). Ich habe die Parallelität, wie im Buch Real World Haskell beschrieben, zu weit nach oben skaliert. Meine Schlussfolgerung ist also, Parallelität nur in begrenztem Umfang zu nutzen, wenn es um IO-Ressourcen innerhalb der Sparks geht. Im rein funktionalen Teil kann übermäßige Parallelität erfolgreich sein.

Die Antwort auf meinen Beitrag lautet also, MapReduce nicht für das gesamte Programm zu verwenden, sondern für einen inneren, rein funktionalen Teil.

Um zu zeigen, wo das Programm tatsächlich gescheitert ist, habe ich es mit --enable-executable-profiling -p konfiguriert, es gebaut und mit +RTS -p -hc -L30 ausgeführt. Da die ausführbare Datei sofort fehlschlägt, gibt es kein Speicherzuordnungsprofil. Das resultierende Zeitzuweisungsprofil in der .prof-Datei beginnt mit folgendem:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

chunkedEnum gibt IO ([Enumerator Text m b], [Handle]) zurück und erhält offenbar 495 Einträge. Die Eingabedatei war eine 2k-Zeilendatei, so dass der einzelne Eintrag bei lineOffsets eine Liste von 2000 Offsets zurückgab. Es gibt keinen einzigen Eintrag in distancesUsingMapReduceIt, die eigentliche Arbeit hat also gar nicht erst begonnen!

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X