[More progress on ch25
Bryan O'Sullivan **20080610173634] {
addfile ./examples/ch25/ParMap.hs
addfile ./examples/ch25/Strat.hs
addfile ./examples/ch25/MapReduce.hs
hunk ./en/ch25-concurrent.xml 1141
+
+Parallel strategies and MapReduce
+
+Within the programming community, one of the most famous software systems to credit functional programming for inspiration is Google's MapReduce infrastructure for parallel processing of bulk data.
+
+We can easily construct a greatly simplified, but still useful, Haskell equivalent. To focus our attention, we will look at the processing of web server log files, which tend to be both huge and plentiful.
+
+While we could create a straightforward implementation without much effort, we will resist the temptation to dive in. If we think about solving a class of problems instead of a single one, we may end up with more widely applicable code.
+
+When we develop a parallel program, we are always faced with a few "bad penny" problems, which turn up no matter what the underlying programming language is:
+
+* Our algorithm quickly becomes obscured by the details of partitioning and communication. This makes the code difficult to understand, which in turn makes modifying it risky.
+
+* Choosing a grain size&emdash;the smallest unit of work parceled out to a core&emdash;can be difficult. If the grain size is too small, cores spend so much of their time on book-keeping that a parallel program can easily become slower than a serial counterpart. If the grain size is too large, some cores may lie idle due to poor load balancing.
+
+Separating algorithm from evaluation
+
+In parallel Haskell code, the clutter that would arise from communication code in a traditional language is replaced with the clutter of par and pseq annotations. As an example, this function operates similarly to map, but evaluates each element to weak head normal form (WHNF) in parallel as it goes.
+
+&ParMap.hs:parallelMap;
+
+The type b might be a list, or some other type for which evaluation to WHNF doesn't do a useful amount of work. We'd prefer not to have to write a special parallelMap for lists, and for every other type that needs special handling.
+
+To address this problem, we will begin by considering a simpler one: how to force a value to be evaluated. Here is a function that forces every element of a list to be evaluated to WHNF.
+
+&ParMap.hs:forceList;
+
+Our function performs no computation on the list. (In fact, from examining its type signature, we can tell that it cannot perform any computation, since it knows nothing about the elements of the list.) Its only purpose is to ensure that the spine of the list is evaluated to normal form. The only place it makes any sense to apply this function is in the first argument of seq or par, for example as follows.
+
+&ParMap.hs:stricterMap;
+
+This still leaves the elements of the list evaluated only to WHNF. We address this by adding a function as a parameter, one that can force an element to be evaluated more deeply.
+
+&ParMap.hs:forceListAndElts;
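+
+To see the parameter in action, here is a sketch of our own (it is not one of the chapter's example files, and the name muchStricterMap is purely illustrative): a variant of stricterMap that forces the input list, and every element in it, before mapping over it.
+
+    muchStricterMap :: (a -> ()) -> (a -> b) -> [a] -> [b]
+    muchStricterMap forceElt f xs =
+        forceListAndElts forceElt xs `seq` map f xs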
+
+The Control.Parallel.Strategies module turns this idea into something we can use as a library. It introduces the idea of an evaluation strategy.
+
+&Strat.hs:Strategy;
+
+An evaluation strategy performs no computation; it simply ensures that a value is evaluated to some extent. The simplest strategy is named r0, and does nothing at all.
+
+&Strat.hs:r0;
+
+Next is rwhnf, which evaluates a value to weak head normal form.
+
+&Strat.hs:rwhnf;
+
+To evaluate a value to normal form, the module provides a typeclass with a method named rnf.
+
+&Strat.hs:NFData;
+
+Remembering those names
+
+If the names of these functions and types are not sticking in your head, look at them as acronyms. The name rwhnf expands to "reduce to weak head normal form"; NFData becomes "normal form data"; and so on.
+
+For the basic types, such as Int, weak head normal form and normal form are the same thing, which is why the NFData typeclass uses rwhnf as the default implementation of rnf. For many common types, the Control.Parallel.Strategies module provides instances of NFData.
+
+&Strat.hs:instances;
+
+From these examples, it should be clear how you might write an NFData instance for a type of your own. Your implementation of rnf must handle every constructor, and apply rnf to every field of a constructor.
+
+Separating algorithm from strategy
+
+From these strategy building blocks, we can construct more elaborate strategies. Many are already provided by Control.Parallel.Strategies. For instance, parList applies an evaluation strategy in parallel to every element of a list.
+
+&Strat.hs:parList;
+
+The module uses this to define a parallel map function.
+
+&Strat.hs:parMap;
+
+This is where the code becomes interesting. On the left of using, we have a normal application of map. On the right, we have an evaluation strategy. The using combinator tells us how to apply a strategy to a value, allowing us to keep the code separate from how we plan to evaluate it.
+
+&Strat.hs:using;
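+
+To make the separation concrete, here is a small sketch of our own (not one of the chapter's example files; the names incAll and parIncAll are purely illustrative). The algorithm is an ordinary map in both cases; the second version merely asks, via using and parList, for the elements of the result to be evaluated in parallel.
+
+    incAll, parIncAll :: [Int] -> [Int]
+    incAll    xs = map (+1) xs
+    parIncAll xs = map (+1) xs `using` parList rwhnf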
+
+The Control.Parallel.Strategies module provides many other functions that give fine control over evaluation. For instance, parZipWith applies zipWith in parallel, using an evaluation strategy; the vectorSum' function below uses it to add the elements of two lists pairwise.
+
+&Strat.hs:vectorSum;
+
+Writing a simple MapReduce definition
+
+We can quickly suggest a type for a mapReduce function by considering what it must do. We need a map component, to which we will give the usual type a -> b. And we need a reduce; this term is a synonym for fold. Rather than commit ourselves to using a specific kind of fold, we'll use a more general type, [b] -> c. This type lets us use a left or right fold, so we can choose the one that suits our data and processing needs.
+
+If we plug these types together, the complete type looks like this.
+
+&MapReduce.hs:simpleMapReduce.type;
+
+The code that goes with the type is extremely simple.
+
+&MapReduce.hs:simpleMapReduce;
+
+MapReduce and strategies
+
+Our definition of simpleMapReduce is too simple to really be interesting. To make it useful, we want to be able to specify that some of the work should occur in parallel. We'll achieve this using strategies. Our type becomes a little more complicated, as we need to be able to control both the mapping strategy and the reduction strategy.
+
+&MapReduce.hs:mapReduce.type;
+
+The body of the function has also grown, though it is still brief and readable.
+
+&MapReduce.hs:mapReduce;
+
hunk ./examples/ch25/MapReduce.hs 1
+import Data.Int (Int64)
+import Control.Exception (bracket, finally)
+import Control.Monad (forM, forM_, liftM)
+import Control.Monad.Fix (fix)
+import Control.Parallel (pseq)
+import Control.Parallel.Strategies
+import GHC.Conc (numCapabilities)
+import qualified Data.ByteString.Char8 as SB
+import qualified Data.ByteString.Lazy.Char8 as LB
+import System.Environment (getArgs)
+import System.IO
+import Data.ByteString.Search.BoyerMoore (matchSL)
+
+data ChunkSpec = CS {
+      chunkOffset :: !Int64
+    , chunkLength :: !Int64
+    }
+
+-- Split a file into roughly equal-sized chunks, nudging each chunk
+-- boundary forward so that it falls at the end of a line.
+lineChunks :: Int -> FilePath -> IO [ChunkSpec]
+lineChunks numChunks path = do
+  bracket (openFile path ReadMode) hClose $ \h -> do
+    totalSize <- fromIntegral `liftM` hFileSize h
+    let chunkSize = totalSize `div` fromIntegral numChunks
+    flip fix 0 $ \findChunks offset -> do
+      let newOffset = offset + chunkSize
+      hSeek h AbsoluteSeek (fromIntegral newOffset)
+      flip fix newOffset $ \loop off -> do
+        eof <- hIsEOF h
+        if eof
+          then return [CS offset (totalSize - offset)]
+          else do
+            bytes <- LB.hGet h 4096
+            case LB.elemIndex '\n' bytes of
+              Just n -> do
+                chunks@(c:_) <- findChunks (off + n + 1)
+                let coff = chunkOffset c
+                return (CS offset (coff - offset):chunks)
+              Nothing -> loop (off + LB.length bytes)
+
+-- Open one handle per chunk, and return each chunk's (lazily read)
+-- contents along with the handles, so the caller can close them.
+chunkedRead :: (FilePath -> IO [ChunkSpec]) -> FilePath
+            -> IO ([LB.ByteString], [Handle])
+chunkedRead chunkfn path = do
+  cxs <- chunkfn path
+  chs <- forM cxs $ \spec -> do
+    h <- openFile path ReadMode
+    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
+    chunk <- LB.take (chunkLength spec) `liftM` LB.hGetContents h
+    return (chunk, h)
+  return (unzip chs)
+
+{-- snippet simpleMapReduce.type --}
+simpleMapReduce
+    :: (a -> b)     -- map function
+    -> ([b] -> c)   -- reduce function
+    -> [a]          -- list to map over
+    -> c
+{-- /snippet simpleMapReduce.type --}
+
+{-- snippet simpleMapReduce --}
+simpleMapReduce mapFunc reduceFunc = reduceFunc . map mapFunc
+{-- /snippet simpleMapReduce --}
+
+{-- snippet mapReduce.type --}
+mapReduce
+    :: Strategy b     -- evaluation strategy for mapping
+    -> (a -> b)       -- map function
+    -> Strategy c     -- evaluation strategy for reduction
+    -> ([b] -> c)     -- reduce function
+    -> [a]            -- list to map over
+    -> c
+{-- /snippet mapReduce.type --}
+
+{-- snippet mapReduce --}
+mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
+    mapResult `pseq` reduceResult
+  where mapResult    = parMap mapStrat mapFunc input
+        reduceResult = reduceFunc mapResult `using` reduceStrat
+{-- /snippet mapReduce --}
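+
+-- An illustrative helper of our own (not referenced from the chapter
+-- text; the name lineCount is hypothetical): count the total number of
+-- lines across all chunks, computing each per-chunk count in parallel.
+lineCount :: [LB.ByteString] -> Int
+lineCount = mapReduce rnf (length . LB.lines) rnf sum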
+
+-- Read a file in chunks, apply a function to the chunks, and make sure
+-- every per-chunk handle is closed afterwards.
+withChunks :: (FilePath -> IO [ChunkSpec])
+           -> ([LB.ByteString] -> a)
+           -> FilePath
+           -> IO a
+withChunks chunkFunc process path = do
+  (chunks, handles) <- chunkedRead chunkFunc path
+  (return $! process chunks) `finally` mapM_ hClose handles
+
+-- Does the needle occur anywhere in the lazy ByteString? (Boyer-Moore search.)
+isInfixOf :: String -> LB.ByteString -> Bool
+isInfixOf needle = not . null . matchSL (SB.pack needle)
+
+-- For each log file named on the command line (or a default file), count
+-- the lines that mention "09:00:00", spreading the work over all cores.
+main :: IO ()
+main = do
+  args <- getArgs
+  let files = if null args
+              then ["/home/bos/svnbook-access_log"]
+              else args
+  forM_ files $ \path -> do
+    r <- withChunks (lineChunks (numCapabilities * 4))
+                    (mapReduce rnf (length . filter (isInfixOf "09:00:00") . LB.lines)
+                               rnf sum)
+                    path
+    print r
hunk ./examples/ch25/ParMap.hs 1
+{-- snippet parallelMap --}
+import Control.Parallel (par)
+
+parallelMap :: (a -> b) -> [a] -> [b]
+parallelMap f (x:xs) = let r = f x
+                       in r `par` r : parallelMap f xs
+parallelMap _ _      = []
+{-- /snippet parallelMap --}
+
+{-- snippet forceList --}
+forceList :: [a] -> ()
+forceList (x:xs) = x `seq` forceList xs
+forceList _ = ()
+{-- /snippet forceList --}
+
+{-- snippet stricterMap --}
+stricterMap :: (a -> b) -> [a] -> [b]
+stricterMap f xs = forceList xs `seq` map f xs
+{-- /snippet stricterMap --}
+
+{-- snippet forceListAndElts --}
+forceListAndElts :: (a -> ()) -> [a] -> ()
+forceListAndElts forceElt (x:xs) =
+    forceElt x `seq` forceListAndElts forceElt xs
+forceListAndElts _ _ = ()
+{-- /snippet forceListAndElts --}
hunk ./examples/ch25/Strat.hs 1
+import Control.Parallel (par)
+import Control.Parallel.Strategies (parZipWith)
+
+{-- snippet Strategy --}
+type Done = ()
+
+type Strategy a = a -> Done
+{-- /snippet Strategy --}
+
+{-- snippet r0 --}
+r0 :: Strategy a
+r0 _ = ()
+{-- /snippet r0 --}
+
+{-- snippet rwhnf --}
+rwhnf :: Strategy a
+rwhnf x = x `seq` ()
+{-- /snippet rwhnf --}
+
+{-- snippet NFData --}
+class NFData a where
+    rnf :: Strategy a
+    rnf = rwhnf
+{-- /snippet NFData --}
+
+{-- snippet instances --}
+instance NFData Char
+instance NFData Int
+
+instance NFData a => NFData (Maybe a) where
+    rnf Nothing = ()
+    rnf (Just x) = rnf x
+
+{- ... and so on ... -}
+{-- /snippet instances --}
+
+{-- snippet seqList --}
+seqList :: Strategy a -> Strategy [a]
+seqList strat [] = ()
+seqList strat (x:xs) = strat x `seq` (seqList strat xs)
+{-- /snippet seqList --}
+
+{-- snippet parList --}
+parList :: Strategy a -> Strategy [a]
+parList strat [] = ()
+parList strat (x:xs) = strat x `par` (parList strat xs)
+{-- /snippet parList --}
+
+{-- snippet parMap --}
+parMap :: Strategy b -> (a -> b) -> [a] -> [b]
+parMap strat f xs = map f xs `using` parList strat
+{-- /snippet parMap --}
+
+{-- snippet using --}
+using :: a -> Strategy a -> a
+using x s = s x `seq` x
+{-- /snippet using --}
+
+{-- snippet vectorSum --}
+vectorSum' :: (NFData a, Num a) => [a] -> [a] -> [a]
+vectorSum' = parZipWith rnf (+)
+{-- /snippet vectorSum --}
}