[Chapter 25 done
Bryan O'Sullivan **20080612215649] {
addfile ./examples/ch25/CommonURLs.hs
addfile ./examples/ch25/LineChunks.hs
addfile ./examples/ch25/LineCount.hs
addfile ./examples/ch25/Pearson.hs
hunk ./en/bibliography.xml 4
+    <biblioentry id="bib.google08">
+      <abbrev>Google08</abbrev>
+      <authorgroup>
+        <author><firstname>Jeffrey</firstname><surname>Dean</surname></author>
+        <author><firstname>Sanjay</firstname><surname>Ghemawat</surname></author>
+      </authorgroup>
+      <title><ulink
+        url="http://labs.google.com/papers/mapreduce.html">MapReduce:
+        simplified data processing on large clusters</ulink></title>
+      <bibliomisc><ulink
+        url="http://cacm.acm.org/">Communications of the
+        ACM</ulink></bibliomisc>
+      <volumenum>51</volumenum>
+      <issuenum>1</issuenum>
+      <pubdate>January 2008</pubdate>
+      <pagenums>107-113</pagenums>
+      <publisher>
+        <publishername>Association for Computing
+        Machinery</publishername>
+      </publisher>
+      <issn>0956-7968</issn>
+    </biblioentry>
+
hunk ./en/ch10-find-dsl.xml 1003
-
+
hunk ./en/ch19-ffi.xml 3
-
+
hunk ./en/ch25-concurrent.xml 792
-      two functions, par and
+      three functions, par,
+      pseq, and
hunk ./en/ch25-concurrent.xml 804
-      When we chain the uses of par
-      together as above, we are effectively saying evaluate
-      the first two expressions in parallel with the
-      third.
+      As for pseq, it is similar to
+      seq: it evaluates the expression on the
+      left to WHNF before returning the expression on the right.  The
+      difference between the two is subtle, but important for
+      parallel programs: the compiler does not
+      promise to evaluate the left argument of
+      seq if it can see that evaluating the
+      right argument first would improve performance.  This lax
+      guarantee is fine for a program executing on one core, but it
+      is not strong enough for code running on multiple cores.  In
+      contrast, the compiler guarantees that
+      pseq will evaluate its left argument
+      before its right.
hunk ./en/ch25-concurrent.xml 853
-      &Sorting.hs:badSort;
+      &Sorting.hs:sillySort;
hunk ./en/ch25-concurrent.xml 855
-      We direct your attention to the small changes to each use
-      of par.  Instead of force
-      lesser and force greater, in this
-      example we evaluate lesser and
-      greater.
+      Take a look at the small changes in each use of
+      par.  Instead of force
+      lesser and force greater, here we
+      evaluate lesser and greater.
hunk ./en/ch25-concurrent.xml 863
-      sorted sublist to WHNF.  Since the outermost
-      constructor in each case is just a single list constructor, we
-      are in fact only forcing the evaluation of the first element
-      of each sorted sublist!  Every other element of each list
-      remains unevaluated.  In other words, we do almost no useful
-      work in parallel: our uselessSort is
-      nearly completely sequential.
+      sorted sublist to WHNF.  Since the outermost constructor in
+      each case is just a single list constructor, we are in fact
+      only forcing the evaluation of the first element of each
+      sorted sublist!  Every other element of each list remains
+      unevaluated.  In other words, we do almost no useful work in
+      parallel: our sillySort is nearly
+      completely sequential.
hunk ./en/ch25-concurrent.xml 871
-      We avoid this in our force function
+      We avoid this with our force function
hunk ./en/ch25-concurrent.xml 880
-      understanding of Haskell's evaluation model.
+      understanding of Haskell's evaluation model.  And because we
+      will be using force on the left hand side
+      of par or pseq, we
+      don't need to return a meaningful value.
hunk ./en/ch25-concurrent.xml 891
-      Instead, it will do so if it makes sense.  This
-      wishy-washy non-promise is actually more useful than a
-      guarantee to always evaluate an expression in parallel.  It
-      gives the runtime system the freedom to do something
-      intelligent when it encounters a use of
+      Instead, it undertakes to do so if it makes
+      sense.  This wishy-washy non-promise is actually
+      more useful than a guarantee to always evaluate an expression
+      in parallel.  It gives the runtime system the freedom to act
+      intelligently when it encounters a use of
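For reference, the force function and the corrected sillySort discussed in the
hunks above live in Sorting.hs, which this patch only touches piecemeal.  Here
is a minimal sketch of the whole pattern; the parSort wrapper and its
list-comprehension decomposition are illustrative assumptions, not text from
this patch.

    import Control.Parallel (par, pseq)

    -- Walk the entire spine of a list, bringing every cell to WHNF.
    -- The () result carries no information: force only ever appears
    -- on the left-hand side of par or pseq.
    force :: [a] -> ()
    force xs = go xs `pseq` ()
        where go (_:xs') = go xs'
              go []      = 1

    -- Force each sorted sublist, so that the sparked work is the
    -- whole recursive sort rather than just the first list
    -- constructor.
    parSort :: (Ord a) => [a] -> [a]
    parSort (x:xs) = force greater `par` (force lesser `pseq`
                                          (lesser ++ x:greater))
        where lesser  = parSort [y | y <- xs, y <  x]
              greater = parSort [y | y <- xs, y >= x]
    parSort _ = []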
hunk ./en/ch25-concurrent.xml 902
-      there being more runnable threads than there are cores to
-      execute them.
+      there being more runnable threads than there are cores
+      available to execute them.
hunk ./en/ch25-concurrent.xml 905
-      This in turn affects how we write parallel code.  Since
-      par is going to be somewhat intelligent
-      at runtime, we can use it almost wherever we like, in the
-      knowledge that the system will not become overloaded with
+      This lax guarantee in turn affects how we write parallel
+      code.  Since par is going to be somewhat
+      intelligent at runtime, we can use it almost wherever we like,
+      on the assumption that performance will not be bogged down by
hunk ./en/ch25-concurrent.xml 1042
-      pseq instead of par.
+      pseq instead of
+      par.
hunk ./en/ch25-concurrent.xml 1169
+      As an example, here is a log entry for a page visit recorded
+      by the Apache web server.  The entry originally filled one line;
+      we have split it across several lines to fit.
+
+201.49.94.87 - - [08/Jun/2008:07:04:20 -0500] "GET / HTTP/1.1"
+200 2097 "http://en.wikipedia.org/wiki/Mercurial_(software)"
+"Mozilla/5.0 (Windows; U; Windows XP 5.1; en-GB; rv:1.8.1.12)
+Gecko/20080201 Firefox/2.0.0.12" 0 hgbook.red-bean.com
+
+
hunk ./en/ch25-concurrent.xml 1369
-      in parallel.  We'll achieve this using strategies.  Our type
-      becomes a little more complicated, as we need to be able to
-      control both the mapping strategy and the reduction
-      strategy.
+      in parallel.  We'll achieve this using strategies, passing in
+      a strategy for the map phase and one for the reduction phase.
hunk ./en/ch25-concurrent.xml 1374
-      The body of the function has also grown, though it is
-      still brief and readable.
+      Both the type and the body of the function must grow a
+      little in size to accommodate the strategy parameters.
hunk ./en/ch25-concurrent.xml 1380
-
-
+      &LineChunks.hs:lineChunks;
+
+      The last chunk will end up a little shorter than its
+      predecessors, but this difference will be insignificant in
+      practice.
+
+
+    Counting lines
+
+      This simple example illustrates how to use the scaffolding
+      we have built.
+
+      &LineCount.hs:countLines;
+
+      If we compile this program with -threaded, it should perform
+      well after an initial run to warm the filesystem cache.  On a
+      dual-core laptop, processing a log file 248 megabytes (1.1
+      million lines) in size, this program runs in 0.576 seconds
+      using a single core, and 0.361 with two (using +RTS -N2).
+
+
+    Finding the most popular URLs
+
+      In this example, we count the number of times each URL is
+      accessed.  This example comes from [Google08], Google's
+      original paper discussing MapReduce.  In the
+      map phase, for each chunk, we create a
+      Map from URL to the number of times it was accessed.  In the
+      reduce phase, we union-merge these maps
+      into one.
+
+      &CommonURLs.hs:countURLs;
+
+      To pick a URL out of a line of the log file, we use the
+      bindings to the PCRE regular expression library that we
+      developed in the FFI chapter.
+
+      Our driver function prints the ten most popular URLs.  As
+      with the line counting example, this program runs about 1.8
+      times faster with two cores than with one, taking 1.7 seconds
+      to process a log file containing 1.1 million
+      entries.
+
+
+    Conclusions
+
+      Given a problem that fits its model well, the MapReduce
+      programming model lets us write casual parallel
+      programs in Haskell with good performance and minimal
+      additional effort.  We can easily extend the idea to use other
+      data sources, such as collections of files, or data sourced
+      over the network.
+
+      In many cases, the performance bottleneck will be
+      streaming data at a rate high enough to keep up with a core's
+      processing capacity.  For instance, if we try to use either of
+      the above sample programs on a file that is not cached in
+      memory or streamed from a high-bandwidth storage array, we
+      will spend most of our time waiting for disk I/O, gaining no
+      benefit from multiple cores.
+
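The chapter text above pulls in mapReduce via snippet entities, but the
definition itself does not appear in this patch.  As a minimal sketch of the
strategy-parameterised shape the text describes, assuming the
Control.Parallel.Strategies API of this era (parMap, using):

    import Control.Parallel (pseq)
    import Control.Parallel.Strategies (Strategy, parMap, using)

    -- Map over the chunks in parallel under mapStrat, then reduce
    -- the intermediate results under reduceStrat.
    mapReduce :: Strategy b   -- evaluation strategy for the map phase
              -> (a -> b)     -- map function
              -> Strategy c   -- evaluation strategy for the reduce phase
              -> ([b] -> c)   -- reduce function
              -> [a]
              -> c
    mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
        mapResult `pseq` reduceResult
      where mapResult    = parMap mapStrat mapFunc input
            reduceResult = reduceFunc mapResult `using` reduceStrat

The pseq in the body ensures that the parallel map phase is complete before
the reduction begins.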
hunk ./examples/ch25/CommonURLs.hs 1
+{-- snippet countURLs --}
+module Main where
+
+import Control.Parallel.Strategies (NFData(..), rwhnf)
+import Control.Monad (forM_)
+import Data.List (foldl', sortBy)
+import qualified Data.ByteString.Lazy.Char8 as L
+import qualified Data.ByteString.Char8 as S
+import qualified Data.Map as M
+import Text.Regex.PCRE.Light (compile, match)
+
+import System.Environment (getArgs)
+import LineChunks (chunkedReadWith)
+import MapReduce (mapReduce)
+
+countURLs :: [L.ByteString] -> M.Map S.ByteString Int
+countURLs = mapReduce rwhnf (foldl' augment M.empty . L.lines)
+                      rwhnf (M.unionsWith (+))
+  where augment map line =
+            case match (compile pattern []) (strict line) [] of
+              Just (_:url:_) -> M.insertWith' (+) url 1 map
+              _ -> map
+        strict = S.concat . L.toChunks
+        pattern = S.pack "\"(?:GET|POST|HEAD) ([^ ]+) HTTP/"
+{-- /snippet countURLs --}
+
+{-- snippet main --}
+instance NFData S.ByteString where
+    rnf _ = ()    -- not built into Control.Parallel.Strategies
+
+main = do
+  args <- getArgs
+  forM_ args $ \path -> do
+    m <- chunkedReadWith countURLs path
+    let mostPopular (_,a) (_,b) = compare b a
+    mapM_ print . take 10 . sortBy mostPopular . M.toList $ m
+{-- /snippet main --}
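As a quick illustration of the pcre-light calls that augment relies on, the
following applies compile and match to a single made-up log line.  On
success, match returns the whole match followed by the capture groups, so
the URL is the second list element.

    import Text.Regex.PCRE.Light (compile, match)
    import qualified Data.ByteString.Char8 as S

    urlDemo :: Maybe [S.ByteString]
    urlDemo = match (compile pat []) line []
      where pat  = S.pack "\"(?:GET|POST|HEAD) ([^ ]+) HTTP/"
            line = S.pack "1.2.3.4 - - [..] \"GET /index.html HTTP/1.1\" 200 2097"
    -- urlDemo == Just [S.pack "\"GET /index.html HTTP/",
    --                  S.pack "/index.html"]

One design note: as written, augment applies compile for every line, and
whether the compiled pattern is shared across lines then depends on GHC
floating the constant expression out.  Binding the compiled regex once,
outside the per-line function, guarantees the pattern is compiled only once.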
hunk ./examples/ch25/LineChunks.hs 1
+{-- snippet withChunks --}
+module LineChunks
+    (
+      chunkedReadWith
+    ) where
+
+import Control.Exception (bracket, finally)
+import Control.Monad (forM, liftM)
+import Control.Parallel.Strategies (NFData, rnf)
+import Data.Int (Int64)
+import qualified Data.ByteString.Lazy.Char8 as LB
+import GHC.Conc (numCapabilities)
+import System.IO
+
+data ChunkSpec = CS {
+      chunkOffset :: !Int64
+    , chunkLength :: !Int64
+    } deriving (Eq, Show)
+
+withChunks :: (NFData a) =>
+              (FilePath -> IO [ChunkSpec])
+           -> ([LB.ByteString] -> a)
+           -> FilePath
+           -> IO a
+withChunks chunkFunc process path = do
+  (chunks, handles) <- chunkedRead chunkFunc path
+  let r = process chunks
+  (rnf r `seq` return r) `finally` mapM_ hClose handles
+
+chunkedReadWith :: (NFData a) =>
+                   ([LB.ByteString] -> a) -> FilePath -> IO a
+chunkedReadWith func path =
+    withChunks (lineChunks (numCapabilities * 4)) func path
+{-- /snippet withChunks --}
+
+{-- snippet chunkedRead --}
+chunkedRead :: (FilePath -> IO [ChunkSpec])
+            -> FilePath
+            -> IO ([LB.ByteString], [Handle])
+chunkedRead chunkFunc path = do
+  chunks <- chunkFunc path
+  liftM unzip . forM chunks $ \spec -> do
+    h <- openFile path ReadMode
+    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
+    chunk <- LB.take (chunkLength spec) `liftM` LB.hGetContents h
+    return (chunk, h)
+{-- /snippet chunkedRead --}
+
+{-- snippet lineChunks --}
+lineChunks :: Int -> FilePath -> IO [ChunkSpec]
+lineChunks numChunks path = do
+  bracket (openFile path ReadMode) hClose $ \h -> do
+    totalSize <- fromIntegral `liftM` hFileSize h
+    let chunkSize = totalSize `div` fromIntegral numChunks
+        findChunks offset = do
+          let newOffset = offset + chunkSize
+          hSeek h AbsoluteSeek (fromIntegral newOffset)
+          let findNewline off = do
+                eof <- hIsEOF h
+                if eof
+                  then return [CS offset (totalSize - offset)]
+                  else do
+                    bytes <- LB.hGet h 4096
+                    case LB.elemIndex '\n' bytes of
+                      Just n -> do
+                        chunks@(c:_) <- findChunks (off + n + 1)
+                        let coff = chunkOffset c
+                        return (CS offset (coff - offset):chunks)
+                      Nothing -> findNewline (off + LB.length bytes)
+          findNewline newOffset
+    findChunks 0
+{-- /snippet lineChunks --}
+
+-- Ensure that a series of ChunkSpecs is contiguous and
+-- non-overlapping.
+prop_contig (CS o l:cs@(CS o' _:_)) | o + l == o' = prop_contig cs
+                                    | otherwise = False
+prop_contig _ = True
hunk ./examples/ch25/LineCount.hs 1
+{-- snippet countLines --}
+module Main where
+
+import Control.Monad (forM_)
+import Data.Int (Int64)
+import qualified Data.ByteString.Lazy.Char8 as LB
+import System.Environment (getArgs)
+
+import LineChunks (chunkedReadWith)
+import MapReduce (mapReduce, rnf)
+
+lineCount :: [LB.ByteString] -> Int64
+lineCount = mapReduce rnf (LB.count '\n')
+                      rnf sum
+
+main :: IO ()
+main = do
+  args <- getArgs
+  forM_ args $ \path -> do
+    numLines <- chunkedReadWith lineCount path
+    putStrLn $ path ++ ": " ++ show numLines
+{-- /snippet countLines --}
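The prop_contig predicate above describes chunk lists as produced by
lineChunks; it is not a law about arbitrary ChunkSpec values, so feeding it
random data would mostly yield False.  A minimal driver to exercise it could
look like this, assuming lineChunks and prop_contig were added to the
module's export list for testing, and with an illustrative file path:

    module Main where

    import LineChunks (lineChunks, prop_contig)  -- assumed test exports

    main :: IO ()
    main = do
      -- Split an illustrative file into 8 chunks, then confirm that
      -- the resulting specs tile it: contiguous and non-overlapping.
      chunks <- lineChunks 8 "access.log"
      print (prop_contig chunks)   -- expect True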
hunk ./examples/ch25/MapReduce.hs 1
-import Data.Int (Int64)
-import Control.Exception (bracket, finally)
-import Control.Monad (forM, forM_, liftM)
-import Control.Monad.Fix (fix)
+module MapReduce
+    (
+      mapReduce
+    , simpleMapReduce
+    -- exported for convenience
+    , rnf
+    , rwhnf
+    ) where
+
hunk ./examples/ch25/MapReduce.hs 12
-import GHC.Conc (numCapabilities)
-import qualified Data.ByteString.Char8 as SB
-import qualified Data.ByteString.Lazy.Char8 as LB
-import System.Environment (getArgs)
-import System.IO
-import Data.ByteString.Search.BoyerMoore (matchSL)
-
-data ChunkSpec = CS {
-      chunkOffset :: !Int64
-    , chunkLength :: !Int64
-    }
-
-lineChunks :: Int -> FilePath -> IO [ChunkSpec]
-lineChunks numChunks path = do
-  bracket (openFile path ReadMode) hClose $ \h -> do
-    totalSize <- fromIntegral `liftM` hFileSize h
-    let chunkSize = totalSize `div` fromIntegral numChunks
-    flip fix 0 $ \findChunks offset -> do
-      let newOffset = offset + chunkSize
-      hSeek h AbsoluteSeek (fromIntegral newOffset)
-      flip fix newOffset $ \loop off -> do
-        eof <- hIsEOF h
-        if eof
-          then return [CS offset (totalSize - offset)]
-          else do
-            bytes <- LB.hGet h 4096
-            case LB.elemIndex '\n' bytes of
-              Just n -> do
-                chunks@(c:_) <- findChunks (off + n + 1)
-                let coff = chunkOffset c
-                return (CS offset (coff - offset):chunks)
-              Nothing -> loop (off + LB.length bytes)
-
-chunkedRead :: (FilePath -> IO [ChunkSpec]) -> FilePath
-            -> IO ([LB.ByteString], [Handle])
-chunkedRead chunkfn path = do
-  cxs <- chunkfn path
-  chs <- forM cxs $ \spec -> do
-    h <- openFile path ReadMode
-    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
-    chunk <- LB.take (chunkLength spec) `liftM` LB.hGetContents h
-    return (chunk, h)
-  return (unzip chs)
hunk ./examples/ch25/MapReduce.hs 42
-withChunks :: (FilePath -> IO [ChunkSpec])
-           -> ([LB.ByteString] -> a)
-           -> FilePath
-           -> IO a
-withChunks chunkFunc process path = do
-  (chunks, handles) <- chunkedRead chunkFunc path
-  (return $! process chunks) `finally` mapM_ hClose handles
-
-isInfixOf :: String -> LB.ByteString -> Bool
-isInfixOf needle = not . null . matchSL (SB.pack needle)
-
-main :: IO ()
-main = do
-  args <- getArgs
-  let files = if null args
-              then ["/home/bos/svnbook-access_log"]
-              else args
-  forM_ files $ \path -> do
-    r <- withChunks (lineChunks (numCapabilities * 4))
-         (mapReduce rnf (length . filter (isInfixOf "09:00:00") . LB.lines)
-                    rnf sum)
-         path
-    print r
-
hunk ./examples/ch25/Pearson.hs 1
+{-# LANGUAGE BangPatterns, GeneralizedNewtypeDeriving #-}
+{-# OPTIONS_GHC -funbox-strict-fields #-}
+
+module Pearson (main) where
+
+import qualified Data.ByteString.Char8 as B
+import qualified Data.ByteString.Unsafe as U
+import qualified Data.Map as M
+
+newtype User = U Int
+    deriving (Eq, Ord, Show)
+
+newtype Item = I Int
+    deriving (Eq, Ord, Show)
+
+newtype Rating = R Int
+    deriving (Eq, Ord, Show)
+
+data Record = Record {
+      recUser :: !User
+    , recItem :: !Item
+    , recRating :: !Rating
+    } deriving (Eq, Ord, Show)
+
+readInt s = case B.readInt s of
+              Just a -> a
+              _ -> error "eek"
+
+readRatings :: FilePath -> IO [Record]
+readRatings path = (map parseRating . B.lines) `fmap` B.readFile path
+
+parseRating line = let (userId, a) = readInt line
+                       (movieId, b) = readInt (U.unsafeDrop 2 a)
+                       (rating, _) = readInt (U.unsafeDrop 2 b)
+                   in Record (U userId) (I movieId) (R rating)
+
+(*) `on` f = \x y -> f x * f y
+
+ratingMapWith :: (Ord a) => (Record -> a) -> [Record] -> M.Map a [Record]
+ratingMapWith f recs = M.fromAscListWith (++) [(f r, [r]) | r <- recs]
+
+main = do
+  r <- readRatings "/home/bos/downloads/million-ml-data/ratings.dat"
+  let m = ratingMapWith recUser r
+  print (M.size m)
hunk ./examples/ch25/Sorting.hs 7
-
hunk ./examples/ch25/Sorting.hs 14
-{-- snippet badSort --}
-sillySort (x:xs) = lesser `par` greater `par`
-                   (lesser ++ x:greater)
+{-- snippet sillySort --}
+sillySort (x:xs) = greater `par` (lesser `pseq`
+                                  (lesser ++ x:greater))
hunk ./examples/ch25/Sorting.hs 20
-{-- /snippet badSort --}
+{-- /snippet sillySort --}
hunk ./examples/ch25/Sorting.hs 24
-
hunk ./examples/ch25/Sorting.hs 32
-
}
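A closing note on Pearson.hs: in the definition (*) `on` f = \x y -> f x * f y,
the (*) to the left of `on` is a variable pattern that binds a local name *,
shadowing multiplication inside the body, so the resulting on combinator works
for any binary operator.  Spelled more conventionally (this is essentially the
on that is available from Data.Function), with an illustrative use:

    import Data.List (sortBy)

    -- The first operand of `on` is an ordinary variable binding,
    -- here named op instead of the punning (*).
    on :: (b -> b -> c) -> (a -> b) -> a -> a -> c
    op `on` f = \x y -> f x `op` f y

    -- Illustrative use: sort pairs by their second component.
    bySecond :: [(Int, String)] -> [(Int, String)]
    bySecond = sortBy (compare `on` snd)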