cosmic_files/operation/
controller.rs1use atomic_float::AtomicF32;
2use num_enum::{IntoPrimitive, TryFromPrimitive};
3use std::sync::Arc;
4use std::sync::atomic::{self, AtomicU16};
5use tokio::sync::Notify;
6
7#[derive(Clone, Copy, Debug, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
8#[repr(u16)]
9pub enum ControllerState {
10 Cancelled,
11 Failed,
12 Paused,
13 Running,
14}
15
16#[derive(Debug)]
17struct ControllerInner {
18 state: AtomicU16,
19 progress: AtomicF32,
20 notify: Notify,
21}
22
23#[derive(Debug)]
24pub struct Controller {
25 primary: bool,
26 inner: Arc<ControllerInner>,
27}
28
29impl Default for Controller {
30 fn default() -> Self {
31 Self {
32 primary: true,
33 inner: Arc::new(ControllerInner {
34 state: AtomicU16::new(ControllerState::Running.into()),
35 progress: AtomicF32::new(0.0),
36 notify: Notify::new(),
37 }),
38 }
39 }
40}
41
42impl Controller {
43 pub async fn check(&self) -> Result<(), ControllerState> {
44 loop {
45 match self.state() {
46 ControllerState::Cancelled => return Err(ControllerState::Cancelled),
47 ControllerState::Failed => return Err(ControllerState::Failed),
48 ControllerState::Paused => (),
49 ControllerState::Running => return Ok(()),
50 }
51
52 self.inner.notify.notified().await;
53 }
54 }
55
56 pub fn progress(&self) -> f32 {
57 self.inner.progress.load(atomic::Ordering::Relaxed)
58 }
59
60 pub fn set_progress(&self, progress: f32) {
61 self.inner
62 .progress
63 .swap(progress, atomic::Ordering::Relaxed);
64 }
65
66 pub fn state(&self) -> ControllerState {
67 ControllerState::try_from(self.inner.state.load(atomic::Ordering::Relaxed))
68 .unwrap_or(ControllerState::Failed)
69 }
70
71 pub fn set_state(&self, state: ControllerState) {
72 self.inner
73 .state
74 .store(state.into(), atomic::Ordering::Relaxed);
75 self.inner.notify.notify_waiters();
76 }
77
78 pub fn is_cancelled(&self) -> bool {
79 matches!(self.state(), ControllerState::Cancelled)
80 }
81
82 pub fn cancel(&self) {
83 self.set_state(ControllerState::Cancelled);
84 }
85
86 pub fn is_failed(&self) -> bool {
87 matches!(self.state(), ControllerState::Failed)
88 }
89
90 pub fn is_paused(&self) -> bool {
91 matches!(self.state(), ControllerState::Paused)
92 }
93
94 pub fn pause(&self) {
95 self.set_state(ControllerState::Paused);
96 }
97
98 pub async fn until_paused(&self) {
102 loop {
103 if matches!(self.state(), ControllerState::Paused) {
104 return;
105 }
106
107 self.inner.notify.notified().await;
108 }
109 }
110
111 pub async fn until_unpaused(&self) {
115 loop {
116 if !matches!(
117 self.state(),
118 ControllerState::Paused | ControllerState::Cancelled | ControllerState::Failed
119 ) {
120 return;
121 }
122
123 self.inner.notify.notified().await;
124 }
125 }
126
127 pub fn unpause(&self) {
128 if !self.is_cancelled() | !self.is_failed() {
129 self.set_state(ControllerState::Running);
130 }
131 }
132}
133
134impl Clone for Controller {
135 fn clone(&self) -> Self {
136 Self {
137 primary: false,
138 inner: self.inner.clone(),
139 }
140 }
141}
142
143impl Drop for Controller {
144 fn drop(&mut self) {
145 if self.primary && self.state() != ControllerState::Failed {
147 self.cancel();
148 }
149 }
150}