cosmic_files/operation/
controller.rs

1use atomic_float::AtomicF32;
2use num_enum::{IntoPrimitive, TryFromPrimitive};
3use std::sync::Arc;
4use std::sync::atomic::{self, AtomicU16};
5use tokio::sync::Notify;
6
7#[derive(Clone, Copy, Debug, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
8#[repr(u16)]
9pub enum ControllerState {
10    Cancelled,
11    Failed,
12    Paused,
13    Running,
14}
15
16#[derive(Debug)]
17struct ControllerInner {
18    state: AtomicU16,
19    progress: AtomicF32,
20    notify: Notify,
21}
22
23#[derive(Debug)]
24pub struct Controller {
25    primary: bool,
26    inner: Arc<ControllerInner>,
27}
28
29impl Default for Controller {
30    fn default() -> Self {
31        Self {
32            primary: true,
33            inner: Arc::new(ControllerInner {
34                state: AtomicU16::new(ControllerState::Running.into()),
35                progress: AtomicF32::new(0.0),
36                notify: Notify::new(),
37            }),
38        }
39    }
40}
41
42impl Controller {
43    pub async fn check(&self) -> Result<(), ControllerState> {
44        loop {
45            match self.state() {
46                ControllerState::Cancelled => return Err(ControllerState::Cancelled),
47                ControllerState::Failed => return Err(ControllerState::Failed),
48                ControllerState::Paused => (),
49                ControllerState::Running => return Ok(()),
50            }
51
52            self.inner.notify.notified().await;
53        }
54    }
55
56    pub fn progress(&self) -> f32 {
57        self.inner.progress.load(atomic::Ordering::Relaxed)
58    }
59
60    pub fn set_progress(&self, progress: f32) {
61        self.inner
62            .progress
63            .swap(progress, atomic::Ordering::Relaxed);
64    }
65
66    pub fn state(&self) -> ControllerState {
67        ControllerState::try_from(self.inner.state.load(atomic::Ordering::Relaxed))
68            .unwrap_or(ControllerState::Failed)
69    }
70
71    pub fn set_state(&self, state: ControllerState) {
72        self.inner
73            .state
74            .store(state.into(), atomic::Ordering::Relaxed);
75        self.inner.notify.notify_waiters();
76    }
77
78    pub fn is_cancelled(&self) -> bool {
79        matches!(self.state(), ControllerState::Cancelled)
80    }
81
82    pub fn cancel(&self) {
83        self.set_state(ControllerState::Cancelled);
84    }
85
86    pub fn is_failed(&self) -> bool {
87        matches!(self.state(), ControllerState::Failed)
88    }
89
90    pub fn is_paused(&self) -> bool {
91        matches!(self.state(), ControllerState::Paused)
92    }
93
94    pub fn pause(&self) {
95        self.set_state(ControllerState::Paused);
96    }
97
98    /// Returns when the state is paused.
99    ///
100    /// Use this to pause futures.
101    pub async fn until_paused(&self) {
102        loop {
103            if matches!(self.state(), ControllerState::Paused) {
104                return;
105            }
106
107            self.inner.notify.notified().await;
108        }
109    }
110
111    /// Returns when state is neither paused, cancelled, nor failed.
112    ///
113    /// Use this to resume futures.
114    pub async fn until_unpaused(&self) {
115        loop {
116            if !matches!(
117                self.state(),
118                ControllerState::Paused | ControllerState::Cancelled | ControllerState::Failed
119            ) {
120                return;
121            }
122
123            self.inner.notify.notified().await;
124        }
125    }
126
127    pub fn unpause(&self) {
128        if !self.is_cancelled() | !self.is_failed() {
129            self.set_state(ControllerState::Running);
130        }
131    }
132}
133
134impl Clone for Controller {
135    fn clone(&self) -> Self {
136        Self {
137            primary: false,
138            inner: self.inner.clone(),
139        }
140    }
141}
142
143impl Drop for Controller {
144    fn drop(&mut self) {
145        // Cancel operations if primary controller is dropped and controller is still running
146        if self.primary && self.state() != ControllerState::Failed {
147            self.cancel();
148        }
149    }
150}