1use std::collections::VecDeque;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7
8pub fn channel<Message>() -> (Sender<Message>, Receiver<Message>) {
10 let channel = Arc::new(Channel {
11 queue: Mutex::new(VecDeque::default()),
12 notify: tokio::sync::Notify::const_new(),
13 closed: AtomicBool::new(false),
14 });
15
16 (Sender(channel.clone()), Receiver(channel))
17}
18
19struct Channel<Message> {
21 pub(self) queue: Mutex<VecDeque<Message>>,
22 pub(self) notify: tokio::sync::Notify,
24 pub(self) closed: AtomicBool,
26}
27
28pub struct Sender<Message>(Arc<Channel<Message>>);
29
30impl<Message> Sender<Message> {
31 pub fn send(&self, message: Message) {
32 self.0.queue.lock().unwrap().push_back(message);
33 self.0.notify.notify_one();
34 }
35}
36
37impl<Message> Drop for Sender<Message> {
38 fn drop(&mut self) {
39 self.0.closed.store(true, Ordering::SeqCst);
40 self.0.notify.notify_one();
41 }
42}
43
44pub struct Receiver<Message>(Arc<Channel<Message>>);
45
46impl<Message> Receiver<Message> {
47 pub async fn recv(&self) -> Option<Message> {
49 loop {
50 {
51 let mut queue = self.0.queue.lock().unwrap();
52 if let Some(value) = queue.pop_front() {
53 if queue.capacity() - queue.len() > 32 {
54 let capacity = queue.len().next_power_of_two();
55 queue.shrink_to(capacity);
56 }
57 drop(queue);
58 return Some(value);
59 }
60 }
61
62 if self.0.closed.load(Ordering::SeqCst) {
63 return None;
64 }
65
66 self.0.notify.notified().await;
67 }
68 }
69
70 pub fn try_recv(&self) -> Option<Message> {
71 self.0.queue.lock().unwrap().pop_front()
72 }
73}