Structured Concurrency
Flix supports CSP-style concurrency with channels and processes inspired by Go and Rust.
Spawning Processes
We can spawn a process with the spawn
keyword:
def main(): Unit \ IO = region rc {
spawn println("Hello from thread") @ rc;
println("Hello from main")
}
Spawned processes are always associated with a region; the region will not exit until all the processes associated with it have completed:
def slowPrint(delay: Int32, message: String): Unit \ IO =
Thread.sleep(Time.Duration.fromSeconds(delay));
println(message)
def main(): Unit \ IO =
region r1 {
region r2 {
spawn slowPrint(2, "Hello from r1") @ r1;
spawn slowPrint(1, "Hello from r2") @ r2
};
println("r2 is now complete")
};
println("r1 is now complete")
This means that Flix supports structured concurrency; spawned processes have clearly defined entry and exit points.
Communicating with Channels
To communicate between processes we use channels. A channel allows two or more processes to exchange data by sending immutable messages to each other.
A channel comes in one of two variants: buffered or unbuffered. Channels are always associated with a region.
A buffered channel has a size, set at creation time, and can hold that many messages. If a process attempts to put a message into a buffered channel that is full, then the process is blocked until space becomes available. If, on the other hand, a process attempts to get a message from an empty channel, the process is blocked until a message is put into the channel.
An unbuffered channel works like a buffered channel of size zero; for a get and a put to happen both processes must rendezvous (block) until the message is passed from sender to receiver.
Here is an example of sending and receiving a message over a channel:
def main(): Int32 \ IO = region rc {
let (tx, rx) = Channel.unbuffered(rc);
spawn Channel.send(42, tx) @ rc;
Channel.recv(rx)
}
Here the main
function creates an unbuffered
channel which returns Sender
tx
and a Receiver
rx
channels,
spawns the send
function, and waits
for a message from the channel.
As the example shows, a channel consists of two end points:
the Sender and the Receiver. As one would expect,
messages can only be send using the Sender
, and only
received using the Receiver
.
Selecting on Channels
We can use the select
expression to receive a
message from a collection of channels.
For example:
def meow(tx: Sender[String, r]): Unit \ r =
Channel.send("Meow!", tx)
def woof(tx: Sender[String, r]): Unit \ r =
Channel.send("Woof!", tx)
def main(): Unit \ IO = region rc {
let (tx1, rx1) = Channel.buffered(rc, 1);
let (tx2, rx2) = Channel.buffered(rc, 1);
spawn meow(tx1) @ rc;
spawn woof(tx2) @ rc;
select {
case m <- recv(rx1) => m
case m <- recv(rx2) => m
} |> println
}
Many important concurrency patterns such as
producer-consumer and load balancers can be expressed
using the select
expression.
Selecting with Default
In some cases, we do not want to block until a message arrives, potentially waiting forever. Instead, we want to take some alternative action if no message is readily available. We can achieve this with a default case as shown below:
def main(): String = region rc {
let (_, rx1) = Channel.buffered(rc, 1);
let (_, rx2) = Channel.buffered(rc, 1);
select {
case _ <- recv(rx1) => "one"
case _ <- recv(rx2) => "two"
case _ => "default"
}
}
Here a message is never sent to r1
nor r2
.
The select
expression tries all cases, and if no
channel is ready, it immediately selects the default
case.
Hence using a default case prevents the select
expression from blocking forever.
Selecting with Timeouts
As an alternative to a default case, we can use
tickers and timers to wait for pre-defined
periods of time inside a select
expression.
For example, here is a program that has a slow
function that takes a minute to send a message on
a channel, but the select
expression relies on
Channel.timeout
to only wait 5
seconds before
giving up:
def slow(tx: Sender[String, r]): Unit \ { r, IO } =
Thread.sleep(Time.Duration.fromSeconds(60));
Channel.send("I am very slow", tx)
def main(): Unit \ IO = region rc {
let (tx, rx) = Channel.buffered(rc, 1);
spawn slow(tx) @ rc;
let timeout = Channel.timeout(rc, Time.Duration.fromSeconds(5));
select {
case m <- recv(rx) => m
case _ <- recv(timeout) => "timeout"
} |> println
}
This program prints the string "timeout"
after five
seconds.