Concurrency Part 3

Async

The problem of I/O

  • Many I/O operations wait most of the time
  • We could do useful work in the meantime, but how?
  • One option: Threads
    • Live example incoming! (a minimal sketch follows below)
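  • A minimal sketch of the idea (an assumed stand-in for the live demo; it only logs received bytes):
use std::io::Read;
use std::net::TcpListener;
use std::thread;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:9753")?;
    for stream in listener.incoming() {
        let mut stream = stream?;
        // One OS thread per connection: each thread blocks in read(),
        // while other connections make progress on their own threads
        thread::spawn(move || {
            let mut buffer = [0u8; 4096];
            while let Ok(n) = stream.read(&mut buffer) {
                if n == 0 { break; } // remote closed the connection
                eprintln!("Read {} bytes", n);
            }
        });
    }
    Ok(())
}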

Threads + I/O

  • Possible, but not ideal:
    • Overhead of every thread (memory, context switches etc.)
    • More threads than cores is bad for compute-heavy work
    • Coarse-grained control
    • Don’t compose well
  • Other options?

Concurrent tasks - Overview

[Diagram: step-by-step build-up of two tasks and their interleaving on one thread]

Task A:    accept TCP connection → wait → read from TCP stream → CPU-heavy processing! → write response to TCP stream
Task B:    accept TCP connection → wait → read from TCP stream → CPU-heavy processing! → write response to TCP stream
Combined:  accept (A) → accept (B) → wait → read (A) → read (B) → CPU-heavy (A) → CPU-heavy (B) → write (A) → write (B)

  • While one task waits on I/O, the other can make progress (on the same thread!)

Concurrent tasks - How?

  • For I/O, there are OS functions that are non-blocking
    • e.g. non-blocking POSIX sockets
  • Approach:
    1. Register a bunch of I/O operations with the OS
    2. Call a polling function that blocks until at least one operation is ready to make progress
    3. Process data of this I/O operation (possibly issuing new I/O operations)
    4. Repeat

Async I/O with mio

use std::collections::HashMap;
use anyhow::Result; // assumption: anyhow's Result, matching the `bail!` used below

fn main() -> Result<()> {
    let mut poll = mio::Poll::new().expect("Could not create mio::Poll");
    let mut events = mio::Events::with_capacity(1024);
    let mut connections = HashMap::new();

    let mut listener = mio::net::TcpListener::bind("127.0.0.1:9753".parse().unwrap())?;
    const SERVER_TOKEN: mio::Token = mio::Token(0);

    poll.registry()
        .register(&mut listener, SERVER_TOKEN, mio::Interest::READABLE)?;
    loop {
        poll.poll(&mut events, None)?;

        for event in events.iter() {
            match event.token() {
                SERVER_TOKEN => {
                    handle_incoming_connections(&mut listener, &mut connections, &mut poll)
                }
                _ => handle_readable_connection(event.token(), &mut connections, &mut poll),
            }?;
        }
    }
}

Async I/O with mio (cont.)

use std::collections::HashMap;
use anyhow::{bail, Result};
use mio::net::{TcpListener, TcpStream};
use mio::{Interest, Poll, Token};

fn handle_incoming_connections(
    listener: &mut TcpListener,
    connections: &mut HashMap<Token, TcpStream>,
    poll: &mut Poll,
) -> Result<()> {
    loop {
        match listener.accept() {
            Ok((connection, _)) => {
                // Token 0 is the listener; this simple allocation works
                // because connections are never removed in this example
                let token = Token(connections.len() + 1);
                connections.insert(token, connection);
                let connection = connections.get_mut(&token).unwrap();
                poll.registry()
                    .register(connection, token, Interest::READABLE)?;
                eprintln!("Got {} open connections", connections.len());
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
            Err(e) => bail!("TcpListener error {}", e),
        }
    }
    Ok(())
}
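  • handle_readable_connection is referenced in main but not shown above; a minimal sketch (an assumption: it only logs the received bytes) could look like this:
use std::io::Read; // mio's TcpStream implements std::io::Read

fn handle_readable_connection(
    token: Token,
    connections: &mut HashMap<Token, TcpStream>,
    poll: &mut Poll,
) -> Result<()> {
    let mut closed = false;
    if let Some(connection) = connections.get_mut(&token) {
        let mut buffer = [0u8; 4096];
        loop {
            match connection.read(&mut buffer) {
                // A read of 0 bytes means the remote side closed the connection
                Ok(0) => {
                    closed = true;
                    break;
                }
                Ok(n) => eprintln!("Read {} bytes on connection {:?}", n, token),
                // No more data right now; wait for the next readiness event
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
                Err(e) => bail!("TcpStream error {}", e),
            }
        }
    }
    if closed {
        // Stop receiving events for this connection and drop it
        let mut connection = connections.remove(&token).unwrap();
        poll.registry().deregister(&mut connection)?;
    }
    Ok(())
}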

I/O multiplexing

  • Handling multiple I/O operations concurrently - from one thread!
  • Lower memory overhead than threads
  • Downside: Code is harder to understand!

Towards async code

  • Instead of blocking until an I/O operation completes, we can register a callback to be invoked on completion
  • “Do this once the I/O operation completes”
  • This is a form of asynchronous code

Async with callbacks

use std::io::Read; // for TcpStream::read
use mio::net::TcpStream; // assumption: mio's TcpStream, since notify_read is driven by the mio event loop

type OnReadCallback = fn(&[u8]);
/// Wraps a TCP connection and gives it async functions (with callbacks)
struct Connection {
    stream: TcpStream,
    callbacks: Vec<OnReadCallback>,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Self { ... }

    pub fn read_sync(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
        self.stream.read(buffer)
    }

    pub fn read_async(&mut self, on_read: OnReadCallback) {
        self.callbacks.push(on_read);
    }

    fn notify_read(&mut self, data: &[u8]) {
        for callback in self.callbacks.drain(..) {
            callback(data);
        }
    }
}
  • notify_read gets called from the mio event-loop!

Usability

server.accept_async(|connection| {
    connection.read_async(|data| {
        if let Ok(string) = std::str::from_utf8(data) {
            eprintln!("Got data from remote: {}", string);
        } else {
            eprintln!("Got data from remote (binary): {:?}", data);
        }
    });
});
  • Works, but gets messy quickly (indentation hell)
  • But: It looks almost like regular (synchronous) code

Futures

  • I/O operations return results that become available at some point in the future
  • Rust has a trait for this: Future
pub trait Future {
    type Output; // What value becomes available in the future?

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Self::Output>;
}
  • poll returns Poll::Ready(T) if the Future has completed, Poll::Pending if not
  • The Context knows when to poll the future again
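  • To make the trait concrete, a minimal hand-written future that is immediately ready (a simplified version of std::future::ready):
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Completes immediately with the stored value
struct ReadyFuture<T>(Option<T>);

impl<T: Unpin> Future for ReadyFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Take the value out; polling again after completion would panic
        Poll::Ready(self.get_mut().0.take().expect("polled after completion"))
    }
}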

Usability of futures

  • On its own, Future is not super usable
let mut future = server.accept_async();
let connection = loop {
    match future.poll(???) {
        Poll::Pending => (),
        Poll::Ready(connection) => break connection,
    }
};
let mut future = connection.read_async();
let data = loop {
    match future.poll(???) {
        Poll::Pending => (),
        Poll::Ready(data) => break data,
    }
};
  • Polling in a tight loop like this is busy-waiting: it burns CPU and blocks the thread, so no other task can run
  • We still need a Context instance, but where do we get it from?

Executors

  • Futures must be driven by an executor
    • The management engine for future polling, event handling etc.
  • The simplest executor is block_on:
fn block_on<T, F: Future<Output = T>>(mut f: F) -> T {
    let mut context = ...;
    loop { // NOTE: this loop busy-waits; see the next slide
        // simplified code, ignores some Rust details (e.g. Pin)
        if let Poll::Ready(result) = f.poll(&mut context) {
            return result;
        }
    }
}
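  • In practice you would reach for an existing implementation, e.g. futures::executor::block_on from the futures crate:
use futures::executor::block_on;

fn main() {
    // Drives the future to completion on the current thread
    let result = block_on(async { 1 + 2 });
    assert_eq!(result, 3);
}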

A smarter executor

  • Only poll a future if it is ready to make progress
    • What does progress mean?
  • Don’t busy wait / block
  • Distribute futures onto multiple threads, if possible
  • How do we build something like this?
    • This is what executors like tokio or async-std provide!

Future progress

  • A future can be (and often is) made up of other futures
  • Whenever one sub-future is ready, the whole future can progress to waiting on the next sub-future:
let mut future1 = server.accept_async(); // >----| 1) run until we hit a
let connection = wait(future1); // <-------------|    wait point
let mut future2 = connection.read_async(); // >--| 2) continue from wait
let data = wait(future2); // <-------------------|    point until next one
  • The async function is suspended at a wait point and later resumed where it left off
    • Which other mechanism do you know that works like this?

Cooperative multitasking

  • A scheduler (our executor) manages multiple tasks (our futures)
  • Scheduling works by each task yielding back to the scheduler if either:
    • The task is idle
    • The task is blocked (e.g. by I/O!)
  • The scheduler knows when to resume a task (e.g. by using mio)

Cooperative multitasking - Observations

  • Tasks and scheduler need to talk to mio
  • Tasks must make sure to yield (or they will prevent other tasks from making progress)
  • Minimize time spent in scheduler (<-> maximize time spent running tasks)
  • A task is like a state machine

async and .await

  • Rust provides language magic in the form of two keywords: async and .await
// An `async fn` returning T is sugar for a regular fn returning `impl Future<Output = T>`
pub async fn accept_async(&mut self) -> Connection {
    todo!()
}

let mut server = Server::new(listener, &poll)?;
let mut connection: Connection = server.accept_async().await;
let data: Vec<u8> = connection.read_async().await;
  • async: This function is a task (future)
  • .await: Yield back to the scheduler
  • BEWARE: A future does nothing until you .await it (or spawn it on the executor)!

Function coloring and the root task

  • async introduces function coloring: async code can only be called from async code
    • Like non-const functions in C++, which can’t be called from const functions
  • How do we get from the non-async world to the async world?
    • Option A: Use entry points provided by the executor (e.g. a block_on function)
    • Option B: Turn main into an async fn, e.g. by using #[tokio::main]
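  • Option A in practice, assuming tokio (block_on is the bridge from the sync world into the async world):
fn main() {
    // Build a runtime by hand instead of using #[tokio::main]
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        // async world starts here
    });
}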

tokio example

use mini_redis::{client, Result};

#[tokio::main] // macro turning `main` into `async fn`
async fn main() -> Result<()> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" with value "world"
    client.set("hello", "world".into()).await?;

    // Get key "hello"
    let result = client.get("hello").await?;

    println!("got value from the server; result={:?}", result);

    Ok(())
}

Example code taken from tokio documentation

Introducing concurrency

  • Futures still execute sequentially:
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
        // process() MUST finish before accept() gets called again!
        // No concurrent connections :( 
    }
}
  • The problem: How do we launch additional tasks?

Spawn

  • Executors typically provide functions to launch new tasks (like launching a new process on an OS)
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}
  • async {} wraps code in a Future, a bit like || {} creates an anonymous function
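  • tokio::spawn returns a JoinHandle; awaiting it yields the task's result:
let handle = tokio::spawn(async { 40 + 2 });
let result = handle.await.unwrap(); // 42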

Future combinators

  • Sometimes you have multiple futures and need to combine them in some way:
    • Wait for all futures to finish
    • Wait for whichever future completes first
  • For this we use future combinators:
    • join returns when all futures have completed
    • select returns when the first future completes (the remaining ones are dropped)
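  • A minimal join example (task_a and task_b are hypothetical async fns):
async fn task_a() -> u32 { 1 }
async fn task_b() -> u32 { 2 }

async fn run_both() {
    // Runs both futures concurrently and returns once BOTH are done
    let (a, b) = tokio::join!(task_a(), task_b());
    assert_eq!(a + b, 3);
}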

select example

  • Build our own timeout
let (socket, _) = listener.accept().await.unwrap();
let connection_fut = async move {
    process(socket).await;
};
let timeout = tokio::time::sleep(Duration::from_millis(2000));
tokio::select! {
    _ = connection_fut => {
        println!("Connection processed successfully");
    }
    _ = timeout => {
        println!("Timeout during connection!");
    }
}
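  • tokio ships this pattern ready-made as tokio::time::timeout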

async vs. threads / rayon

  • You will encounter async naturally when doing network programming
  • Rule of thumb:
    • Your computation is CPU-bound? Use something like rayon (or threads in general)
    • Your computation is I/O bound? Use async
  • async has more use-cases, e.g. in embedded development (embassy)

Advanced topics for further study

  • How does the compiler turn async code into Futures implementing a state machine?
  • How does the Context type work?
  • What is Pin<T> and why do futures need it?
  • Cancel safety of futures
  • Streams (asynchronous iterators)
  • Beware: async + lifetimes is one of Rust’s weakest points and can lead to obscure errors :(