cosmic_files/
channel.rs

1// Copyright 2025 System76 <info@system76.com>
2// SPDX-License-Identifier: MPL-2.0
3
4use std::collections::VecDeque;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7
8/// Create a channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
9pub fn channel<Message>() -> (Sender<Message>, Receiver<Message>) {
10    let channel = Arc::new(Channel {
11        queue: Mutex::new(VecDeque::default()),
12        notify: tokio::sync::Notify::const_new(),
13        closed: AtomicBool::new(false),
14    });
15
16    (Sender(channel.clone()), Receiver(channel))
17}
18
19/// A channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
20struct Channel<Message> {
21    pub(self) queue: Mutex<VecDeque<Message>>,
22    /// Set when a new message has been stored.
23    pub(self) notify: tokio::sync::Notify,
24    /// Set when the receiver is dropped.
25    pub(self) closed: AtomicBool,
26}
27
28pub struct Sender<Message>(Arc<Channel<Message>>);
29
30impl<Message> Sender<Message> {
31    pub fn send(&self, message: Message) {
32        self.0.queue.lock().unwrap().push_back(message);
33        self.0.notify.notify_one();
34    }
35}
36
37impl<Message> Drop for Sender<Message> {
38    fn drop(&mut self) {
39        self.0.closed.store(true, Ordering::SeqCst);
40        self.0.notify.notify_one();
41    }
42}
43
44pub struct Receiver<Message>(Arc<Channel<Message>>);
45
46impl<Message> Receiver<Message> {
47    /// Returns a value until the sender is dropped.
48    pub async fn recv(&self) -> Option<Message> {
49        loop {
50            {
51                let mut queue = self.0.queue.lock().unwrap();
52                if let Some(value) = queue.pop_front() {
53                    if queue.capacity() - queue.len() > 32 {
54                        let capacity = queue.len().next_power_of_two();
55                        queue.shrink_to(capacity);
56                    }
57                    drop(queue);
58                    return Some(value);
59                }
60            }
61
62            if self.0.closed.load(Ordering::SeqCst) {
63                return None;
64            }
65
66            self.0.notify.notified().await;
67        }
68    }
69
70    pub fn try_recv(&self) -> Option<Message> {
71        self.0.queue.lock().unwrap().pop_front()
72    }
73}