Why does my iteratee IO based MapReduce implementation (Real World Haskell) also fail with "Too many open files"?



I am implementing a Haskell program that compares each line of a file with every other line in the file. A single-threaded version can be implemented as follows:

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

This will run in O(n^2) time, and it has to keep the complete list of integers in memory the whole time. In my actual program a line contains more numbers, out of which I construct a datatype slightly more complex than Int. This gave me out-of-memory errors on the data I have to process.

So there are two improvements to be made to the above-mentioned single-threaded solution. First, speed up the actual running time. Second, find a way to not keep the whole list in memory the full time. I know this requires parsing the complete file n times. Thus there will be O(n^2) comparisons (n(n-1)/2 pairs, about 1.25 billion for a 50k-line file) and O(n^2) lines parsed. This is OK for me, as I'd rather have a slow successful program than a failing one. When the input file is small enough I can always resort to a simpler version.

To use multiple CPU cores I took the MapReduce implementation out of Real World Haskell (chapter 24, available here).

I modified the chunking function from the book so that, instead of dividing the complete file into chunks, it returns as many chunks as there are lines, each chunk representing one element of

tails . lines . readFile
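
As a quick illustration (my own, not part of the original code): for a file containing the three lines 1, 2 and 3, the chunks correspond to the successive tails of the line list, including the final empty tail.

import Data.List (tails)

-- Illustration only (a hypothetical snippet, not part of the program below):
-- lines "1\n2\n3\n"         == ["1","2","3"]
-- tails (lines "1\n2\n3\n") == [["1","2","3"], ["2","3"], ["3"], []]
chunksIllustration :: [[String]]
chunksIllustration = tails (lines "1\n2\n3\n")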

Because I also want the program to be scalable in file size, I initially used lazy IO. This, however, fails with "Too many open files", which I asked about in a previous question (the file handles were disposed of too late by the GC). The full lazy IO version is posted there.

As the accepted answer explains, strict IO could solve the issue. That indeed solves the "Too many open files" problem for 2k-line files, but fails with "out of memory" on a 50k-line file.

Note that the first, single-threaded implementation (without MapReduce) is capable of handling a 50k-line file.

The alternative solution, and the one that appeals most to me, is to use iteratee IO. I expected this to solve both the file-handle exhaustion and the memory exhaustion. My implementation, however, still fails with a "Too many open files" error on a 2k-line file.

The iteratee IO version has the same mapReduce function as in the book, but a modified chunkedFileEnum to make it work with an Enumerator.

Thus my question is: what is wrong with the following iteratee IO based implementation? Where is the laziness?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO

import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)

import Data.Text(Text)
import Data.Text.Read
import Data.Maybe

import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances

--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
                      allDistances _ = 0

--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)

--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]
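
-- A small sanity check (my own addition, not from the original post): for
-- the stream [1,2,3] this iteratee computes distance 1 2 + distance 1 3
-- = 1 + 4 = 5; the pairs within the tail are covered by the later chunks.
sanityCheck :: IO Int
sanityCheck = run_ (enumList 1 [1, 2, 3] $$ distancesFirstToTailIt)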

--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat
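
-- For reference, a minimal pure usage of mapReduce (my own example, not
-- from the book): sum the squares of each sub-list in parallel, then add
-- up the partial sums sequentially. rdeepseq and rseq come from
-- Control.Parallel.Strategies, which is already imported above.
sumOfSquaresExample :: Int
sumOfSquaresExample = mapReduce rdeepseq (sum . map (\x -> x * x))
                                rseq     sum
                                [[1 .. 1000], [1001 .. 2000], [2001 .. 3000]]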

--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence
                              

--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)

chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles

chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Str.hGet h 256
                                case Str.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize - offset) : nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Str.length bytes)
                findNewline newOffset
        findChunks 0

Btw, I'm running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (Snow Leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
enumerator 0.4.8, with a manual here


1 Answer (self-answered by the original poster)


As the error says, there are too many open files. I expected Haskell to run most of the program sequentially and to evaluate only some 'sparks' in parallel. However, as sclv mentioned, Haskell always sparks every evaluation.

This is usually not a problem in a pure functional program, but it is when IO (resources) are involved. I scaled the parallelism described in the Real World Haskell book up too far. My conclusion is therefore to parallelize only on a limited scale when the sparks deal with IO resources; in the purely functional parts, excessive parallelism may succeed.

Thus the answer to my post is to apply MapReduce not to the whole program, but only to an inner, purely functional part.
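
A minimal sketch of that conclusion (my own restructuring, not code from the original post; sumOfDistancesParallel is a hypothetical name): do the file IO once, sequentially, and hand only the pure per-base distance sums to an evaluation strategy. Like the original single-threaded version this keeps the whole list in memory, but no file handles or IO actions are ever sparked.

import Control.Parallel.Strategies (parListChunk, rdeepseq, using)
import Data.List (tails)

-- Hypothetical restructuring: sequential IO outside, parallel pure part inside.
sumOfDistancesParallel :: FilePath -> IO Int
sumOfDistancesParallel path = do
    nums <- (map read . lines) `fmap` readFile path    -- one sequential read
    let partials = map oneToAll (tails nums)
                     `using` parListChunk 64 rdeepseq  -- spark only pure work
    return (sum partials)
  where
    oneToAll (x:xs) = sum (map (distance x) xs)        -- pure, safe to spark
    oneToAll []     = 0
    distance a b    = (a - b) * (a - b)                -- same as in the post

The chunk size of 64 is an arbitrary choice to keep the number of sparks, and thus the overhead, bounded.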

To show where the program actually failed, I configured it with --enable-executable-profiling -p, built it, and ran it using +RTS -p -hc -L30. Because the executable fails immediately, there is no memory allocation profile. The resulting time allocation profile in the .prof file starts with the following:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

chunkedEnum returns IO ([Enumerator Text m b], [Handle]) and apparently receives 495 entries. The input file was a 2k-line file, so the single entry on lineOffsets returned a list of 2000 offsets. There is not a single entry for distancesUsingMapReduceIt, so the actual work never even started!
