MP

Async Rust Patterns: Send a Sender

File this one under “duh, so obvious,” but it took me a while to think of it de novo, so I figured I'd toss it up on here.

Say you have some background thread or task, and you occasionally want to get some information from it. This could be something that is running entirely independently, like a system monitor, a heartbeat, or a stats collector. It could also be something that some other thread or task is communicating with frequently, such as a handler for socket messages.

It's easy to clone a Sender and share it wherever you might need to send information to the task, but what if you want to get something back?

One option is to ensure that everyone who might want to get data back has a channel to the task, spawning N Sender/Receiver pairs. This isn't necessarily bad for cases where there will be a lot of back and forth or where the number of channels is low, but it's unwieldy, fragile, and difficult to extend.

Often, an easier option is just to send a Sender through the one you already have!

So, instead of:

use tokio::sync::mpsc;

struct Task {
    receiver: mpsc::Receiver<Data>,
}

We can have:

use tokio::sync::{mpsc, oneshot};

enum TaskMessage {
    Data(Data),
    StatsRequest(oneshot::Sender<DataAccumulator>),
}

struct Task {
    receiver: mpsc::Receiver<TaskMessage>,
}

From there, your consumption loop just needs to send whatever data was requested back through the provided Sender:

use futures::stream::{Stream, StreamExt};

struct Task {
    receiver: mpsc::Receiver<TaskMessage>
}
impl Task {
    fn into_stream(self) -> impl Stream<Item = ()> {
        stream::unfold(
            // each iteration of the stream gets the receiver and
            // the data accumulator from the last iteration.
            (self.receiver, DataAccumulator::new()), // just some theoretical accumulator
            async |(mut rx, mut acc)| match rx.recv().await {
                None => None,
                Some(msg) => match msg {
                    TaskMessage::Data(data) => {
                        acc.push(data);
                        Some(((), (rx, acc)))
                    }
                    TaskMessage::StatsRequest(sender) => {
                        sender.send(acc.clone()).unwrap();
                        Some(((), (rx, acc)))
                    }
                },
            },
        )
    }
}

From the calling side, it's easy to construct a oneshot channel and send it through, assuming you've got a sender:

struct Caller {
    task_sender: mpsc::Sender<TaskMessage>,
}
impl Caller {
    async fn get_stats(&self) -> DataAccumulator {
        let (tx, rx) = oneshot::channel();
        // sending isn't even a Future!
        self.task_sender.send(TaskMessage::StatsRequest(tx)).unwrap();
        rx.await.unwrap()
    }
}

The Sender does not even have to be a oneshot. If you need to avoid the overhead of constantly spawning channel pairs, you can make an mpsc pair and send a cloned Sender from that pair:

enum TaskMessage {
    Data(usize),
    QuickStatsRequest(oneshot::Sender<Vec<usize>>),
    PersistentStatsRequest(mpsc::Sender<Vec<usize>>),
}

struct Caller {
    task_sender: mpsc::Sender<TaskMessage>,
    stats_sender: mpsc::Sender<DataAccumulator>,
    stats_receiver: mpsc::Receiver<DataAccumulator>,
}
impl Caller {
    // Need &mut self here since .recv() needs &mut self
    async fn get_stats(&mut self) -> DataAccumulator {
        self.task_sender
            .send(TaskMessage::PersistentStatsRequest(
                self.stats_sender.clone(),
            ))
            .unwrap();
        self.stats_receiver.recv().await.unwrap()
    }
}

Created: 2026-05-20

Tags: async, patterns, programming, rust