pipes-group-1.0.8: Group streams into substreams

Pipes.Group.Tutorial

Description

pipes-group builds upon pipes to establish idioms for grouping streams into sub-streams without collecting elements into memory. This tutorial assumes familiarity with pipes and pipes-parse.

Synopsis

Motivation

Dividing a stream into sub-streams is non-trivial. To illustrate the problem, consider the following task: limit a stream to the first three groups of elements (a group means consecutive equal elements).

The wrong way to do it is to read each group into memory like this:

import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P

threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m ()
threeGroups p0 = loop 3 p0
where
loop 0 _ = return ()
loop n p = do
(as, p') <- lift $runStateT (zoom group drawAll) p each as loop (n - 1) p' The first problem is that this approach does not output any elements from each group until after parsing the entire group: >>> runEffect$ threeGroups P.stdinLn >-> P.stdoutLn
1<Enter>
1<Enter>
2<Enter>
1
1
2<Enter>
2<Enter>
3<Enter>
2
2
2
4<Enter>
3
>>> 


Worse, this program will crash without outputting a single value if fed an infinitely long group of identical elements:

>>> runEffect $threeGroups (each (repeat 1)) >-> P.print <Consumes all memory and crashes>  A better approach is to just stream directly from the first three groups instead of storing the groups in intermediate lists: import Lens.Family ((^.)) import Pipes import Pipes.Parse import qualified Pipes.Prelude as P threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m () threeGroups p0 = loop 3 p0 where loop 0 _ = return () loop n p = do p' <- p ^. group loop (n - 1) p' This will run in constant memory and stream values immediately: >>> runEffect$ threeGroups P.stdinLn >-> P.stdoutLn
1<Enter>
1
1<Enter>
1
2<Enter>
2
2<Enter>
2
2<Enter>
2
3<Enter>
3
4<Enter>


However, this code is not very modular: we have to integrate our group creation logic with our group consumption logic. This conflicts with the pipes philosophy of decoupling streaming programs into modular components.

An more modular approach would be to split our logic into three steps:

• Split our Producer into groups
• Take the first three groups
• Join these three groups back into a Producer

But how do we split our Producer into groups without loading an entire group into memory? We want to avoid solutions like the following code:

import Control.Monad (when, liftM2)
import Lens.Family.State.Strict (zoom)
import Pipes.Parse

split :: (Monad m, Eq a) => Producer a m () -> Producer [a] m ()
split p = do
((as, eof), p') <- lift (runStateT parser p)
yield as
when (not eof) (split p')
where
parser = liftM2 (,) (zoom group drawAll) isEndOfInput

... because then we're back where we started, loading entire groups into memory.

FreeT

Fortunately, you can group elements while still streaming individual elements at a time. The FreeT type from the free package solves this problem by allowing us to build "linked lists" of Producers. This lets you work with streams in a list-like manner.

The key idea is that:

-- '~' means "is analogous to"

-- If a Producer is like a list
Producer a m ()            ~   [a]

-- ... then a 'FreeT'-delimited 'Producer' is like a list of lists
FreeT (Producer a m) m ()  ~  [[a]]

Think of (FreeT (Producer a m) m ()) as a "list of Producers". FreeT nests each subsequent Producer within the return value of the previous Producer so that you cannot access the next Producer until you completely drain the current Producer. However, you rarely need to work with FreeT directly. Instead, you can structure most things using "splitters", "transformations" and "joiners":

-- A "splitter"
Producer a m ()           -> FreeT (Producer a m) m ()  ~   [a]  -> [[a]]

-- A "transformation"
FreeT (Producer a m) m () -> FreeT (Producer a m) m ()  ~  [[a]] -> [[a]]

-- A "joiner"
FreeT (Producer a m) m () -> Producer a m ()            ~  [[a]] ->  [a]

An example splitter is (view groups), which splits a Producer into FreeT-delimited Producers, one for each group of consecutive equal elements:

view groups :: (Eq a, Monad m) => Producer a m x -> FreeT (Producer a m) m x

An example transformation is (takes 3), which takes the first three Producers from a FreeT and drops the rest:

takes 3 :: Monad m => FreeT (Producer a m) m () -> FreeT (Producer a m) m ()

An example joiner is concats, which collapses a FreeT of Producers back down into a single Producer:

concats :: Monad m => FreeT (Producer a m) m x -> Producer a m x

If you compose these three functions together, you will create a function that transforms a Producer to keep only the first three groups of consecutive equal elements:

import Lens.Family
import Pipes
import Pipes.Group
import qualified Pipes.Prelude as P

threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m ()
threeGroups = concats . takes 3 . view groups

Both splitting and joining preserve the streaming nature of Producers and do not collect or buffer any values. The transformed Producer still outputs values immediately and does not wait for groups to complete before producing results.

>>> runEffect $threeGroups P.stdinLn >-> P.stdoutLn 1<Enter> 1 1<Enter> 1 2<Enter> 2 2<Enter> 2 2<Enter> 2 3<Enter> 3 4<Enter> >>>   Also, lenses simplify things even further. The reason that groups is a lens is because it actually combines both a splitter and joiner into a single package. We can then use over to handle both the splitting and joining for us: >>> runEffect$ over groups (takes 3) P.stdinLn >-> P.stdoutLn
<Exact same behavior>


This behaves the same because over takes care of calling the splitter before applying the transformation, then calling the inverse joiner afterward.

Another useful lens is individually, which lets you apply transformations to each Producer layer of a FreeT. For example, if we wanted to add an extra "!" line to the end of every group, we would write:

>>> import Control.Applicative ((<*))
>>> runEffect $over (groups . individually) (<* yield "!") P.stdinLn >-> P.stdoutLn 1<Enter> 1 1<Enter> 1 2<Enter> ! 2 2<Enter> 2 2<Enter> 2 3<Enter> ! 3 4<Enter> ! >>>   Note that individually is only compatible with the lens package. You can alternatively use maps if you are using lens-family-core: >>> runEffect$ over groups (maps (<* yield "!")) P.stdinLn >-> P.stdoutLn
<Exact same behavior>


How FreeT Works

You don't necessarily have to restrict yourself to predefined FreeT functions. You can also manually build or recurse over FreeTs of Producers.

For example, here is how concats is implemented, which collapses all the Producers within a FreeT into a single Producer:

concats :: Monad m => FreeT (Producer a m) m x -> Producer a m x
concats = go
where
go f = do
x <- lift (runFreeT f)  -- Match against the "head" of the "list"
case x of
Pure r -> return r  -- The "list" is empty
Free p -> do        -- The "list" is non-empty
f' <- p         -- The return value of the 'Producer' is
go f'           --     the "tail" of the "list"

Many patterns for FreeTs have equivalent analogs for lists. runFreeT behaves like pattern matching on the list, except that you have to bind the result. Pure is analogous to [] and Free is analogous to (:).

When you receive a Free constructor that means you have a Producer whose return value is the rest of the list (i.e. another FreeT). You cannot access the rest of the list without running the Producer to completion to retrieve this return value. The above example just runs the entire Producer, binds the remainder of the list to f' and then recurses on that value.

You can also build FreeTs in a manner similar to lists. For example, the chunksOf lens uses the following splitter function internally:

_chunksOf :: Monad m => Producer a m x -> FreeT (Producer a m) m x
_chunksOf p = FreeT $do x <- next p -- Pattern match on the 'Producer' return$ case x of
Left   r      -> Pure r     -- Build an empty "list"
Right (a, p') -> Free \$ do  -- Build a non-empty "list"
p'' <- (yield a >> p')^.splitAt n0  -- Emit the "head"
return (_chunksOf p'')              -- Return the "tail"

Pure signifies an empty FreeT (one with no Producer layers), just like [] signifies an empty list (one with no elements). We return Pure whenever we cannot emit any more Producers.

Free indicates that we wish to emit a Producer followed by another "list". The Producer we run directly within the body of the Free. However, we store the remainder of the "list" within the return value of the Producer. This is where _chunksOf recurses to build the rest of the "list".

To gain a better understanding for how FreeT works, consult the definition of the type, which you can find in Control.Monad.Trans.Free:

newtype FreeT f m a = FreeT { runFreeT :: m (FreeF f a (FreeT f m a)) }

data FreeF f a b = Pure a | Free (f b)

... and just replace all occurrences of f with (Producer e m):

-- This is pseudocode

newtype FreeT' m a = FreeT { runFreeT :: m (FreeF' a (FreeT' m a)) }

data FreeF' a b = Pure a | Free (Producer e m b)

... which you can further think of as:

-- More pseudocode

newtype FreeT' m a =
FreeT { runFreeT :: m (Pure a | Producer e m (FreeT' m a)) }

In other words, runFreeT unwraps a FreeT to produce an action in the base monad which either finishes with a value of type a or continues with a Producer which returns a new FreeT. Vice versa, if you want to build a FreeT, you must create an action in the base monad which returns either a Pure or a Producer wrapping another FreeT.

Conclusion

This library is very small since it only contains element-agnostic grouping utilities. Downstream libraries that provide richer grouping utilities include pipes-bytestring and pipes-text.

To learn more about pipes-group, ask questions, or follow development, you can subscribe to the haskell-pipes mailing list at: