Limiting memory usage when reading files
I'm a Haskell beginner and thought this would be a good exercise. I have an
assignment where I need to read a file in thread A, handle the file lines
in threads B_i, and then output the results in thread C.
I have implemented this much already, but one of the requirements is that we
cannot assume the entire file fits into memory. I was hoping that lazy
IO and the garbage collector would take care of this for me, but alas, memory
usage keeps rising and rising.
The reader thread (A) reads the file with readFile; the lines are then zipped
with line numbers and wrapped in Just. These numbered lines are written
to a Control.Concurrent.Chan. Each consumer thread B has its own channel.
Each consumer reads its own channel whenever it has data, and if the regex
matches, the result is written, wrapped in Maybe, to that consumer's own
output channel (an MVar holding a list).
The printer checks the output channel of each of the B threads. If none of
the results for a line is Nothing, the line is printed. Since at that point
there should be no references left to the older lines, I thought the garbage
collector would be able to release them, but alas, I seem to be wrong.
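The printer's rule can be restated as a pure function over the consumers' results. This is only an illustrative sketch, not the asker's code: it assumes each consumer yields exactly one (Linenum, Maybe String) result per input line, in line order, and the name survivors is hypothetical.

```haskell
import Data.List (transpose)
import Data.Maybe (isJust)

type Linenum = Int
type Line = (Linenum, Maybe String)

-- Hypothetical pure restatement of the printer's rule: given each
-- consumer's results in line order (one list per consumer, all the same
-- length), keep a line only if every consumer returned Just for it.
survivors :: [[Line]] -> [String]
survivors outs =
  [ s | row@((_, Just s) : _) <- transpose outs  -- one row per line number
      , all (isJust . snd) row ]                 -- no consumer said Nothing

main :: IO ()
main = mapM_ putStrLn $ survivors
  [ [(1, Just "alpha"), (2, Just "beta"), (3, Just "gamma")]
  , [(1, Just "alpha"), (2, Nothing),     (3, Just "gamma")]
  ]
```

Because transpose and the list comprehension are lazy, rows are consumed incrementally; whether memory then stays bounded depends on how the per-consumer lists are produced.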
The full .lhs file is here:
http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs
So the question is: how do I limit the memory usage, or allow the garbage
collector to remove the lines?
Snippets as requested. Hopefully the indentation isn't too badly mangled :)
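```haskell
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan, writeChan)
import Control.Concurrent.MVar (MVar, modifyMVar_, readMVar)
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Reader (ReaderT, asks)
import qualified Data.Map as M
import Data.Maybe (fromJust, isNothing)

data Global = Global {done :: MVar Bool, consumers :: Consumers}

type Done      = Bool
type Linenum   = Int
type Line      = (Linenum, Maybe String)
type Output    = MVar [Line]   -- one per consumer, read by the printer
type Input     = Chan Line     -- one per consumer, written by the producer
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a   = ReaderT Global IO a

-- Reads the file lazily and writes every numbered line to every
-- consumer's input channel. Chan is unbounded, so writeChan never blocks.
producer :: [Input] -> FilePath -> State ()
producer c p = do
  liftIO $ Main.log "Starting producer"  -- Main.log is not shown in this snippet
  d <- asks done
  f <- liftIO $ readFile p
  mapM_ (\l -> mapM_ (liftIO . flip writeChan l) c)
        $ zip [1..] $ map Just $ lines f
  liftIO $ modifyMVar_ d (return . not)  -- flip the "done" flag
```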
```haskell
-- Polls the output MVars of all consumers and prints a line only when
-- no consumer reported Nothing for it.
printer :: State ()
printer = do
  liftIO $ Main.log "Starting printer"
  c <- fmap (map (snd . snd) . M.elems)
            (asks consumers >>= liftIO . readMVar)
  uniq' c
  where
    head' :: Output -> IO Line
    head' ch = fmap head (readMVar ch)

    -- Drop the first element of every output list.
    tail' = mapM_ (liftIO . flip modifyMVar_ (return . tail))

    cont ch = tail' ch >> uniq' ch

    printMsg ch = readMVar (head ch) >>=
                  liftIO . putStrLn . fromJust . snd . head

    -- True if any of the output lists is empty.
    cempty :: [Output] -> IO Bool
    cempty ch = fmap (any id)
                     (mapM (fmap ((==) 0 . length) . readMVar) ch)

    -- True if any consumer's current result is Nothing,
    -- i.e. the line must not be printed.
    uniq :: [Output] -> IO Bool
    uniq ch = fmap (any id . map (isNothing . snd))
                   (mapM (liftIO . head') ch)

    uniq' :: [Output] -> State ()
    uniq' ch = do
      d <- consumersDone  -- not shown in this snippet
      e <- liftIO $ cempty ch
      if not e
        then do
          u <- liftIO $ uniq ch
          if u then cont ch else do
            liftIO $ printMsg ch
            cont ch
        else unless d $ uniq' ch
```
Answer:

Concurrent programming offers no defined execution order unless you enforce one yourself with MVars and the like. So it's likely that the producer thread sticks all or most of the lines in the Chan before any consumer reads them off and passes them on; since a Chan is unbounded, nothing stops it from doing so. Another architecture that should fit the requirements is to have thread A call the lazy readFile and store the result in an MVar. Then each consumer thread takes the MVar, reads a line, and puts the rest back before proceeding to handle the line. Even then, if the output thread can't keep up, the number of matching lines stored on its Chan can build up arbitrarily.
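A minimal sketch of the shared-MVar design, under some assumptions: nextLine, worker and grepShared are hypothetical names, a plain predicate stands in for the regex, and each line goes to exactly one worker (which is how the suggestion reads). Matches are collected into an MVar only so the example can return them.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM, when)
import Data.List (sort)

-- Take the next line, if any, and put the remaining (lazy) list back.
-- Only the pattern match happens while the MVar is held; the caller
-- processes the line after the MVar has been refilled.
nextLine :: MVar [String] -> IO (Maybe String)
nextLine src = modifyMVar src $ \ls -> case ls of
  []         -> return ([], Nothing)
  (l : rest) -> return (rest, Just l)

-- One worker: pull lines on demand until the input runs out.
-- `keep` is a placeholder for the regex test.
worker :: (String -> Bool) -> MVar [String] -> MVar [String] -> MVar () -> IO ()
worker keep src out finished = loop
  where
    loop = do
      ml <- nextLine src
      case ml of
        Nothing -> putMVar finished ()
        Just l  -> do
          -- Collected here for the demo; a real program would hand the
          -- match to the output thread instead.
          when (keep l) $ modifyMVar_ out (return . (l :))
          loop

-- Hypothetical driver: n workers share one lazily produced list of lines.
-- In the real program the input would be `lines` of the lazy readFile result.
grepShared :: Int -> (String -> Bool) -> [String] -> IO [String]
grepShared n keep input = do
  src   <- newMVar input
  out   <- newMVar []
  dones <- forM [1 .. n] $ \_ -> do
    finished <- newEmptyMVar
    _ <- forkIO (worker keep src out finished)
    return finished
  mapM_ takeMVar dones      -- wait until every worker has finished
  sort <$> readMVar out     -- arrival order depends on scheduling

main :: IO ()
main = grepShared 3 (elem 'a') ["apple", "berry", "banana", "cherry"]
         >>= mapM_ putStrLn
```

Because the list is only forced one cons cell at a time inside nextLine, the reader never gets further ahead than the slowest request.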
What you have is a push architecture. To really make it work in constant space, think in terms of a demand-driven design. Find a mechanism by which the output thread signals the processing threads when it wants more, and the processing threads in turn signal the reader thread when they want more.
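One way to sketch the demand-driven idea with plain MVars: every link between two threads carries a request in one direction and a single reply in the other, so at most one item is in flight per link. Link, pull, serve, reader, processor and drain are hypothetical names, and a predicate stands in for the regex.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Hypothetical pull-based link: the downstream side puts a request and
-- waits for the reply; the upstream side produces one item per request.
data Link a = Link (MVar ()) (MVar (Maybe a))

newLink :: IO (Link a)
newLink = Link <$> newEmptyMVar <*> newEmptyMVar

-- Downstream: ask for the next item (Nothing means end of stream).
pull :: Link a -> IO (Maybe a)
pull (Link req rep) = putMVar req () >> takeMVar rep

-- Upstream: answer requests one at a time until `next` reports the end.
serve :: Link a -> IO (Maybe a) -> IO ()
serve (Link req rep) next = loop
  where
    loop = do
      takeMVar req
      x <- next
      putMVar rep x
      maybe (return ()) (const loop) x

-- Reader thread: hands out one line per request.
reader :: [String] -> Link String -> IO ()
reader input link = do
  rest <- newMVar input
  serve link $ modifyMVar rest $ \ls -> case ls of
    []         -> return ([], Nothing)
    (l : more) -> return (more, Just l)

-- Processing thread: on each request, pulls lines until one matches.
processor :: (String -> Bool) -> Link String -> Link String -> IO ()
processor keep up down = serve down nextMatch
  where
    nextMatch = do
      ml <- pull up
      case ml of
        Just l | not (keep l) -> nextMatch
        _                     -> return ml

-- Output thread: the only thread that initiates work.
drain :: Link String -> (String -> IO ()) -> IO ()
drain link emit =
  pull link >>= maybe (return ()) (\x -> emit x >> drain link emit)

main :: IO ()
main = do
  toProc  <- newLink
  toPrint <- newLink
  _ <- forkIO (reader ["apple", "berry", "banana", "cherry"] toProc)
  _ <- forkIO (processor (elem 'a') toProc toPrint)
  drain toPrint putStrLn
```

With several processing threads, each would get its own pair of links, and the output thread would pull from each of them in turn.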
Another way to do this is to use Chans of limited size instead, so the reader thread blocks when the processor threads haven't caught up, and the processor threads block when the output thread hasn't caught up.
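Control.Concurrent.Chan has no size limit, but a bounded channel can be sketched from base alone by pairing a Chan with a QSem that counts free slots. BChan, newBChan, writeBChan and readBChan are hypothetical names; the stm package's TBQueue is a ready-made alternative.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)

-- Hypothetical bounded channel: a plain (unbounded) Chan plus a semaphore
-- counting free slots, so at most n items are buffered at any time.
data BChan a = BChan (Chan a) QSem

newBChan :: Int -> IO (BChan a)
newBChan n = BChan <$> newChan <*> newQSem n

-- Blocks the writer while the channel already holds n unread items.
writeBChan :: BChan a -> a -> IO ()
writeBChan (BChan c free) x = waitQSem free >> writeChan c x

-- Removes one item and frees its slot.
readBChan :: BChan a -> IO a
readBChan (BChan c free) = do
  x <- readChan c
  signalQSem free
  return x

main :: IO ()
main = do
  ch <- newBChan 2                    -- at most two lines buffered
  _ <- forkIO $ do                    -- reader thread
    mapM_ (writeBChan ch . Just) ["one", "two", "three", "four"]
    writeBChan ch Nothing             -- end-of-input marker
  let loop = do                       -- consumer: runs until the marker
        ml <- readBChan ch
        case ml of
          Nothing -> return ()
          Just l  -> putStrLn l >> loop
  loop
```

In the question's design, each Input and Output would become such a bounded channel, so a slow printer eventually throttles the consumers, and they in turn throttle the reader.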
As a whole, the problem reminds me of Tim Bray's Wide Finder benchmark, although the requirements are somewhat different. In any case, it led to a widespread discussion on the best way to implement a multicore grep. The big punchline was that the problem is IO-bound, and you want multiple reader threads over mmapped files.
See here for more than you'll ever want to know: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder