[Begin chapter 25 Bryan O'Sullivan **20080516185652] { adddir ./examples/ch25 addfile ./examples/ch25/Compressor.hs addfile ./examples/ch25/MVarExample.hs addfile ./examples/ch25/ModifyMVar.hs addfile ./examples/ch25/NiceFork.hs addfile ./examples/ch25/WaitingCompressor.hs addfile ./examples/ch25/forkIO.ghci addfile ./examples/ch25/mvar.ghci hunk ./en/ch25-concurrent.xml 4 - Basics of Concurrent and Parallel Haskell + Concurrent and multicore programming hunk ./en/ch25-concurrent.xml 6 - FIXME - + As we write this book, the landscape of CPU architecture is + changing more rapidly than it has in decades. + + + Defining concurrency and parallelism + + A concurrent program needs to perform + several possibly unrelated tasks at the same time. Consider the + example of a virtual world: a server is typically composed of + dozens of components, each of which has complicated interactions + with the outside world. One component might handle multi-user + chat; another might evaluate scripts attached to objects; while + a third processes monetary transactions. + + The correct operation of a concurrent program does not + require multiple cores, though they may improve performance and + responsiveness. + + In contrast, a parallel program solves + a single problem. Consider a financial model that attempts to + predict the next minute of fluctuations in the price of a single + stock. If we want to apply this model to every stock listed on + an exchange, to estimate which ones we should buy and sell, we + hope to get an answer more quickly if we run the model on five + hundred cores than if we use just one. (As this suggests, a + parallel program does not usually depend on the presence of + multiple cores to work correctly.) + + Another useful distinction between concurrent and parallel + programs lies in their interaction with the outside world. By + definition, a concurrent program deals continuously with + networking protocols, databases, and the like. 
A typical + parallel program is likely to be more focused: it streams data + in, crunches it for a while (with little further I/O), then + streams data back out. + + In this chapter, we will concern ourselves with concurrent + and parallel programs that operate within the boundaries of a + single operating system process. + + + + Concurrent programming with threads + + As a building block for concurrent programs, most + programming languages provide a way of creating multiple + independent threads of control. Haskell is + no exception, though programming with threads in Haskell looks + somewhat different than in other languages. + + In Haskell, a thread is an IO action that + executes independently from other threads. To create a thread, + we import the Control.Concurrent module and use the + forkIO function. + + &forkIO.ghci:forkIO; + + The new thread starts to execute almost immediately, and the + thread that created it continues to execute concurrently. + + + Threads are nondeterministic + + The runtime component of &GHC; does not specify an order + in which it executes threads. As a result, in our example + above, the file xyzzy created by the new + thread may or may not have been created + by the time the original thread checks for its existence. If + we try this example once, then remove + xyzzy and try again, we may get a + different result the second time. + + + + Hiding latency + + Suppose we have a large file to compress and write to + disk, but we want to handle a user's input quickly enough that + they will perceive our program as responding immediately. If + we use forkIO to write the file out in a + separate thread, we can do both simultaneously. + + &Compressor.hs:module; + + Because we're using lazy ByteString I/O here, + all we really do in the main thread is open the file. The + actual reading occurs on demand in the other thread. 
+ + The use of flip catch print gives us a cheap + way to print an error message if the user enters the name of a + file that does not exist. + + + + + Communication between threads + + The simplest way to share information between two threads is + to let them both use a variable. In our file compression + example, the main thread shares both the + name of a file and its contents with the other thread. Because + Haskell data is immutable by default, this poses no risks: + neither thread can modify the other's view of the file's name or + contents. + + We often need to have threads actively communicate with each + other. For example, &GHC; does not provide a way for one thread + to find out whether another is still executing, has completed, + or has crashed + As we will show later, &GHC; threads are extraordinarily + lightweight. If the runtime were to provide a way to check + the status of every thread, the overhead of every thread + would increase, even if this information were never + used. + . However, it provides a synchronizing + variable type, the MVar, which we can + use for this purpose. + + An MVar acts like a single-element box: it can + be either full or empty. We can put something into the box, + making it full, or take something out, making it empty. + + &mvar.ghci:MVar; + + If we try to put a value into an MVar that is + already full, our thread is put to sleep until another thread + takes the value out. Similarly, if we try to take a value from + an empty MVar, our thread is put to sleep until + some other thread puts a value in. + + &MVarExample.hs:communicate; + + The newEmptyMVar function has a + descriptive name. To create an MVar that starts + out non-empty, we'd use newMVar. + + &mvar.ghci:new; hunk ./en/ch25-concurrent.xml 144 + Let's run our example in &ghci;. 
+ + &mvar.ghci:communicate; + + + + The main thread and waiting for other threads + + &GHC;'s runtime system treats the program's original thread + of control differently from other threads. When this thread + finishes executing, the runtime system considers the program as + a whole to have completed. If any other threads are executing + at the time, they are terminated. + + As a result, when we have long-running threads that must not + be killed, we must make special arrangements to ensure that the + main thread doesn't complete until the others do. Let's develop + a small library that makes this easy to do. + + &NiceFork.hs:header; + + We keep our ThreadManager type abstract using + the usual recipe: we wrap it in a &newtype; and retain + control over the means for creating a value of this type. Among + our module's exports, we list the type constructor and the + IO action that constructs a manager, but we do not + export the data constructor. + + &NiceFork.hs:module; + + For the implementation of ThreadManager, we + maintain a map from thread ID to thread state. We'll refer to + this as the thread map. + + &NiceFork.hs:ThreadManager; + + We have two levels of MVar use here. We keep + the Map in an MVar. This lets us + modify the map by replacing it with a new + version. We also ensure that any thread that uses the + Map will see a consistent view of it. + + For each thread that we manage, we maintain an + MVar. A per-thread MVar starts off + empty, which indicates that the thread is executing. When the + thread finishes or is killed by an uncaught exception, we put + this information into the MVar. + + To create a thread and watch its status, we must perform a + little bit of book-keeping. + + &NiceFork.hs:forkManaged; + + + Safely modifying an MVar + + The modifyMVar function that we use + in this example is very useful: it's a + safe combination of + takeMVar and + putMVar. 
+ + &mvar.ghci:modifyMVar; + + It takes the value from an MVar, and passes + it to a function. This function can both modify the value and + return a result. If the function throws an exception, + modifyMVar puts the original value back + into the MVar, otherwise it puts the new value + in. It returns the second element of the pair that the + function produces as its own result. + + When we use modifyMVar instead of + manually managing an MVar with + takeMVar and + putMVar, we avoid two common kinds of + concurrency bug. + + + + Forgetting to put a value back into an + MVar. This can result in + deadlock, in which some thread waits + forever on an MVar that will never have a + value put into it. + + + Failure to account for the possibility that an + exception might be thrown, disrupting the flow of a piece + of code. This can result in a call to + putMVar that + should occur not actually happening, + again leading to deadlock. + + + + Because of these nice safety properties, it's wise to use + modifyMVar whenever possible. + + + + + Safe resource management: not just a good idea + + We can take the pattern that + modifyMVar follows, and apply it to many + other resource management situations. + + + + Acquire a resource. + + + Pass the resource to a function that will do something + with it. + + + Always release the resource, even if an exception + occurs. If the function threw an exception, rethrow + it. + + + + Safety aside, this approach has another benefit: it can + make our code shorter and easier to follow. As we can see + from looking at forkManaged above, + Haskell's lightweight syntax for anonymous functions makes + this pattern visually unobtrusive. + + Here's the definition of modifyMVar, + so that you can see the general form of this pattern. + + &ModifyMVar.hs:modifyMVar; + + It should be easy to adapt this code to your specific + needs, whether you're working with network connections, + database handles, or data managed by a C library. 
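As a rough illustration of the acquire, use, and release steps outside the MVar setting, here is a minimal sketch built on bracket from Control.Exception. It is not part of this patch's example files. The names withLogged and demo, the string standing in for a resource, and the IORef event log are all hypothetical, and the sketch assumes a version of the base library that provides SomeException.

```haskell
import Control.Exception (SomeException, bracket, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical helper: acquire a stand-in "resource", hand it to a
-- function, and release it afterwards. Each step is recorded in a log.
withLogged :: IORef [String] -> (String -> IO a) -> IO a
withLogged logRef use =
  bracket
    (modifyIORef logRef ("acquire" :) >> return "resource") -- acquire
    (\_ -> modifyIORef logRef ("release" :))                -- always runs
    use                                                     -- may throw

-- Run a function that throws, then report the recorded steps in order.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  r <- try (withLogged logRef (\_ -> ioError (userError "boom")))
         :: IO (Either SomeException ())
  -- bracket rethrows after releasing, so try observes the exception here.
  modifyIORef logRef (either (const "rethrown") (const "ok") r :)
  reverse `fmap` readIORef logRef

main :: IO ()
main = demo >>= mapM_ putStrLn
```

The release step is recorded even though the function passed in throws, and the exception still reaches the caller afterwards, which is the same shape as the three steps listed above.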
+ + hunk ./en/ch25-concurrent.xml 303 + hunk ./en/ch29-http-proxy.xml 3 - - Reverse HTTP Proxy Example + + Software transactional memory hunk ./en/ch29-http-proxy.xml 6 - FIXME + Concurrent software is notoriously difficult to write. In a single-thread hunk ./examples/ch07/JSONClass.hs 163 -instance Monad (Either JSONError) where - return = Right - Left err >>= _ = Left err - Right k >>= f = f k - hunk ./examples/ch25/Compressor.hs 1 +{-- snippet module --} +import Control.Concurrent (forkIO) +import Control.Monad (forever) +import qualified Data.ByteString.Lazy as L +import System.Console.Readline (readline) + +-- Provided by the 'zlib' package on http://hackage.haskell.org/ +import Codec.Compression.GZip (compress) + +main = loop + +loop = do + maybeLine <- readline "Enter a file to compress> " + case maybeLine of + Nothing -> return () -- user entered EOF + Just "" -> return () -- treat no name as "want to quit" + Just name -> do + flip catch print $ do + content <- L.readFile name + forkIO (compressFile name content) + return () + loop + where compressFile path = L.writeFile (path ++ ".gz") . compress +{-- /snippet module --} hunk ./examples/ch25/MVarExample.hs 1 +{-- snippet communicate --} +import Control.Concurrent + +communicate = do + m <- newEmptyMVar + forkIO $ do + v <- takeMVar m + putStrLn ("received " ++ show v) + putStrLn "sending" + putMVar m "wake up!" 
+{-- /snippet communicate --} hunk ./examples/ch25/ModifyMVar.hs 1 +{-- snippet modifyMVar --} +import Control.Concurrent (MVar, putMVar, takeMVar) +import Control.Exception (block, catch, throw, unblock) +import Prelude hiding (catch) -- use Control.Exception's version + +modifyMVar :: MVar a -> (a -> IO (a,b)) -> IO b +modifyMVar m io = + block $ do + a <- takeMVar m + (a',b) <- unblock (io a) `catch` \e -> + putMVar m a >> throw e + putMVar m a' + return b +{-- /snippet modifyMVar --} hunk ./examples/ch25/NiceFork.hs 1 +{-- snippet module --} +module NiceFork + ( + ThreadManager + , newManager + , forkManaged + , getStatus + , waitFor + , waitAll + ) where +{-- /snippet module --} + +{-- snippet header --} +import Control.Concurrent +import Control.Exception (Exception, try) +import qualified Data.Map as M + +data ThreadStatus = Running + | Finished -- terminated normally + | Threw Exception -- killed by uncaught exception + deriving (Eq, Show) + +-- | Create a new thread manager. +newManager :: IO ThreadManager + +-- | Create a new managed thread. +forkManaged :: ThreadManager -> IO () -> IO ThreadId + +-- | Immediately return the status of a managed thread. +getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus) + +-- | Block until a specific managed thread terminates. +waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus) + +-- | Block until all managed threads terminate. 
+waitAll :: ThreadManager -> IO () +{-- /snippet header --} + +{-- snippet ThreadManager --} +newtype ThreadManager = + Mgr (MVar (M.Map ThreadId (MVar ThreadStatus))) + deriving (Eq) + +newManager = Mgr `fmap` newMVar M.empty +{-- /snippet ThreadManager --} + +{-- snippet forkManaged --} +forkManaged (Mgr mgr) body = + modifyMVar mgr $ \m -> do + state <- newEmptyMVar + tid <- forkIO $ do + result <- try body + putMVar state (either Threw (const Finished) result) + return (M.insert tid state m, tid) +{-- /snippet forkManaged --} + +forkManaged_racy (Mgr mgr) body = do + state <- newEmptyMVar + tid <- forkIO $ + try body >>= putMVar state . either Threw (const Finished) + modifyMVar_ mgr (return . M.insert tid state) -- here's the race + return tid + +{-- snippet getStatus --} +getStatus (Mgr mgr) tid = + modifyMVar mgr $ \m -> + case M.lookup tid m of + Nothing -> return (m, Nothing) + Just st -> tryTakeMVar st >>= \mst -> case mst of + Nothing -> return (m, Just Running) + Just sth -> return (M.delete tid m, Just sth) +{-- /snippet getStatus --} + +{-- snippet waitFor_bad --} +waitFor_bad (Mgr mgr) tid = + modifyMVar mgr $ \m -> + case M.updateLookupWithKey (\_ _ -> Nothing) tid m of + (Nothing, _) -> return (m, Nothing) + (Just done, m') -> takeMVar done >>= \st -> return (m', Just st) +{-- /snippet waitFor_bad --} + +{-- snippet waitFor --} +waitFor (Mgr mgr) tid = do + maybeDone <- modifyMVar mgr $ \m -> + return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of + (Nothing, _) -> (m, Nothing) + (Just done, m') -> (m', Just done) + case maybeDone of + Nothing -> return Nothing + Just st -> Just `fmap` takeMVar st +{-- /snippet waitFor --} + +{-- snippet waitAll --} +waitAll (Mgr mgr) = do + m <- takeMVar mgr + putMVar mgr M.empty + mapM_ takeMVar (M.elems m) +{-- /snippet waitAll --} hunk ./examples/ch25/WaitingCompressor.hs 1 +{-- snippet module --} +module Main (main) where + +import Control.Concurrent (forkIO) +import Control.Monad (forever) +import 
qualified Data.ByteString.Lazy as L +import System.Console.Readline (readline) + +-- Provided by the 'zlib' package on http://hackage.haskell.org/ +import Codec.Compression.GZip (compress) + +compressFile path = L.writeFile (path ++ ".gz") . compress + +main = loop + where + loop = do + maybeLine <- readline "Enter a file to compress> " + case maybeLine of + Nothing -> return () -- user entered EOF + Just "" -> return () -- treat no name as "want to quit" + Just name -> do + flip catch print $ do + content <- L.readFile name + forkIO (compressFile name content) + return () + loop +{-- /snippet module --} hunk ./examples/ch25/forkIO.ghci 1 +:m +System.Directory +catch (removeFile "xyzzy") (const $ return ()) + +--# forkIO +:m +Control.Concurrent +:t forkIO +:m +System.Directory +forkIO (writeFile "xyzzy" "seo craic nua!") >> doesFileExist "xyzzy" + +--# threadDelay +:t threadDelay hunk ./examples/ch25/mvar.ghci 1 +:m +Control.Concurrent + +--# MVar +:t putMVar +:t takeMVar + +--# new +:t newEmptyMVar +:t newMVar + +--# communicate +:load MVarExample +communicate + +--# modifyMVar +:t modifyMVar hunk ./examples/ch28/WebApp.hs 2 - FlexibleInstances #-} + FlexibleContexts, FlexibleInstances, OverlappingInstances #-} hunk ./examples/ch28/WebApp.hs 20 +import Control.Monad.Error hunk ./examples/ch28/WebApp.hs 35 -ok :: (JSON a, Monad m) => a -> m HttpResponse -ok = return . RespSuccess ["Content-Type: application/json"] . toJValue +ok :: (JSON a, Monad m) => a -> m JValue +ok = return . toJValue hunk ./examples/ch28/WebApp.hs 38 -httpError :: (JSON a, Monad m) => HttpError -> a -> m HttpResponse -httpError kind = - return . RespError kind ["Content-Type: text/plain"] . toJValue +httpError :: (JSON a, MonadError HttpResponse m) => HttpError -> a -> m b +httpError err = throwError . RespError err [] . 
toJValue hunk ./examples/ch28/WebApp.hs 41 -type Handler s = HttpRequest -> App s HttpResponse +type Handler s a = HttpRequest -> App s a hunk ./examples/ch28/WebApp.hs 45 - deriving (Eq, Ord, Show) + | InternalServerError + deriving (Show) + +instance Error HttpResponse where + strMsg = RespError InternalServerError [] . JString hunk ./examples/ch28/WebApp.hs 60 - } deriving (Eq, Ord, Show) + } deriving (Show) hunk ./examples/ch28/WebApp.hs 66 +respStatus (RespError InternalServerError _ _) = "500 Internal Server Error" hunk ./examples/ch28/WebApp.hs 68 +newtype App s a = App (ErrorT HttpResponse (ReaderT HttpConnection (StateT s IO)) a) + deriving (Functor, Monad, MonadIO, MonadError HttpResponse, + MonadReader HttpConnection, MonadState s) hunk ./examples/ch28/WebApp.hs 72 -newtype App s a = App (ReaderT HttpConnection (StateT s IO) a) - deriving (Functor, Monad, MonadIO, MonadReader HttpConnection, - MonadState s) - -runApp :: s -> HttpConnection -> App s a -> IO (a, s) -runApp st req (App a) = runStateT (runReaderT a req) st +runApp :: s -> HttpConnection -> App s a -> IO (Either HttpResponse a, s) +runApp st req (App a) = runStateT (runReaderT (runErrorT a) req) st hunk ./examples/ch28/WebApp.hs 92 -url :: (Method -> Bool) -> URLParser (Handler s) -> HttpRequest - -> Maybe (Handler s) +url :: (Method -> Bool) -> URLParser (Handler s JValue) -> HttpRequest + -> Maybe (Handler s JValue) hunk ./examples/ch28/WebApp.hs 98 -dispatch :: [HttpRequest -> Maybe (Handler s)] -> HttpRequest - -> App s HttpResponse +dispatch :: [HttpRequest -> Maybe (Handler s JValue)] -> HttpRequest + -> App s JValue hunk ./examples/ch29/Comment.hs 103 -handlers :: [HttpRequest -> Maybe (Handler AppState)] +handlers :: [HttpRequest -> Maybe (Handler AppState JValue)] hunk ./examples/ch29/Comment.hs 113 -chCount :: String -> HttpRequest -> H HttpResponse +chCount :: String -> HttpRequest -> H JValue hunk ./examples/ch29/Comment.hs 124 -cmtSingle :: ElementID -> HttpRequest -> H 
HttpResponse +cmtSingle :: ElementID -> HttpRequest -> H JValue hunk ./examples/ch29/Comment.hs 134 -cmtSubmit :: ElementID -> HttpRequest -> H HttpResponse +cmtSubmit :: ElementID -> HttpRequest -> H JValue }