Concurrency with Channels and Processes

Flix supports CSP-style concurrency with channels and processes inspired by Go and Rust.

Spawning Processes

We can spawn a process with the spawn keyword:

spawn (1 + 2) @ Static

This spawns a process in the Static region that computes 1 + 2 and throws the result away. The spawn expression always returns Unit. We can spawn any expression, but we typically spawn functions to run in a new process:

def sum(x: Int32, y: Int32): Int32 = x + y

def main(): Unit \ IO = region r {
    spawn sum(1, 2) @ r

Communicating with Channels

To communicate between processes we use channels. A channel allows two or more processes to exchange data by sending immutable messages to each other.

A channel comes in one of two variants: buffered or unbuffered. Channels are always associated with a region.

A buffered channel has a size, set at creation time, and can hold that many messages. If a process attempts to put a message into a buffered channel that is full, then the process is blocked until space becomes available. If, on the other hand, a process attempts to get a message from an empty channel, the process is blocked until a message is put into the channel.

An unbuffered channel works like a buffered channel of size zero; for a get and a put to happen both processes must rendezvous (block) until the message is passed from sender to receiver.

Here is an example of sending and receiving a message over a channel:

def main(): Int32 \ IO = region r {
    let (tx, rx) = Channel.unbuffered(r);
    spawn Channel.send(42, tx) @ r;

Here the main function creates an unbuffered channel which returns Sender tx and a Receiver rx channels, spawns the send function, and waits for a message from the channel.

As the example shows, a channel consists of two end points: the Sender and the Receiver. As one would expect, messages can only be send using the Sender, and only received using the Receiver.

Selecting on Channels

We can use the select expression to receive a message from a collection of channels. For example:

def meow(tx: Sender[String, r]): Unit \ { Write(r) } = 
    Channel.send("Meow!", tx)

def woof(tx: Sender[String, r]): Unit \ { Read(r), Write(r) } = 
    Channel.send("Woof!", tx)

def main(): Unit \ IO = region r {
    let (tx1, rx1) = Channel.buffered(r, 1);
    let (tx2, rx2) = Channel.buffered(r, 1);
    spawn meow(tx1) @ r;
    spawn woof(tx2) @ r;
    select {
        case m <- recv(rx1) => m
        case m <- recv(rx2) => m
    } |> println

Many important concurrency patterns such as producer-consumer and load balancers can be expressed using the select expression.

Selecting with Default

In some cases, we do not want to block until a message arrives, potentially waiting forever. Instead, we want to take some alternative action if no message is readily available. We can achieve this with a default case as shown below:

def main(): String = region r {
    let (_, rx1) = Channel.buffered(r, 1);
    let (_, rx2) = Channel.buffered(r, 1);
    select {
        case _ <- recv(rx1) => "one"
        case _ <- recv(rx2) => "two"
        case _             => "default"

Here a message is never sent to r1 nor r2. The select expression tries all cases, and if no channel is ready, it immediately selects the default case. Hence using a default case prevents the select expression from blocking forever.

Selecting with Timeouts

As an alternative to a default case, we can use tickers and timers to wait for pre-defined periods of time inside a select expression.

For example, here is a program that has a slow function that takes a minute to send a message on a channel, but the select expression relies on Channel.timeout to only wait 5 seconds before giving up:

def slow(tx: Sender[String, r]): Unit \ { Write(r), IO} =
    Channel.send("I am very slow", tx)

def main(): Unit \ IO = region r {
    let (tx, rx) = Channel.buffered(r, 1);
    spawn slow(tx) @ r;
    let timeout = Channel.timeout(r, Time/Duration.fromSeconds(5));
    select {
        case m <- recv(rx)       => m
        case _ <- recv(timeout)  => "timeout"
    } |> println

This program prints the string "timeout" after five seconds.