Async
miofn main() -> Result<()> {
let mut poll = mio::Poll::new().expect("Could not create mio::Poll");
let mut events = mio::Events::with_capacity(1024);
let mut connections = HashMap::new();
let mut listener = mio::net::TcpListener::bind("127.0.0.1:9753".parse().unwrap())?;
const SERVER_TOKEN: mio::Token = mio::Token(0);
poll.registry()
.register(&mut listener, SERVER_TOKEN, mio::Interest::READABLE)?;
loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
SERVER_TOKEN => {
handle_incoming_connections(&mut listener, &mut connections, &mut poll)
}
_ => handle_readable_connection(event.token(), &mut connections, &mut poll),
}?;
}
}
}mio (cont.)fn handle_incoming_connections(
listener: &mut TcpListener,
connections: &mut HashMap<Token, TcpStream>,
poll: &mut Poll,
) -> Result<()> {
loop {
match listener.accept() {
Ok((connection, _)) => {
let token = Token(connections.len() + 1);
connections.insert(token, connection);
let connection = connections.get_mut(&token).unwrap();
poll.registry()
.register(connection, token, Interest::READABLE)?;
eprintln!("Got {} open connections", connections.len());
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => bail!("TcpListener error {}", e),
}
}
Ok(())
}type OnReadCallback = fn(&[u8]) -> ();
/// Wraps a TCP connection and gives it async functions (with callbacks)
struct Connection {
/// The wrapped stream; `read_sync` reads from it directly.
stream: TcpStream,
/// Callbacks queued by `read_async`; `notify_read` calls each one and empties the list.
callbacks: Vec<OnReadCallback>,
}
impl Connection {
/// Constructor taking the `TcpStream` to wrap; its body is elided (`...`) in this excerpt.
pub fn new(stream: TcpStream) -> Self { ... }
/// Reads from the wrapped stream into `buffer` and hands back the stream's
/// own `read` result unchanged.
pub fn read_sync(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
    let stream = &mut self.stream;
    stream.read(buffer)
}
/// Queues `on_read` so that `notify_read` calls it later. Nothing is read here.
pub fn read_async(&mut self, on_read: OnReadCallback) {
    let pending = &mut self.callbacks;
    pending.push(on_read);
}
/// Calls every queued read callback with `data`, removing each one from the
/// queue, so a callback fires at most once per registration.
/// Per the accompanying notes, the mio event loop is the caller.
fn notify_read(&mut self, data: &[u8]) {
    self.callbacks
        .drain(..)
        .for_each(|on_read| on_read(data));
}
}
notify_read gets called from the mio event-loop!
Future
poll returns Poll::Ready(T) if the Future has completed, Poll::Pending if not
Context knows when to poll the future again
Future is not super usable
let mut future = server.accept_async();
let connection = loop {
match future.poll(???) {
Poll::Pending => (),
Poll::Ready(connection) => break connection,
}
};
let mut future = connection.read_async();
let data = loop {
match future.poll(???) {
Poll::Pending => (),
Poll::Ready(data) => break data,
}
};
poll is called busy-waiting and prevents concurrency
Context instance, but where do we get it from?
block_on:
tokio or async-std provide!
mio)
mio
async and .await
async and .await
async: This function is a task (future)
.await: Yield back to the scheduler
.await it!
async introduces function coloring: async code can only be called from async code
non-const functions in C++, which can’t be called from const functions
async world to the async world?
main into an async fn, e.g. by using #[tokio::main]
tokio example
use mini_redis::{client, Result};
#[tokio::main] // macro turning `main` into `async fn`
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;
// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;
// Get key "hello"
let result = client.get("hello").await?;
println!("got value from the server; result={:?}", result);
Ok(())
}
Example code taken from tokio documentation
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
async {} wraps code in a Future, a bit like || {} creates an anonymous function
join returns when all futures have completed
select returns when a single future has completed
select example
let (socket, _) = listener.accept().await.unwrap();
let connection_fut = async move {
process(socket).await;
};
let timeout = tokio::time::sleep(Duration::from_millis(2000));
tokio::select! {
val = connection_fut => {
println!("Connection processed successfully");
}
val = timeout => {
println!("Timeout during connection!");
}
}
async vs. threads / rayon
async naturally when doing network programming
rayon (or threads in general)
async
async has more use-cases, e.g. in embedded development (embassy)
async code into Futures implementing a state machine?
Context type work?
Pin<T> and why do futures need it?
async + lifetimes is one of Rust’s weakest points and can lead to obscure errors :(