For this library to work, one would need to implement a proper flow dispatcher, but, luckily, one already exists: simple-flow-dispatcher.
(ql:quickload '(:simple-flow-dispatcher :cl-flow))
To assist with our journey, let’s also import cl-flow symbols and define a utility function and variables to run our flows:
(use-package :cl-flow)

(defvar *output* *standard-output*)

(defun print-error (e)
  (format *output* "~A" e))

(defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                      :threads 4
                      :error-handler #'print-error))

(defun run-flow (flow)
  (run *dispatcher* flow))
(run-flow (flow:atomically :default ()
            (print "Hello, flowing World!" *output*)))
Looks like a lot is happening here and, guess what, it is indeed! We just printed a string in one of the four threads of the *dispatcher* asynchronously! To confirm, just add a delay and you will see that #'run-flow returns immediately, while the string is actually printed a bit later.
(run-flow (flow:atomically :default ()
            (sleep 1)
            (print "Hello, flowing World!" *output*)))
Let’s enhance this example with more features cl-flow provides for a quick overview of the library:
(run-flow (flow:serially
           (flow:atomically :default ()
             "Hello, flowing World!")
           (flow:atomically :default (argument)
             (print argument *output*))))
We are going to call the (flow:atomically :default () ...) form a flow block. The :default value here denotes an invariant. Blocks with the same (by #'EQ) invariants are never executed concurrently (in parallel) by simple-flow-dispatcher. The second form of a flow:atomically block ((argument) here) declares an argument that can be passed to the flow block from a previous block. And everything starting from the third form is the body of the block that actually does the work we need.
flow:serially lists flow blocks that must be executed serially - one after another.
Here, in this example, we specified two atomic blocks to be executed serially, passing the result of the first block into the second one and printing this result.
But dispatching serially is no fun. We can do that w/o this crappy overloaded syntax. Let’s do some multithreading!
(run-flow (flow:serially
           (flow:concurrently
            (flow:atomically :p () "Hello")
            (flow:atomically :q () "parallel")
            (flow:atomically :s () "World!"))
           (flow:atomically :default (result)
             (destructuring-bind (a b c) result
               (format *output* "~A, ~A ~A" a b c)))))
Wows! You didn’t notice, but we just ran three blocks of code in parallel, and also gathered the results in a predictable order! Not unlike how semaphores work, only w/o any blocking. By the way, the flow:concurrently operator lists blocks that are going to be run in parallel.
Basically, the code above means we scheduled a concurrent block (flow:concurrently) to run serially with the next atomic block ((flow:atomically :default ...)). In the concurrent block, we specified 3 atomic blocks to run in parallel. Results of those blocks are then gathered into a list and passed to the block that follows the flow:concurrently one.
To make sure we indeed ran those blocks in parallel, let’s change the example a bit:
(run-flow (flow:serially
           (flow:concurrently
            (flow:atomically :p ()
              (sleep 1)
              (print "Processing hello" *output*)
              "Hello")
            (flow:atomically :q ()
              (sleep 1/2)
              (print "Processing parallel" *output*)
              "parallel")
            (flow:atomically :s ()
              (sleep 1/4)
              (print "Processing world" *output*)
              "World!"))
           (flow:atomically :default (result)
             (destructuring-bind (a b c) result
               (format *output* "~%~A, ~A ~A" a b c)))))
No way! We did indeed. And the order of results is preserved. How awesome is that? Yeah, total crap if one would need to write these heavy syntax trees all the time, I agree. Let’s make it a bit more maintainable.
First, let’s extract flows into a couple of functions:
(defun string-constructing-flow ()
  (flow:concurrently
   (flow:atomically :p () "Hello")
   (flow:atomically :q () "parallel")
   (flow:atomically :s () "World!")))

(defun printing-flow ()
  (flow:atomically :default ((a b c))
    (format *output* "~%~A, ~A ~A" a b c)))
And let’s run those:
(run-flow (flow:serially
           (string-constructing-flow)
           (printing-flow)))
Woohoo, it worked!
You might have noticed that the flow block in #'printing-flow looks a bit different. We specified ((a b c)) as the argument instead of just (result). Yes! If you wish, you can put a destructuring lambda list as a block argument and cl-flow will destructure the incoming value for you.
For this special DSL for building computation trees, I personally prefer the shorthand syntax: -> is a synonym for flow:atomically, flow:serially can be replaced with >>, and so on. We can rewrite one of our examples as such:
(run-flow (>> (~> (-> :p () "Hello")
                  (-> :q () "parallel")
                  (-> :s () "World!"))
              (-> :default ((a b c))
                (format *output* "~%~A, ~A ~A" a b c))))
As already mentioned, invariants exist to guard atomic blocks from concurrent execution where needed. One can imagine a flow block to be a critical section guarded by a mutex/lock, only there’s no blocking involved - flow execution is purely non-blocking. It’s worth mentioning that the invariant argument of an atomic block is evaluated, meaning you can use any expression for it, as long as it returns the required invariant value.
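Since the invariant is evaluated, it can be computed at runtime. Here is a small sketch of that idea (the invariant-for helper is hypothetical, made up for illustration): interned keywords with the same name are #'EQ, so all blocks whose invariant expression yields the same keyword are never executed concurrently.

```lisp
;; Hypothetical helper: derive an invariant from runtime data.
;; All blocks produced for the same user-id share one EQ keyword
;; and therefore never run in parallel with each other.
(defun invariant-for (user-id)
  (intern (format nil "USER-~A" user-id) :keyword))

(run-flow (flow:atomically (invariant-for 42) ()
            (print "Serialized per user 42" *output*)))
```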
For the sake of experiment, let’s define a long-running non-thread-safe incrementing function and the required state:
(defparameter *value* 0)

(defun increment-global-variable ()
  (let ((value *value*))
    (sleep 0.1)
    (setf *value* (1+ value))))
And a couple of flows that call this global-state-changing function:
(defun fully-concurrent-flow ()
  (~> (-> :one () (increment-global-variable))
      (-> :two () (increment-global-variable))
      (-> :three () (increment-global-variable))))

(defun non-concurrent-flow ()
  (~> (-> :one () (increment-global-variable))
      (-> :one () (increment-global-variable))
      (-> :one () (increment-global-variable))))
Now, you see those differ only in how invariants are used: #'fully-concurrent-flow has three invariants (:one, :two and :three), while #'non-concurrent-flow is using :one only. Let’s see what happens when we run those!
(setf *value* 0)
(run-flow (fully-concurrent-flow))
Wait a second (or 0.1 of a second to be somewhat precise) and check the value of *value*: it is 1. Huh? What happened is that all three blocks started executing #'increment-global-variable concurrently, all at once. So, at the same time in different threads, #'increment-global-variable cached the 0 value of *value* in its local value variable and then, after 0.1 second, tried to increment it, each setting *value* to 1. That crappy behavior is called a race condition: 3 simultaneous function invocations fought for one resource - *value* - and failed to communicate, incorrectly updating the state.
What will happen with another flow we defined? Let’s find out.
(setf *value* 0)
(run-flow (non-concurrent-flow))
Hold on a moment (yes, 0.3 of a second, you already guessed it right) and recheck what we have in *value*: 3. Woah! What happened now? We started executing #'increment-global-variable three times concurrently as in our previous attempt, but the dispatcher figured out the blocks cannot be run as such by checking their invariant, which is :one for all of them, so it scheduled them to run one after another - serially, that is - and the global state was correctly updated each time. In some way, this flow run was similar to what flow:serially does, only the execution order of the blocks is not guaranteed.
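To make that distinction concrete, here is a sketch reusing the setup from above: flow:serially guarantees both the order and the passing of results, while same-invariant blocks inside flow:concurrently merely never overlap in time, with no promise about which one goes first.

```lisp
;; Guaranteed order with result passing: the second block
;; always receives 1 and prints 2.
(run-flow (flow:serially
           (flow:atomically :default () 1)
           (flow:atomically :default (n)
             (print (1+ n) *output*))))

;; Same :one invariant: A, B and C never run at the same time,
;; but any of them may be scheduled first.
(run-flow (flow:concurrently
           (flow:atomically :one () (print "A" *output*))
           (flow:atomically :one () (print "B" *output*))
           (flow:atomically :one () (print "C" *output*))))
```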