For this library to work, you need a proper flow dispatcher. Fortunately, one already exists: simple-flow-dispatcher. Download it into a directory where Quicklisp can find it and its dependencies (cl-muth). Now, let’s load this dispatcher and cl-flow itself.
    (ql:quickload '(:simple-flow-dispatcher :cl-flow))
To assist with our journey, let’s also import cl-flow symbols and define a utility function and variables to run our flow:
    (use-package :cl-flow)

    ;; Remember the current output stream so that blocks running in
    ;; dispatcher threads can print to it.
    (defvar *output* *standard-output*)

    (defun print-error (e)
      (format *output* "~A" e))

    (defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                          :threads 4
                          :error-handler #'print-error))

    (defun run-flow (flow)
      (run *dispatcher* flow))

    (run-flow (flow:atomically :default ()
                (print "Hello, flowing World!" *output*)))
It looks like a lot is happening here and, guess what, it is indeed! We just printed a string asynchronously in one of the four threads of the *dispatcher*! To confirm, just add a #'sleep call and you will see that #'run-flow returns immediately and the string is actually printed a bit later.
    (run-flow (flow:atomically :default ()
                (sleep 1)
                (print "Hello, flowing World!" *output*)))
Let’s enhance this example with more features cl-flow provides for a quick overview of the syntax.
    (run-flow (flow:serially
                (flow:atomically :default ()
                  "Hello, flowing World!")
                (flow:atomically :default (argument)
                  (print argument *output*))))
The (flow:atomically :default ()) form is what we are going to call a flow block. The :default value here denotes an invariant. Blocks with the same (by #'EQ) invariants are never executed concurrently (in parallel) by simple-flow-dispatcher. The second form in a flow:atomically block (() or (argument) here) declares an argument that can be passed to the flow block from a previous block. Everything from the third form onward is the body of the block, which does the actual work we need.

flow:serially lists flow blocks that must be executed serially, one after another. In this example, we specified two atomic blocks to be executed serially, passing the result of the first block into the second one, which prints it.
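If it helps to picture the value passing, here is a rough single-threaded model in plain Common Lisp. It is only a mental model, not how cl-flow is implemented: run-steps-serially is a hypothetical helper invented for this sketch, and unlike a real flow it blocks the calling thread.

```lisp
;; Hypothetical helper, NOT part of cl-flow: a blocking, single-threaded
;; model of "each step receives the result of the previous step".
(defun run-steps-serially (&rest steps)
  "Call each function in STEPS in order, feeding it the previous result."
  (let ((result nil))
    (dolist (step steps result)
      (setf result (funcall step result)))))

(defparameter *serial-demo-result*
  (run-steps-serially
   ;; The first step has nothing to receive, so it ignores its argument.
   (lambda (ignored)
     (declare (ignore ignored))
     "Hello, flowing World!")
   ;; The second step receives the string returned by the first one.
   (lambda (argument)
     (string-upcase argument))))

;; *serial-demo-result* is now "HELLO, FLOWING WORLD!"
```

The difference with a real flow is that nothing here is dispatched to another thread: the caller waits until every step is done.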
But dispatching serially is no fun. We can do that without this crappy overloaded syntax. Let’s do some multithreading!
    (run-flow (flow:serially
                (flow:concurrently
                  (flow:atomically :p ()
                    "Hello")
                  (flow:atomically :q ()
                    "parallel")
                  (flow:atomically :s ()
                    "World!"))
                (flow:atomically :default (result)
                  (destructuring-bind (a b c) result
                    (format *output* "~A, ~A ~A" a b c)))))
Wow! You might not have noticed, but we just ran three blocks of code in parallel and also gathered the results in a predictable order! Not unlike how semaphores work, only without any blocking. By the way, the flow:concurrently operator lists blocks that are going to be run in parallel.

Basically, the code above means we scheduled a concurrent block (flow:concurrently) to run serially with the next atomic block ((flow:atomically :default ... )). In the concurrent block, we specified 3 atomic blocks to run in parallel. The results of those blocks are then gathered into a list and passed to the block that follows the flow:concurrently one: see the result argument.
To make sure we indeed ran those blocks in parallel, let’s change the example a bit:
    (run-flow (flow:serially
                (flow:concurrently
                  (flow:atomically :p ()
                    (sleep 1)
                    (print "Processing hello" *output*)
                    "Hello")
                  (flow:atomically :q ()
                    (sleep 1/2)
                    (print "Processing parallel" *output*)
                    "parallel")
                  (flow:atomically :s ()
                    (sleep 1/4)
                    (print "Processing world" *output*)
                    "World!"))
                (flow:atomically :default (result)
                  (destructuring-bind (a b c) result
                    (format *output* "~%~A, ~A ~A" a b c)))))
No way! We did indeed. And the order of results is preserved. How awesome is that? Yeah, total crap if one had to write these heavy syntax trees all the time, I agree. Let’s make it a bit more maintainable.
First, let’s extract flows into a couple of functions:
    (defun string-constructing-flow ()
      (flow:concurrently
        (flow:atomically :p ()
          "Hello")
        (flow:atomically :q ()
          "parallel")
        (flow:atomically :s ()
          "World!")))

    (defun printing-flow ()
      (flow:atomically :default ((a b c))
        (format *output* "~%~A, ~A ~A" a b c)))
And let’s run those:
    (run-flow (flow:serially
                (string-constructing-flow)
                (printing-flow)))
Woohoo, it worked!
You might have noticed that the flow block in #'printing-flow looks a bit different. We specified ((a b c)) as arguments instead of just (result). Yes! If you wish, you can put a destructuring lambda list as a block argument and cl-flow will destructure the incoming value for you.
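As a rough mental model (an assumption for illustration, not a claim about how cl-flow is implemented), a block with a ((a b c)) argument behaves as if its body were wrapped in destructuring-bind over the incoming value. In plain Common Lisp, with *incoming-value* standing in for the list gathered from the concurrent block:

```lisp
;; *incoming-value* is a stand-in for the list of results that the
;; concurrent block would hand over; the name is made up for this sketch.
(defparameter *incoming-value* (list "Hello" "parallel" "World!"))

(defparameter *destructured-greeting*
  (destructuring-bind (a b c) *incoming-value*
    ;; FORMAT with NIL returns the string instead of printing it.
    (format nil "~A, ~A ~A" a b c)))

;; *destructured-greeting* is now "Hello, parallel World!"
```

As with destructuring-bind itself, the shape of the lambda list has to match the shape of the incoming value.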
For this special DSL for building computation trees, I personally prefer the shorthand syntax available in cl-flow. -> is a synonym for flow:atomically, flow:serially can be replaced with >>, flow:concurrently with ~>, and so on. We can rewrite one of our examples as such:
    (run-flow (>> (~> (-> :p () "Hello")
                      (-> :q () "parallel")
                      (-> :s () "World!"))
                  (-> :default ((a b c))
                    (format *output* "~%~A, ~A ~A" a b c))))
As already mentioned, invariants exist to guard atomic blocks from concurrent execution where needed. One can imagine a flow block to be a critical section guarded by a mutex/lock, only there’s no blocking involved: flow execution is purely non-blocking. It’s worth mentioning that the invariant argument of an atomic block is evaluated, meaning you can use any expression for it that returns the required invariant value.

For the sake of experiment, let’s define a long-running, non-thread-safe incrementing function and the state it requires:
    (defparameter *value* 0)

    (defun increment-global-variable ()
      (let ((value *value*))            ; read the shared state
        (sleep 0.1)                     ; pretend to do some long work
        (setf *value* (1+ value))))     ; write back using the old copy
And a couple of flows that call this global-state-changing function:
    (defun fully-concurrent-flow ()
      (~> (-> :one ()
            (increment-global-variable))
          (-> :two ()
            (increment-global-variable))
          (-> :three ()
            (increment-global-variable))))

    (defun non-concurrent-flow ()
      (~> (-> :one ()
            (increment-global-variable))
          (-> :one ()
            (increment-global-variable))
          (-> :one ()
            (increment-global-variable))))
Now, you see those differ only in how invariants are used. #'fully-concurrent-flow uses three invariants: :one, :two and :three, while #'non-concurrent-flow uses :one only. Let’s see what happens when we run those!
    (setf *value* 0)
    (run-flow (fully-concurrent-flow))
Wait a second (or 0.1 of a second to be somewhat precise) and check the value of *value*… 1. Huh? What happened is that all three blocks started executing concurrently, running #'increment-global-variable all at once. So, at the same time in different threads, #'increment-global-variable cached the 0 value of *value* in the value variable and then, after 0.1 second, tried to increment it, all setting *value* to 1. That behavior is crappy and is called a race condition: 3 simultaneous function invocations fought for one resource, *value*, and failed to communicate, incorrectly updating the state.
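To see why the result is 1 and not 3, the unlucky interleaving can be replayed by hand in a single thread. This is just a deterministic sketch in plain Common Lisp; *demo-value* and lost-update-demo are names made up for it, and no real threads or cl-flow calls are involved.

```lisp
(defparameter *demo-value* 0)

(defun lost-update-demo ()
  "Replay three invocations that all read before any of them writes."
  (setf *demo-value* 0)
  ;; Step 1: every invocation caches the current value (0).
  (let ((seen-by-first *demo-value*)
        (seen-by-second *demo-value*)
        (seen-by-third *demo-value*))
    ;; Step 2: every invocation writes back its own stale copy plus one.
    (setf *demo-value* (1+ seen-by-first))
    (setf *demo-value* (1+ seen-by-second))
    (setf *demo-value* (1+ seen-by-third)))
  *demo-value*)

(lost-update-demo) ; => 1, two of the three increments are lost
```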
What will happen with another flow we defined? Let’s find out.
    (setf *value* 0)
    (run-flow (non-concurrent-flow))
Hold on a moment (yes, 0.3 of a second, you already guessed it right) and recheck what we have in *value*: 3. Whoa! What happened now? We started executing #'increment-global-variable three times concurrently as in our previous attempt, but the dispatcher figured out they cannot be run as such by checking their invariant, which is :one for all of the blocks, so it scheduled them to run one after another, serially that is, so the global state was correctly updated each time!

In some way, this flow run was similar to what flow:serially does, only the execution order of blocks is not guaranteed.
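The reason the order does not matter for the final value can also be sketched in plain Common Lisp. Again, this is a single-threaded model with made-up names (*serialized-value*, read-then-write, run-in-order), not cl-flow code: as long as each read-then-write pair runs as a whole, any ordering of the three blocks ends at 3.

```lisp
(defparameter *serialized-value* 0)

(defun read-then-write ()
  "One whole, uninterrupted increment: read the value, then write it back."
  (let ((value *serialized-value*))
    (setf *serialized-value* (1+ value))))

(defun run-in-order (order)
  "Run one whole increment per element of ORDER and return the final value."
  (setf *serialized-value* 0)
  (dolist (block-name order *serialized-value*)
    ;; The name only labels which block runs; all blocks do the same work.
    (declare (ignorable block-name))
    (read-then-write)))

(run-in-order '(:first :second :third)) ; => 3
(run-in-order '(:third :first :second)) ; => 3
```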