-- Haskell98! -- Merging two sorted streams into a sorted stream -- An example of a two-headed enumeratee module Merge where -- The problem of merging two sorted streams into one -- sorted stream was posed by Permjacov Evgeniy in his Haskell-Cafe -- message on Dec 13, 2010. -- If the same element shows up in both streams, only one copy -- is sent to the output stream. -- One application is merging two files of dictionary words: -- a personal ispell dictionary with the main one. We use this -- application as one of our tests below. -- The problem well illustrates the reading of two streams at various paces. -- How much to read from a stream can be determined only dynamically, -- based on the data read from both streams. import IterateeM import Control.Monad.Trans import Control.Monad.Identity import System.Environment -- We demonstrate that reading several streams in parallel is -- easy and natural with Iteratees. That point was already -- made in IterateeN.hs -- We use the same approach here. We make the problem -- harder: we design not merely an iteratee with two -- input streams; we design an enumeratee. -- Thus we truly merge two input streams into a one, -- to be processed by inner iteratees as any other -- stream. -- We also process the two input streams by chunks rather than -- element-by-element. If a stream can yield several pieces of -- data at once, we take advantage of that, improving the performance. -- Here is the main function: two-input iteratee -- The function is very general, working in any base monads, with -- streams of any comparable elements. 
-- The signature could be inferred; it is not too complex to write down
merge_streams :: (Monad m, Ord el) => Enumeratee el el (Iteratee el m) a
merge_streams inner = case inner of
  -- The inner iteratee asks for data: start the merge with two empty
  -- buffers, one per input stream.
  IE_cont Nothing k -> do_merge k (AChunk [] nearReader) (AChunk [] farReader)
  -- Otherwise the inner iteratee doesn't want data; hand it back as it is.
  _                 -> return inner
 where
  nearReader = chunk        -- reader on one stream: the one of this layer
  farReader  = lift chunk   -- and of the other: chunk run in the base monad

-- Tests

-- The first test: merging of two lists [3,6..51] and [5,10..51]
-- into a sorted list.
-- (The problem is a more interesting variation of the
-- infamous FizzBuzz problem. We explicitly use no modular division or
-- other cheating).
-- For the sake of explanation, we build the merging function gradually,
-- in several steps, to illustrate the peeling off of the layers
-- of streams and iteratees.
-- The signatures can be (and have been) all inferred; we write them
-- explicitly for clarity.

-- Since merge_streams is an enumeratee, it needs an inner iteratee,
-- the processor of the merged stream. We supply stream2list, which
-- returns all elements read from a stream in a list.
t1l1 :: (Monad m, Ord el) =>
        Iteratee el (Iteratee el m) (Iteratee el (Iteratee el m) [el])
t1l1 = merge_streams stream2list

-- The enumeratee merge_streams returns the final state of the inner
-- iteratee. We run the inner iteratee and obtain its result, [el]
t1l2 :: (Monad m, Ord el) => Iteratee el (Iteratee el m) [el]
t1l2 = t1l1 >>= runI

-- The result is an iteratee, which can be passed to an enumerator
-- of the first stream (here delivered with the chunk-size argument 7)
t1l3 :: Monad m => Iteratee Int m (Iteratee Int (Iteratee Int m) [Int])
t1l3 = enum_pure_nchunk firstStream 7 t1l2
 where
  firstStream = [3,6..51]

-- The result of the enumerator is the final state of the iteratee.
-- To extract the result, we run the iteratee.
t1l4 :: (Monad m) => Iteratee Int m [Int]
t1l4 = run =<< t1l3

-- The result is again an iteratee, which can be passed to an enumerator,
-- of the outer stream
t1l5 :: Monad m => m (Iteratee Int m [Int])
t1l5 = enum_pure_nchunk [5,10..51] 7 t1l4

-- We tell the final iteratee to yield the result
t1l6 :: (Monad m) => m [Int]
t1l6 = run =<< t1l5

-- which we can now see, by running the base monad
t1l7 :: [Int]
t1l7 = runIdentity t1l6
-- [3,5,6,9,10,12,15,18,20,21,24,25,27,30,33,35,36,39,40,42,45,48,50,51]

-- The second test: merging two files of sorted lines (typically, ispell
-- dictionaries)
-- file2 is enumerated by the outermost enumerator and file1 by the one
-- nested inside it; the merged stream is handed to print_str.
merge_dicts :: (MonadIO m) => FilePath -> FilePath -> m ()
merge_dicts file1 file2 =
  run =<< enum_file_gen file2 (runI =<< enum_words inner)
 where
  inner = run =<< enum_file_gen file1
                    (runI =<< enum_words (runI =<< merge_streams print_str))

-- like print_line but without the debugging output
-- Every string of every received chunk is written on its own line of
-- standard output.
print_str :: MonadIO m => Iteratee String m ()
print_str = ie_cont step
 where
  -- An empty chunk carries nothing to print: keep waiting.
  step (Chunk [])      = ie_contM step
  step (Chunk ls)      = mapM_ pr_str ls >> ie_contM step
  -- Normal end of stream.
  step s@(EOF Nothing) = ie_doneM () s
  -- End of stream carrying an error: report it, then finish all the same.
  step s@(EOF e)       = liftIO (putStrLn (">> unnatural end: " ++ show e)) >>
                         ie_doneM () s
  pr_str str = liftIO $ putStrLn str

-- Entry point of the dictionary-merging test: expects exactly two
-- command-line arguments, the names of the two files to merge.
-- Any other number of arguments is reported as an IOError carrying a
-- usage message (rather than an opaque pattern-match failure).
main_merge_dicts :: IO ()
main_merge_dicts = do
  args <- getArgs
  case args of
    [file1,file2] -> merge_dicts file1 file2
    _ -> do
      prog <- getProgName
      ioError (userError ("usage: " ++ prog ++ " FILE1 FILE2"))

-- To compile this code
-- ghc --make -O2 -main-is Merge.main_merge_dicts Merge.hs
-- To run this code
-- GHCRTS="-tstderr" /usr/bin/time ./Merge /usr/share/dict/words .ispell_english

-- Check to see that merging a file with itself is the identity
-- and that merging is symmetric


-- Implementation

-- A chunk of data with a function to read more data,
-- the `underflow' function.
-- If the underflow function returns [], there is no more data.
data AChunk m a = AChunk [a] (m [a])

-- Fill the chunk if it is empty, invoking the
-- underflow function to get the next available piece of data.
-- A chunk that still holds data is returned unchanged.
fill_chunk :: Monad m => AChunk m a -> m (AChunk m a)
fill_chunk (AChunk [] reader) = reader >>= \fresh -> return (AChunk fresh reader)
fill_chunk stocked            = return stocked

-- Pure functional merge, removing duplicates.
-- The function merges two argument lists returning the
-- merged list and the remainder of the arguments.
-- Merging stops as soon as either argument runs out, so at least one
-- of the two remainders is empty.
-- The empty list argument does not signify that the stream
-- is exhausted. Rather, it indicates that a known part is over.
mlist :: Ord a => [a] -> [a] -> ([a],([a],[a]))
mlist [] ys = ([],([],ys))
mlist xs [] = ([],(xs,[]))
mlist xxs@(x:xs) yys@(y:ys) =
  case compare x y of
    LT -> emit x (mlist xs yys)
    EQ -> emit x (mlist xs ys)      -- same element in both: keep one copy
    GT -> emit y (mlist xxs ys)
 where
  emit e (merged,leftover) = (e:merged,leftover)

-- Fill in the chunks, if needed, and go check the result:
--   both still empty -> nothing more to merge; return the inner iteratee
--   one still empty  -> send all of the other stream to the inner iteratee
--   otherwise        -> merge the two buffers, feed the merged part and
--                       carry on with whatever was left over
do_merge :: (Monad m, Ord el) =>
            (Stream el -> m (Iteratee el m a, Stream el))
         -> AChunk (Iteratee el m) el
         -> AChunk (Iteratee el m) el
         -> Iteratee el m (Iteratee el m a)
do_merge ik c1 c2 = do
  left  <- fill_chunk c1
  right <- fill_chunk c2
  decide left right
 where
  decide (AChunk [] _) (AChunk [] _) = return (ie_cont ik)
  decide (AChunk [] _) rest          = dump ik rest
  decide rest          (AChunk [] _) = dump ik rest
  decide (AChunk xs readX) (AChunk ys readY) =
    let (merged,(xs',ys')) = mlist xs ys
        resume k = do_merge k (AChunk xs' readX) (AChunk ys' readY)
    in feed resume ik merged

-- Feed data to an iteratee and recur, if the iteratee is not finished.
-- `loop' receives the new continuation of the iteratee; an iteratee in
-- any other state is returned as it is.
feed loop ik buffer = do
  next <- lift (feedI ik (Chunk buffer))
  case next of
    IE_cont Nothing ik' -> loop ik'
    _                   -> return next

-- Send all of the stream represented by AChunk to an inner
-- iteratee: first the buffered data, then whatever the reader
-- delivers, until the reader returns [].
dump :: Monad m =>
        (Stream el -> m (Iteratee el m a, Stream el))
     -> AChunk (Iteratee el m) el
     -> Iteratee el m (Iteratee el m a)
dump ik (AChunk buffer reader) = feed refill ik buffer
 where
  refill k = reader >>= \more ->
    case more of
      [] -> return (ie_cont k)
      _  -> dump k (AChunk more reader)