pipes-concurrency-2.0.12: Concurrency for the pipes ecosystem

Safe HaskellSafe
LanguageHaskell98

Pipes.Concurrent

Contents

Description

Asynchronous communication between pipes

Synopsis

Inputs and Outputs

newtype Input a #

An exhaustible source of values

recv returns Nothing if the source is exhausted

Constructors

Input 

Fields

Instances
Monad Input # 
Instance details

Defined in Pipes.Concurrent

Methods

(>>=) :: Input a -> (a -> Input b) -> Input b #

(>>) :: Input a -> Input b -> Input b #

return :: a -> Input a #

fail :: String -> Input a #

Functor Input # 
Instance details

Defined in Pipes.Concurrent

Methods

fmap :: (a -> b) -> Input a -> Input b #

(<$) :: a -> Input b -> Input a #

Applicative Input # 
Instance details

Defined in Pipes.Concurrent

Methods

pure :: a -> Input a #

(<*>) :: Input (a -> b) -> Input a -> Input b #

liftA2 :: (a -> b -> c) -> Input a -> Input b -> Input c #

(*>) :: Input a -> Input b -> Input b #

(<*) :: Input a -> Input b -> Input a #

Alternative Input # 
Instance details

Defined in Pipes.Concurrent

Methods

empty :: Input a #

(<|>) :: Input a -> Input a -> Input a #

some :: Input a -> Input [a] #

many :: Input a -> Input [a] #

MonadPlus Input # 
Instance details

Defined in Pipes.Concurrent

Methods

mzero :: Input a #

mplus :: Input a -> Input a -> Input a #

Semigroup (Input a) # 
Instance details

Defined in Pipes.Concurrent

Methods

(<>) :: Input a -> Input a -> Input a #

sconcat :: NonEmpty (Input a) -> Input a #

stimes :: Integral b => b -> Input a -> Input a #

Monoid (Input a) # 
Instance details

Defined in Pipes.Concurrent

Methods

mempty :: Input a #

mappend :: Input a -> Input a -> Input a #

mconcat :: [Input a] -> Input a #

newtype Output a #

An exhaustible sink of values

send returns False if the sink is exhausted

Constructors

Output 

Fields

Instances
Contravariant Output #

This instance is useful for creating new tagged address, similar to elm's Signal.forwardTo. In fact elm's forwardTo is just 'flip contramap'

Instance details

Defined in Pipes.Concurrent

Methods

contramap :: (a -> b) -> Output b -> Output a #

(>$) :: b -> Output b -> Output a #

Divisible Output # 
Instance details

Defined in Pipes.Concurrent

Methods

divide :: (a -> (b, c)) -> Output b -> Output c -> Output a #

conquer :: Output a #

Decidable Output # 
Instance details

Defined in Pipes.Concurrent

Methods

lose :: (a -> Void) -> Output a #

choose :: (a -> Either b c) -> Output b -> Output c -> Output a #

Semigroup (Output a) # 
Instance details

Defined in Pipes.Concurrent

Methods

(<>) :: Output a -> Output a -> Output a #

sconcat :: NonEmpty (Output a) -> Output a #

stimes :: Integral b => b -> Output a -> Output a #

Monoid (Output a) # 
Instance details

Defined in Pipes.Concurrent

Methods

mempty :: Output a #

mappend :: Output a -> Output a -> Output a #

mconcat :: [Output a] -> Output a #

Pipe utilities

fromInput :: MonadIO m => Input a -> Producer' a m () #

Convert an Input to a Producer

fromInput terminates when the Input is exhausted.

toOutput :: MonadIO m => Output a -> Consumer' a m () #

Convert an Output to a Consumer

toOutput terminates when the Output is exhausted.

Actors

spawn :: Buffer a -> IO (Output a, Input a) #

Spawn a mailbox using the specified Buffer to store messages

Using send on the Output

  • fails and returns False if the mailbox is sealed, otherwise it:
  • retries if the mailbox is full, or:
  • adds a message to the mailbox and returns True.

Using recv on the Input:

  • retrieves a message from the mailbox wrapped in Just if the mailbox is not empty, otherwise it:
  • retries if the mailbox is not sealed, or:
  • fails and returns Nothing.

If either the Input or Output is garbage collected the mailbox will become sealed.

spawn' :: Buffer a -> IO (Output a, Input a, STM ()) #

Like spawn, but also returns an action to manually seal the mailbox early:

(output, input, seal) <- spawn' buffer
...

Use the seal action to allow early cleanup of readers and writers to the mailbox without waiting for the next garbage collection cycle.

withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r #

withSpawn passes its enclosed action an Output and Input like you'd get from spawn, but automatically seals them after the action completes. This can be used when you need the sealing behavior available from 'spawn\'', but want to work at a bit higher level:

withSpawn buffer $ \(output, input) -> ...

withSpawn is exception-safe, since it uses bracket internally.

withBuffer :: Buffer a -> (Output a -> IO l) -> (Input a -> IO r) -> IO (l, r) #

A more restrictive alternative to withSpawn that prevents deadlocks

data Buffer a #

Buffer specifies how to buffer messages stored within the mailbox

Constructors

Unbounded

Deprecated: Use unbounded instead

Bounded Int

Deprecated: Use bounded instead

Single

Deprecated: Use bounded 1 instead

Latest a

Deprecated: Use latest instead

Newest Int

Deprecated: Use newest instead

New

Deprecated: Use newest 1 instead

unbounded :: Buffer a #

Store an unbounded number of messages in a FIFO queue

bounded :: Int -> Buffer a #

Store a bounded number of messages, specified by the Int argument

latest :: a -> Buffer a #

Only store the Latest message, beginning with an initial value

Latest is never empty nor full.

newest :: Int -> Buffer a #

Like Bounded, but send never fails (the buffer is never full). Instead, old elements are discarded to make room for new elements

Re-exports

Control.Concurrent re-exports forkIO, although I recommend using the async library instead.

Control.Concurrent.STM re-exports atomically and STM.

System.Mem re-exports performGC.

forkIO :: IO () -> IO ThreadId #

Creates a new thread to run the IO computation passed as the first argument, and returns the ThreadId of the newly created thread.

The new thread will be a lightweight, unbound thread. Foreign calls made by this thread are not guaranteed to be made by any particular OS thread; if you need foreign calls to be made by a particular OS thread, then use forkOS instead.

The new thread inherits the masked state of the parent (see mask).

The newly created thread has an exception handler that discards the exceptions BlockedIndefinitelyOnMVar, BlockedIndefinitelyOnSTM, and ThreadKilled, and passes all other exceptions to the uncaught exception handler.

readTVar :: TVar a -> STM a #

Return the current value stored in a TVar.

newTVarIO :: a -> IO (TVar a) #

IO version of newTVar. This is useful for creating top-level TVars using unsafePerformIO, because using atomically inside unsafePerformIO isn't possible.

atomically :: STM a -> IO a #

Perform a series of STM actions atomically.

Using atomically inside an unsafePerformIO or unsafeInterleaveIO subverts some of guarantees that STM provides. It makes it possible to run a transaction inside of another transaction, depending on when the thunk is evaluated. If a nested transaction is attempted, an exception is thrown by the runtime. It is possible to safely use atomically inside unsafePerformIO or unsafeInterleaveIO, but the typechecker does not rule out programs that may attempt nested transactions, meaning that the programmer must take special care to prevent these.

However, there are functions for creating transactional variables that can always be safely called in unsafePerformIO. See: newTVarIO, newTChanIO, newBroadcastTChanIO, newTQueueIO, newTBQueueIO, and newTMVarIO.

Using unsafePerformIO inside of atomically is also dangerous but for different reasons. See unsafeIOToSTM for more on this.

data STM a #

A monad supporting atomic memory transactions.

Instances
Monad STM

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

(>>=) :: STM a -> (a -> STM b) -> STM b #

(>>) :: STM a -> STM b -> STM b #

return :: a -> STM a #

fail :: String -> STM a #

Functor STM

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

fmap :: (a -> b) -> STM a -> STM b #

(<$) :: a -> STM b -> STM a #

Applicative STM

Since: base-4.8.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

pure :: a -> STM a #

(<*>) :: STM (a -> b) -> STM a -> STM b #

liftA2 :: (a -> b -> c) -> STM a -> STM b -> STM c #

(*>) :: STM a -> STM b -> STM b #

(<*) :: STM a -> STM b -> STM a #

Alternative STM

Since: base-4.8.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

empty :: STM a #

(<|>) :: STM a -> STM a -> STM a #

some :: STM a -> STM [a] #

many :: STM a -> STM [a] #

MonadPlus STM

Since: base-4.3.0.0

Instance details

Defined in GHC.Conc.Sync

Methods

mzero :: STM a #

mplus :: STM a -> STM a -> STM a #

mkWeakTVar :: TVar a -> IO () -> IO (Weak (TVar a)) #

Make a Weak pointer to a TVar, using the second argument as a finalizer to run when TVar is garbage-collected

Since: stm-2.4.3

performGC :: IO () #

Triggers an immediate major garbage collection.