These are some common concurrency patterns that you may find yourself reaching for when writing async Rust. There are often different ways to accomplish similar things in the async ecosystem, and this document seeks to clarify when some uses are more appropriate than others.

Fundamentally, the goal of async Rust (and async programming in general) is to make it easy to run multiple things concurrently. Let’s imagine that we’re building a service responsible for taking in a list of subscription IDs, querying the subscription data in Postgres and the User Alias data in Scylla, then writing a combination of that data into another datastore (this is not representative of OneSignal’s actual architecture).

There are some imports, data structures, and helper functions that we’ll assume are defined in all of the code examples on this page:


use uuid::Uuid;
use eyre::Result;

struct Alias;
struct Subscription;

async fn read_aliases(app_id: Uuid, subscription_id: Uuid) -> Result<Vec<Alias>> {
    Ok(vec![])
}

async fn read_subscription(app_id: Uuid, subscription_id: Uuid) -> Result<Subscription> {
    Ok(Subscription)
}

async fn write_data(
    app_id: Uuid,
    subscription_id: Uuid,
    subscription: Subscription,
    aliases: &[Alias],
) -> Result<Subscription> {
    Ok(Subscription)
}

A sequential beginning

Our service might begin its life with a sequential execution model; many services do. Let’s imagine that the handler for the service we described in the introduction looks something like this:


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    for id in subscription_ids {
        let aliases = read_aliases(app_id, *id).await?;
        let subscription = read_subscription(app_id, *id).await?;
        write_data(app_id, *id, subscription, &aliases).await?;
    }

    Ok(())
}

This will give us a reasonable starting point for the rest of the examples on this page. Notice (importantly) that even though our handle function is marked as async and the RPCs it uses all have await after them, indicating that they also return Futures, this function is sequential. It operates on one thing at a time, not performing any concurrent actions. There might be concurrency outside of this function (maybe multiple handle futures are running concurrently), but within the scope of the function there is no concurrency. Notice in the below trace waterfall that no two function calls are allowed to run at the same time.

Screenshot of the trace waterfall view of the execution of this code

Now that we have a baseline, let’s take a look at how we can add some concurrency to our function call.

Running futures concurrently

tokio::join! - Fixed number of heterogeneous operations

Let’s imagine that we want to take our sequential handler and add some concurrency to it. First notice that the two read operations we perform are entirely independent of one another. The read_aliases call and the read_subscription call currently happen sequentially, but the result of the read_aliases query is not used in the read_subscription query at all, and these are read queries, so there is no logical reason that they need to be sequential. Let’s try to make our handler a bit faster by running these two things concurrently. The simplest way to do this is by using the tokio::join! macro. This macro works by taking in multiple futures and awaiting them concurrently. It works on a fixed number of futures, since each future must be written out in the macro invocation. Adding tokio::join! to our handler function looks like this:


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    for id in subscription_ids {
        let (aliases_result, subscription_result) =
            tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

        let aliases = aliases_result?;
        let subscription = subscription_result?;

        write_data(app_id, *id, subscription, &aliases).await?;
    }

    Ok(())
}

There are a few important things to notice about what’s changed in this code example. Firstly, notice that the awaits have been removed from the read function calls entirely. All of the await-ing is now happening within the body of the join! macro call.

Next, notice that the ? symbols after the read function calls have been moved away from their respective function calls and into the main body of the function. This is required because we have now separated where the function is called from where the results of its Future are available. Recall that ? is used to early-return from a function returning a Result or Option if the expression to the left of the ? is an Err or None, respectively. We were able to use ? with the result of an async function previously because we had await there, turning our Future<T> into a T for us. Now, we no longer have the await on the same line as the function call, so the only thing available to us on the read_aliases line of our join! call is a Future<Result<Vec<Alias>>>. We do not get access to the resulting Vec<Alias> until after the join! has completed, so if we want to act on the Result, we need to do it there.

This trace waterfall view shows how our read calls are running concurrently.

We’ve managed to make these two operations run concurrently, but each subscription is still processed sequentially. Let’s take a look at how we can process all of the subscriptions concurrently.

FuturesUnordered - Homogeneous operations

Generally, when we have a big list of things that we want to run the same operations on, we reach for FuturesUnordered. This type allows us to run any number of futures of the same type concurrently. Since we’re running exactly the same operations for each subscription in our input list, we can run these futures concurrently by collecting them into a FuturesUnordered.


use futures::stream::{StreamExt, FuturesUnordered};

async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut fun = FuturesUnordered::new();

    for id in subscription_ids {
        fun.push(async {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            write_data(app_id, *id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        });
    }

    while let Some(result) = fun.next().await {
        result?;
    }

    Ok(())
}

Similarly to when we added concurrency via join!, notice that there is a new layer of taking the results of our futures and using ? to handle returning errors that resulted from underlying function calls. Just like when we used join!, we have separated the futures from the outer control flow, so we need to add that control flow back to the top level of the function.

Notice that we are still able to use ? and Ok from within the futures that we’re adding to our FuturesUnordered, but these ? will only short-circuit the control flow of this inner future, not the entire handle function as they did previously. For that, we need the outer ? inside the while let loop at the bottom of the function. This trace waterfall view shows how quickly all of the items can be processed when running concurrently.

Limitations

There are some important things to keep in mind when using FuturesUnordered. Firstly, know that it only works on Futures of the same type. This means that if you wanted to add an additional Future to run concurrently with the list of subscription-acting Futures, the compiler would not accept it.


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut fun = FuturesUnordered::new();

    for id in subscription_ids {
        fun.push(async {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            write_data(app_id, *id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        });
    }

    fun.push(async { ... }); // ERROR: expected `async` block, found a different `async` block

    while let Some(result) = fun.next().await {
        result?;
    }

    Ok(())
}

If we wanted to run this future concurrently with the others, we could use another layer of tokio::join!.


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut fun = FuturesUnordered::new();

    ...

    let (result, _) = tokio::join!(
        async {
            while let Some(result) = fun.next().await {
                result?;
            }

            Ok::<_, eyre::Report>(())
        },
        async { ... }
    );

    result?;

    Ok(())
}

Next, it’s important to know that FuturesUnordered, like all Futures (and Streams) in Rust, does no work until it is polled. This means that nothing happens until we call .next().await/collect().await/etc on the FuturesUnordered. If you only call FuturesUnordered::push, your Futures will never be polled.
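
As a minimal sketch of that pitfall (do_work is a trivial stand-in for a real async operation), nothing below would ever run if the final while let loop were removed:


use futures::stream::{FuturesUnordered, StreamExt};

async fn do_work(n: u32) -> u32 {
    // Trivial stand-in for a real async operation.
    n * 2
}

async fn demo() {
    let mut fun = FuturesUnordered::new();

    for n in 0..3 {
        // Nothing runs here; the futures are only being stored.
        fun.push(do_work(n));
    }

    // This loop is what actually drives the futures to completion. Without
    // it, `fun` would simply be dropped and `do_work` would never execute.
    while let Some(doubled) = fun.next().await {
        println!("{doubled}");
    }
}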

Next, let’s look at a similar way we could run a number of Futures concurrently using the tokio library.

tokio::spawn - Heterogeneous/background operations

The tokio library provides many concurrency tools for developers to use when scheduling async operations. Perhaps the most interesting, however, is the spawn function. This function allows us to spawn a lightweight green thread, called a task, which will be automatically polled by the tokio runtime in the background.

Looking at the previous example where we wanted to concurrently run our subscription-level operations and another async block, we could also accomplish this using tokio::spawn, like so:


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut fun = FuturesUnordered::new();

    let join_handle = tokio::spawn(async { ... });

    ...

    while let Some(result) = fun.next().await {
        result?;
    }

    join_handle.await?;

    Ok(())
}

This seems much simpler to parse visually: there’s a lot less nesting after removing the tokio::join! line, and we can have the control flow of the FuturesUnordered::next loop control the flow of the handle function directly. There are a couple of things to notice about this example though.

Firstly, notice that we separate the creation of the task (the tokio::spawn call) from the awaiting of the task’s JoinHandle at the bottom of the function. Recall that for normal Futures, nothing would happen until that .await. For a normal Future, separating creation from polling is not an effective way to concurrently run things. It is only because tokio::spawn creates a background task that we are able to achieve concurrency by doing this separation.

Next, notice that there’s a ? when we await the task’s JoinHandle. This is required because sometimes tasks will fail during their runtime. A task might fail because it panics, or because it was cancelled by the runtime before it could complete. This failure is captured in the Result that the JoinHandle resolves to (see its Future impl for details).
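
For example, here’s a minimal sketch (with a deliberately panicking task) of how the JoinError inside that Result can be inspected:


async fn demo() {
    let handle = tokio::spawn(async {
        panic!("boom");
    });

    match handle.await {
        Ok(()) => println!("task completed"),
        // `JoinError` distinguishes a panic inside the task from the task
        // being cancelled (e.g. via `JoinHandle::abort`).
        Err(e) if e.is_panic() => eprintln!("task panicked"),
        Err(e) if e.is_cancelled() => eprintln!("task was cancelled"),
        Err(e) => eprintln!("task failed: {e}"),
    }
}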

Now that we understand how tokio::spawn can be used to run Futures concurrently, let’s look at how it compares with FuturesUnordered.

When to use FuturesUnordered vs tokio::spawn

Based on the previous two sections, it may be unclear when you should prefer using tokio::spawn or FuturesUnordered for your code. They can be used for very similar ends, and it’s not always clear when one should be used vs the other. Here are some things to consider when choosing between these two concurrency tools.

Firstly, as we’ve already discussed, FuturesUnordered is a collection of Futures of the same type. It can only be used to concurrently run many Futures doing the same thing. It’s a great option to use if you’re running the same operation over many items in a collection, but it can’t really help you to run multiple Futures of different types.

Next, since FuturesUnordered requires something else to poll it (call .next()/.collect()/etc), it is not suitable for creating background tasks. It’s suitable for running things where you can point in the code and say “this is where these Futures should begin working, and this is where they should be done working.” Things with indeterminate end times should probably run in the background using tokio::spawn.
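
For instance, a long-lived background task with no predetermined end point (a hypothetical heartbeat) fits tokio::spawn naturally:


use std::time::Duration;

// A hypothetical background task: it ticks forever, until the program exits
// or the task is aborted through its JoinHandle.
fn spawn_heartbeat() -> tokio::task::JoinHandle<()> {
    tokio::spawn(async {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            interval.tick().await;
            println!("still alive");
        }
    })
}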

Also, because of the way the Future trait works in Rust, it is not possible to distribute the work in a FuturesUnordered across multiple CPUs. Because a FuturesUnordered is just a collection of Future objects that are polled together, they will always run on the same CPU. If you determine that the work of your program is stuck on a single CPU, you may want to consider breaking up tasks using tokio::spawn. Each tokio task can run on a different CPU, but within each task there will be no CPU distribution. The CPU responsible for running a particular task may change, but there is no parallelism within a single task.

Note

You should never use tokio::spawn in order to create CPU-bound
parallelism. Tokio assumes that Futures spawned onto its runtime will be
io-bound and polling them will be very fast. If you need to run CPU-bound
work, you should use spawn_blocking.
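
A minimal sketch of that escape hatch (the summation is a stand-in for real CPU-bound work such as hashing or compression):


async fn demo() -> Result<()> {
    // `spawn_blocking` runs the closure on tokio's dedicated blocking thread
    // pool, keeping CPU-heavy work off of the async worker threads.
    let sum = tokio::task::spawn_blocking(|| {
        // Stand-in for real CPU-bound work.
        (0u64..10_000_000).fold(0u64, |acc, n| acc.wrapping_add(n))
    })
    .await?;

    println!("{sum}");
    Ok(())
}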

Finally, it’s worth noting that you could use both of these two things in tandem. If you wanted to run many tasks concurrently and wait for all of them to complete, you might do something like this:


async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut fun = FuturesUnordered::new();

    let join_handle = tokio::spawn(async {});

    for id in subscription_ids {
        let id = *id;
        fun.push(tokio::spawn(async move {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, id), read_subscription(app_id, id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            write_data(app_id, id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        }));
    }

    while let Some(result) = fun.next().await {
        result??;
    }

    join_handle.await?;

    Ok(())
}

Notice that we changed the async block to an async move block and dereferenced the &Uuid into a Uuid so that the spawned future would take ownership of the subscription ID instead of taking a reference to it. This reference-taking does not work because all Futures passed to tokio::spawn must be 'static (contain no non-'static references).

Also notice that the while loop at the end of the function has an extra ?, since tokio::spawn always adds a Result<T, JoinError> around the spawned task to account for panic or cancellation errors.

You might want to do this to take advantage of the fact that tokio::spawn will allow these Futures to run on multiple CPUs in parallel. In many cases, this is not necessary, but it’s worth noting that it is possible.

Now that we understand tokio::join!, FuturesUnordered, and tokio::spawn, let’s look at another way we can run Futures concurrently - tokio::select!.

select! - When you want to race

The concurrency methods that we explored previously allow us to run multiple Futures concurrently until completion. There are many cases, however, when you want to run many Futures concurrently, but you do not want to allow every Future to complete. Let’s step away from the inside of the handle function for a moment and consider what might be on the outside of it. Let’s imagine that requests are sent into our application on a channel, and we want to shut down our application gracefully after receiving a Ctrl+C from the user, meaning we want to fully process all in-flight requests but not begin handling any new ones. Based on the concurrency tools we’ve already looked at, it’s not clear how we might do this, since we don’t have a means to “interrupt” a Future. Let’s see how we can achieve this using a select! block.


#[tokio::main]
async fn main() {
    let ctrl_c = tokio::signal::ctrl_c();
    let (tx, mut rx) = tokio::sync::mpsc::channel::<(Uuid, Vec<Uuid>)>(1_000);

    // Pass `tx` somewhere so that requests are enqueued into it

    // This is often required when using a future with `tokio::select!` which
    // was not created in the `select!` block
    tokio::pin!(ctrl_c);

    loop {
        tokio::select! {
            _ = &mut ctrl_c => {
                // Wait for all in-flight requests to complete
                break;
            }
            next_item = rx.recv() => {
                let Some((app_id, subscription_ids)) = next_item else {
                    break;
                };

                tokio::spawn(async move {
                    if let Err(e) = handle(app_id, &subscription_ids).await {
                        eprintln!("Error! {e}");
                    }
                });
            }
        }
    }
}

This code will pull items off of the channel and spawn them as independent tokio tasks, processing them concurrently. It will continue to do this until Ctrl+C is pressed, at which point the program will wait for all in-flight request handlers to complete (left as an exercise to the reader), then break out of its infinite loop and exit.

Notice that the way we handle each of these futures is a bit different. The Ctrl+C future is created outside of the infinite loop, pinned in place, then mutably passed to select!, while the rx.recv() future is created anew for each iteration of the loop. This is a very important distinction, and one that you should keep in mind when you’re using select! in a loop like this. What should happen if your select! block is reached for a second time? Should an interrupted future be started fresh, or should it be resumed? This is something you must consider as you use select! and similar tools. See the below diagram for a visual of how the different futures are polled and either re-used or discarded in this example.

There are a few important things to notice about this diagram. First, notice that the future polled first changes from iteration to iteration. select! randomly picks branch polling order, which is intended to provide fairness. This can be overridden to provide a guaranteed polling order (see the docs for details), but generally this is the correct default unless you’re using select! in a tight loop. Next, notice that there are some iterations of the loop where the Ctrl+C future is not polled at all. Since the rx.recv() future is polled first and it resolves, there is no opportunity for other futures to be polled.
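
For reference, here’s a sketch of that override using tokio’s biased; option, which polls branches top-to-bottom instead of randomly (the two channels are stand-ins for this example):


use tokio::sync::mpsc;

async fn drain(mut shutdown: mpsc::Receiver<()>, mut rx: mpsc::Receiver<u32>) {
    loop {
        tokio::select! {
            // With `biased;`, branches are polled in the order they're
            // written, so shutdown is always checked before new work.
            biased;

            _ = shutdown.recv() => break,
            item = rx.recv() => {
                let Some(item) = item else { break };
                println!("processing {item}");
            }
        }
    }
}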

Since select! is a macro, it can have as many branches as we want, so we can run many futures concurrently with it. The key difference between select! and other forms of concurrency we’ve already looked at is that select! does not assume that we’ll want to complete every future. It might be the case that this program terminates without Ctrl+C ever being pressed, without that future ever resolving. It might also be the case that the program terminates without any items ever being pulled off of rx.

Note

Note that because futures can and will be interrupted (canceled) when using select! or similar concurrency tools like tokio::time::timeout, it's
critical to ensure that all contained futures are "cancel-safe," meaning
they will not drop data or leave data in an invalid state if they're
canceled. See tokio documentation for details on cancellation safety.
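
As a small illustration of cancellation in action, tokio::time::timeout drops (cancels) the inner future if the deadline wins the race:


use std::time::Duration;
use tokio::time::{sleep, timeout};

async fn demo() {
    // The 10-second sleep loses the race and is dropped mid-flight; any
    // state held by a cancelled future is simply dropped along with it.
    match timeout(Duration::from_millis(50), sleep(Duration::from_secs(10))).await {
        Ok(()) => println!("inner future finished in time"),
        Err(_elapsed) => println!("inner future was cancelled by the timeout"),
    }
}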

Controlling level of concurrency

Let’s imagine that we discover that we’re placing too much stress on some of these underlying services. We might want to limit the number of things our handle function does concurrently in order to limit the stress that we’re placing on underlying data stores/services. There are two common ways that we might accomplish this - via buffer_unordered or by using a Semaphore.

buffer_unordered - When order/control doesn’t matter that much

The simplest way to control the level of concurrency when running a lot of Futures concurrently is by using buffer_unordered. This is a function from the StreamExt trait in the futures crate; there are a lot of handy combinator functions in there that you can explore. This particular function works by taking a Stream of Futures and running up to a desired number of those Futures concurrently with one another, creating a new Stream which yields results as the underlying Futures complete. It works somewhat similarly to FuturesUnordered. Let’s take a look at how we might use it to limit our handle function to only running 4 operations concurrently at any given time.


use futures::stream::StreamExt;

async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    let mut stream = futures::stream::iter(subscription_ids)
        .map(|id| async {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            write_data(app_id, *id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        })
        .buffer_unordered(4);

    while let Some(result) = stream.next().await {
        result?;
    }

    Ok(())
}

This will run the same computations as our FuturesUnordered example, but will only permit 4 subscription IDs to be processed concurrently. Notice that the while loop at the end of the function and the async block are essentially unchanged from the FuturesUnordered version; it’s quite simple to adapt one of these versions to the other. The below trace waterfall screenshot shows what happens when five items are processed with this concurrency limit of four. If you want to avoid using a stream combinator like map, you can also construct a Vec of Futures similar to how we construct the FuturesUnordered, then pass that Vec to stream::iter(vec).buffer_unordered(4), as sketched below.
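
Here’s a minimal sketch of that Vec-based variant, assuming the same helpers and imports as the examples above:


use futures::stream::StreamExt;

async fn handle(app_id: Uuid, subscription_ids: &[Uuid]) -> Result<()> {
    // Build all of the futures up front in a Vec, then turn the Vec into a
    // stream and limit it to four concurrent futures, just like before.
    let futures: Vec<_> = subscription_ids
        .iter()
        .map(|id| async move {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            write_data(app_id, *id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        })
        .collect();

    let mut stream = futures::stream::iter(futures).buffer_unordered(4);

    while let Some(result) = stream.next().await {
        result?;
    }

    Ok(())
}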

buffer_unordered is a simple option to use when all of your computations are a part of a single continuous stream, but it doesn’t work in all cases. Let’s take a look at another technique that provides us with some more fine-grained control over our concurrency.

Semaphore - When you want more control

You may have heard this before, but basically every concurrency primitive (mutex/channel/rwlock/etc) can be implemented in terms of a semaphore. A semaphore is a general-purpose data structure used to answer a simple question in a thread-safe way - “how many units of SOME RESOURCE are available right now?” In our case, we’re going to imagine that the “resource” we’re controlling access to is “simultaneous write queries.”
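
Before wiring one into our handler, here’s a minimal sketch of tokio’s Semaphore in isolation: three permits shared by five tasks.


use std::sync::Arc;

use tokio::sync::Semaphore;

async fn demo() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = Vec::new();

    for n in 0..5 {
        let semaphore = semaphore.clone();
        handles.push(tokio::spawn(async move {
            // Waits until one of the three permits is free. The permit is
            // handed back to the semaphore when `_permit` is dropped.
            let _permit = semaphore.acquire().await.unwrap();
            println!("worker {n} holds a permit");
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}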

If the upstream write system imposed a limit that each client should perform no more than 100 concurrent writes against it, it would be quite difficult to control this via buffer_unordered. We could change the buffer_unordered(4) to buffer_unordered(100), but this would only control concurrency within a single request. Presumably, our service handles many requests concurrently. Most RPC frameworks don’t have a place where we, say, loop over all in-flight requests and handle them together. Each handle call is separate, but they all need to share one piece of state: how many write queries are in progress right now. We can do this by using a tokio Semaphore, which looks like this.


use std::sync::Arc;

use futures::stream::StreamExt;
use tokio::sync::Semaphore;

fn main() {
    let write_semaphore = Arc::new(Semaphore::new(100));

    ... initialize handlers here
}

async fn handle(
    write_semaphore: Arc<Semaphore>,
    app_id: Uuid,
    subscription_ids: &[Uuid],
) -> Result<()> {
    let mut stream = futures::stream::iter(subscription_ids)
        .map(|id| async {
            let (aliases_result, subscription_result) =
                tokio::join!(read_aliases(app_id, *id), read_subscription(app_id, *id));

            let aliases = aliases_result?;
            let subscription = subscription_result?;

            let _write_guard = write_semaphore.acquire().await?;
            write_data(app_id, *id, subscription, &aliases).await?;

            Ok::<_, eyre::Report>(())
        })
        .buffer_unordered(4);

    while let Some(result) = stream.next().await {
        result?;
    }

    Ok(())
}

The code that we’ve written here will allow up to four Futures to concurrently run, each of which will run our two read queries concurrently, then wait for an open permit from the Semaphore and run the write query. Since the Semaphore will be shared across handle function invocations, this application will only run 100 write queries concurrently across the entire application. Since 100 is a pretty high number to look at in a single example image, the below trace view shows what this execution would look like with 3 permits on the write_semaphore.

Notice that the write_data call within the fourth “process one subscription” span is delayed until the first three write_data calls are allowed to complete. This is our semaphore in action! Even though the fourth “process one subscription” span is still running, the fifth one is allowed to start concurrently with it, because the first three calls have finished, and our buffer_unordered(4) will allow any four to run concurrently.

By using a Semaphore, you can effectively limit concurrency in almost any way you can imagine. Now that we understand a bit about Semaphores, let’s move on to looking at some more nuances of async Rust.

Ownership and borrowing with asynchronous characteristics

One of the more challenging things to wrap your head around when starting as a Rust developer is the ownership and borrowing system. Similarly, one of the more challenging things about writing async Rust is understanding how ownership and borrowing work (or sometimes do not work) with futures and tasks. The good news is that all of the standard ownership and borrowing rules still apply in async Rust; there are no new paradigms to learn about how you use or share memory. We do, however, need to carefully consider how the rules we already know apply when working in a slightly different context.

Throughout this section, it’s critical to remember that a Future in Rust is a value. It’s not magic, it’s not so special and complicated that you can’t comprehend it. It’s a little state machine that stores some state and knows how to continuously answer the question “can you move forward now or not?” When we store variables across .await points in a Future, we need to put those variables onto the state machine so that the future can be suspended and then later resumed.
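
As a rough, hand-drawn illustration (glossing over pinning and wakers entirely), the compiler conceptually turns an async fn that holds a variable across an .await into an enum shaped something like this:


// Conceptual only: a sketch of the state machine generated for an async fn
// like `async fn example() { let x = 5u32; some_fut().await; use_it(x); }`.
// The real generated type also handles pinning and wakers.
struct SomeFut; // stand-in for the awaited future's type

enum ExampleFuture {
    // Created, but not yet polled.
    Start,
    // Suspended at the .await: `x` lives on the state machine so it's still
    // available when the future resumes.
    WaitingOnSomeFut { x: u32, inner: SomeFut },
    // Finished; polling again would panic.
    Done,
}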

In this section, we’re going to look at what’s happening mechanically when we run code concurrently with async Rust. Once we abstract away from the actual concurrency of it, thinking about the ways that ownership and borrowing work should be a bit simpler. Let’s begin by first considering what happens when using FuturesUnordered.

Ownership with FuturesUnordered

When we put one or more futures into a FuturesUnordered, they are all stored within, and owned by, that FuturesUnordered. Recall that these futures will all be polled on the same thread concurrently, and only when FuturesUnordered::next() or a similar method is being run. All of this means that we can think about a FuturesUnordered very similarly to a Vec. Just like a Vec, we add a number of values to a FuturesUnordered, they exist there for a period of time, maybe they’re processed, and then eventually (when the collection goes out of scope) they’re dropped. From an ownership perspective, anything that you can do with a Vec, you can do with a FuturesUnordered. Let’s take a look at what I mean by that.


use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::time::sleep;

fn vec_references_that_stay_in_scope() {
    let words = ["hello".to_string(), "world".to_string()];
    let mut prints = Vec::new();

    for i in &words {
        // This reference-taking is done explicitly so that you can see `i` is
        // _borrowed_ by the vec.
        let i: &str = &i[..];

        prints.push(i);
    }

    for word in prints {
        println!("{}", word);
    }
}

async fn futures_unordered_references_that_stay_in_scope() {
    let words = ["hello".to_string(), "world".to_string()];
    let mut prints = FuturesUnordered::new();

    for i in &words {
        // This reference-taking is done explicitly so that you can see `i` is
        // _borrowed_ by the FuturesUnordered.
        let i: &str = &i[..];

        prints.push(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    while let Some(()) = prints.next().await {}
}

In both of these functions, we place references into a collection and act on them. There is no ownership issue here, since in both cases the variable words which owns the values referenced by the prints collections remains in scope. Let’s see what happens however, if we try to go outside of that scope.


fn vec_references_that_escape_scope() -> Vec<&str> { // ERROR: missing lifetime specifier
    let words = ["hello".to_string(), "world".to_string()];
    let mut prints = Vec::new();

    for i in &words {
        // This reference-taking is done explicitly so that you can see `i` is
        // _borrowed_ by the vec.
        let i: &str = &i[..];

        prints.push(i);
    }

    prints
}

async fn futures_unordered_references_that_escape_scope() -> FuturesUnordered<impl Future> {
    let words = ["hello".to_string(), "world".to_string()];
    let mut prints = FuturesUnordered::new();

    for i in &words {
        // This reference-taking is done explicitly so that you can see `i` is
        // _borrowed_ by the FuturesUnordered.
        let i: &str = &i[..];

        prints.push(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    prints // ERROR: cannot return value referencing local variable `words`
}

This code fails to compile. Both the Vec function and the FuturesUnordered function fail to compile for (essentially) the same reason. In both cases, we are attempting to return a value which is referencing a local variable. We cannot do this! The only reason the second function does not fail to compile on the signature line is that impl Future, unlike a reference type like &str, does not always require a lifetime specifier when used in a return type.

Similarly, the same rules about mutable and immutable borrows work on FuturesUnordered too! Let’s make an example that mutates a local variable from within a FuturesUnordered.


use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::time::sleep;

fn vec_mutable_refs() {
    let mut words = ["hello".to_string(), "world".to_string()];
    let mut prints = Vec::new();

    for i in &mut words {
        prints.push(i);
    }

    for word in prints {
        word.push('!');
    }

    println!("{words:?}");
}

async fn futures_unordered_mutable_refs() {
    let mut words = ["hello".to_string(), "world".to_string()];
    let mut prints = FuturesUnordered::new();

    for i in &mut words {
        prints.push(async move {
            sleep(Duration::from_millis(10)).await;
            i.push('!');
        });
    }

    while let Some(()) = prints.next().await {}
    drop(prints);

    println!("{words:?}");
}

Again, the concepts that we’ve learned previously about storing mutable references on a Vec apply very cleanly to a FuturesUnordered. Notice that in both cases we’re able to store mutable references to items in an array and mutate those items. In the case of the FuturesUnordered, we can even mutate the items from within an async block! This is a very powerful tool for writing simple concurrent code. We don’t need channels, we don’t need intermediate collections, we can mutate outer-scoped variables from within async blocks when we’re using things like FuturesUnordered.

Now that we have a sense of how to use and share ownership when using FuturesUnordered, let’s talk about tokio::spawn.

Ownership with tokio::spawn

If a FuturesUnordered is a local Vec of things, using tokio::spawn is more akin to something like this:


use std::sync::Mutex;

static TASKS: Mutex<Vec<Box<dyn 'static + Send + Sync>>> = Mutex::new(Vec::new());

When we’re using tokio::spawn, we’re necessarily creating things that can escape from the scope of the function that creates them. Everything that we pass to tokio::spawn winds up in a big global queue of async tasks that the compiler can’t reason about as cleanly as it can with FuturesUnordered. All the rules that would apply to a big global Vec of things also apply to things passed to tokio::spawn.

In contrast to FuturesUnordered, we cannot reference local variables from the outside scope when using tokio::spawn. This limitation is captured by the 'static bound in the code example above, which also exists in the signature of tokio::spawn. 'static means “cannot contain non-'static references,” it does not mean “every variable must be a 'static reference.”

Let’s take a look at how our previous examples may or may not work when using tokio::spawn.


use std::time::Duration;
use tokio::time::sleep;

fn spawn_references_that_stay_in_scope() {
    let words = ["hello".to_string(), "world".to_string()];

    for i in &words { // ERROR: `words` does not live long enough
                      // argument requires that `words` is borrowed for `'static`
        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    println!("{words:?}");
}

This code that worked perfectly fine with FuturesUnordered now refuses to compile with tokio::spawn. This is because tokio::spawn requires us to not reference any variables from the enclosing scope, which is hinted at by the “this is not 'static, but it has to be!” error message. So how could we make this work? There are a few different ways. We can either move the original String, clone the String and move the cloned copy, share a read-only reference via an Arc, or share a mutable reference via an Arc<Mutex>. Let’s start by looking at moving.


use std::time::Duration;
use tokio::time::sleep;

fn spawn_move() {
    let words = ["hello".to_string(), "world".to_string()];

    // The `&` that used to be on this line was removed, making this into a "consuming" loop.
    for i in words {
        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    // Notice that we're no longer able to access `words` after the loop,
    // because it was moved into the loop.
}

Moving is generally the right answer if you don’t need to access a variable after passing it to tokio::spawn. There’s no ownership issue in this code, because the newly spawned task fully owns the String that we pass into it. There’s no reference to a local variable to worry about. This will not work for you, however, if you need to access the value after passing it to the spawned task.

If you do need to access a value from both within a spawned task and outside of it, you might consider cloning the value, like this:


use std::time::Duration;
use tokio::time::sleep;

fn spawn_clone_and_move() {
    let words = ["hello".to_string(), "world".to_string()];

    for i in &words {
        let i = i.clone();

        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    println!("{words:?}");
}

This is essentially the same code as the last example, with the added benefit of allowing us to access words after the loop exits. This is an effective solution when multiple places need to read values that were identical at the time of the clone.

Note

It's important to note that .clone() in this case creates a wholly
separate String with no ties back to the original one, other than "at the
time of calling .clone(), the values in the two Strings are the same."
If you want to ensure that shared values do not change, or that changes in
one place are reflected in other places, you need to actually share memory
via Arc/Mutex etc.

One limitation of this approach is that it will increase memory usage. Since our values are now stored twice in memory, we’re effectively doubling the memory requirements of our program. If we want to avoid large increases in memory usage, we might consider instead using Arc to share values, like this:


use std::{sync::Arc, time::Duration};
use tokio::time::sleep;

fn spawn_arc() {
    let words = [
        Arc::new("hello".to_string()),
        Arc::new("world".to_string())
    ];

    for i in &words {
        let i = i.clone();

        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            println!("{}", i);
        });
    }

    println!("{words:?}");
}

While most of this function looks the same as the previous one, note that words does not store String values, it stores Arc<String> values. An Arc is an atomically reference-counted pointer which can be shared across threads for read-only access. In contrast to the previous example, this function stores each string in memory only once. Arcs are fairly cheap to clone, and can be used liberally to share read-only access to variables across threads or tasks.

An Arc can help you share read-only access to values, but if you want to mutate a shared value, you’ll need to use a Mutex. Here’s an example of how you can do that:


use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use tokio::time::sleep;

async fn spawn_arc_mutex() {
    let words = [
        Arc::new(Mutex::new("hello".to_string())),
        Arc::new(Mutex::new("world".to_string())),
    ];

    for i in &words {
        let i = i.clone();

        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            let mut lock = i.lock().unwrap();
            lock.push('!');
        });
    }

    sleep(Duration::from_millis(1_000)).await;
    println!("{words:?}");
}

The sleep at the end of the function exists to allow the function to catch up with all of the mutations that are happening in the tokio tasks. This should not be used as an actual synchronization method in production code; replacing it is left as an exercise for the reader. What synchronization method have we already learned that would let us wait on the tokio task completions concurrently?

Closing words

I hope this guide was helpful for your Rust concurrency learning!