1use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
5use crate::operation::{OperationError, sync_to_disk};
6use anyhow::Context as AnyhowContext;
7use compio::BufResult;
8use compio::buf::{IntoInner, IoBuf};
9use compio::driver::ToSharedFd;
10use compio::driver::op::AsyncifyFd;
11use compio::io::{AsyncReadAt, AsyncWriteAt};
12use cosmic::iced::futures;
13use std::cell::Cell;
14use std::error::Error;
15use std::fs;
16use std::future::Future;
17use std::ops::ControlFlow;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::time::Instant;
22use walkdir::WalkDir;
23
24#[cfg(feature = "gvfs")]
25use gio::prelude::FileExtManual;
26
27#[derive(thiserror::Error, Debug)]
28pub enum GioCopyError {
29 #[error("controller state")]
30 Controller(OperationError),
31 #[cfg(feature = "gvfs")]
32 #[error("gio copy failed")]
33 GLib(#[from] glib::Error),
34}
35
/// How the entries of a source tree are transferred to their destination.
pub enum Method {
    /// Regular files are planned as [`OpKind::Copy`]; sources are left untouched.
    Copy,
    /// Regular files are planned as [`OpKind::Move`] and cleanup ops are queued for the sources.
    Move {
        /// Forwarded unchanged to every [`OpKind::Move`] that is planned.
        cross_device_copy: bool,
    },
}
40
41pub struct Context {
42 buf: Vec<u8>,
43 controller: Controller,
44 on_progress: Box<dyn OnProgress>,
45 on_replace: Pin<Box<dyn OnReplace>>,
46 pub(crate) op_sel: OperationSelection,
47 replace_result_opt: Option<ReplaceResult>,
48 remaining_conflicts: usize,
49}
50
51pub trait OnProgress: Fn(&Op, &Progress) + 'static {}
52impl<F> OnProgress for F where F: Fn(&Op, &Progress) + 'static {}
53
54pub trait OnReplace:
55 for<'a> Fn(&'a Op, usize) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
56{
57}
58impl<F> OnReplace for F where
59 F: for<'a> Fn(&'a Op, usize) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
60{
61}
62
63impl Context {
64 pub fn new(controller: Controller) -> Self {
65 Self {
66 buf: vec![0u8; 128 * 1024],
68 controller,
69 on_progress: Box::new(|_op, _progress| {}),
70 on_replace: Box::pin(|_op, _count| Box::pin(async { ReplaceResult::Cancel })),
71 op_sel: OperationSelection::default(),
72 replace_result_opt: None,
73 remaining_conflicts: 0,
74 }
75 }
76
77 pub async fn recursive_copy_or_move(
78 &mut self,
79 from_to_pairs: impl IntoIterator<Item = (PathBuf, PathBuf)>,
80 method: Method,
81 ) -> Result<bool, OperationError> {
82 let mut ops = Vec::new();
83 let mut cleanup_ops = Vec::new();
84 let mut written_files = Vec::new();
85 let mut target_dirs = std::collections::HashSet::new();
86 for (from_parent, to_parent) in from_to_pairs {
87 self.controller
88 .check()
89 .await
90 .map_err(|s| OperationError::from_state(s, &self.controller))?;
91
92 if from_parent == to_parent {
93 continue;
95 }
96
97 for entry in WalkDir::new(&from_parent) {
98 self.controller
99 .check()
100 .await
101 .map_err(|s| OperationError::from_state(s, &self.controller))?;
102
103 let entry = entry.map_err(|err| {
104 OperationError::from_err(
105 format!(
106 "failed to walk directory {}: {}",
107 from_parent.display(),
108 err
109 ),
110 &self.controller,
111 )
112 })?;
113 let file_type = entry.file_type();
114 let from = entry.into_path();
115 let kind = if file_type.is_dir() {
116 OpKind::Mkdir
117 } else if file_type.is_file() {
118 match method {
119 Method::Copy => OpKind::Copy,
120 Method::Move { cross_device_copy } => OpKind::Move { cross_device_copy },
121 }
122 } else if file_type.is_symlink() {
123 let target = fs::read_link(&from).map_err(|err| {
124 OperationError::from_err(
125 format!("failed to read link {}: {}", from_parent.display(), err),
126 &self.controller,
127 )
128 })?;
129 OpKind::Symlink { target }
130 } else {
131 return Err(OperationError::from_err(
133 format!("{} is not a known file type", from.display()),
134 &self.controller,
135 ));
136 };
137 let to = if from == from_parent {
138 to_parent.clone()
140 } else {
141 let relative = from.strip_prefix(&from_parent).map_err(|err| {
142 OperationError::from_err(
143 format!(
144 "failed to remove prefix {} from {}: {}",
145 from_parent.display(),
146 from.display(),
147 err
148 ),
149 &self.controller,
150 )
151 })?;
152 to_parent.join(relative)
154 };
155 let op = Op {
156 kind,
157 from,
158 to,
159 skipped: Rc::new(Skip {
160 normal: Cell::new(false),
161 cleanup: Cell::new(false),
162 }),
163 is_cleanup: false,
164 };
165 if matches!(method, Method::Move { .. })
166 && let Some(cleanup_op) = op.move_cleanup_op()
167 {
168 cleanup_ops.push(cleanup_op);
169 }
170 if let Some(parent) = op.to.parent() {
171 target_dirs.insert(parent.to_path_buf());
172 }
173 ops.push(op);
174 }
175
176 self.op_sel.ignored.push(from_parent);
177 }
178
179 cleanup_ops.reverse();
181 ops.append(&mut cleanup_ops);
182
183 self.remaining_conflicts = ops
185 .iter()
186 .filter(|op| {
187 matches!(
188 op.kind,
189 OpKind::Copy | OpKind::Move { .. } | OpKind::Symlink { .. }
190 ) && op.to.is_file()
191 })
192 .count();
193
194 let total_ops = ops.len();
195 for (current_ops, mut op) in ops.into_iter().enumerate() {
196 self.controller
197 .check()
198 .await
199 .map_err(|s| OperationError::from_state(s, &self.controller))?;
200
201 let progress = Progress {
202 current_ops,
203 total_ops,
204 current_bytes: 0,
205 total_bytes: None,
206 };
207 (self.on_progress)(&op, &progress);
208 if op.run(self, progress).await.map_err(|err| {
209 OperationError::from_err(
210 format!(
211 "failed to {:?} {} to {}: {}",
212 op.kind,
213 op.from.display(),
214 op.to.display(),
215 err
216 ),
217 &self.controller,
218 )
219 })? {
220 if matches!(
221 op.kind,
222 OpKind::Copy
223 | OpKind::Move {
224 cross_device_copy: true
225 }
226 ) {
227 written_files.push(op.to.clone());
228 }
229 if self.op_sel.ignored.contains(&op.from) {
231 self.op_sel.selected.push(op.to);
233 }
234 } else {
235 return Ok(false);
237 }
238 }
239
240 sync_to_disk(written_files, target_dirs).await;
242
243 Ok(true)
244 }
245
246 pub fn on_progress<F: OnProgress>(mut self, f: F) -> Self {
247 self.on_progress = Box::new(f);
248 self
249 }
250
251 pub fn on_replace(mut self, f: impl OnReplace + 'static) -> Self {
252 self.on_replace = Box::pin(f);
253 self
254 }
255
256 async fn replace(&mut self, op: &Op) -> Result<ControlFlow<bool, PathBuf>, Box<dyn Error>> {
257 let replace_result = match self.replace_result_opt {
258 Some(result) => result,
259 None => (self.on_replace)(op, self.remaining_conflicts).await,
260 };
261
262 match replace_result {
263 ReplaceResult::Replace(apply_to_all) => {
264 if apply_to_all {
265 self.replace_result_opt = Some(replace_result);
266 }
267 compio::fs::remove_file(&op.to).await?;
268 Ok(ControlFlow::Continue(op.to.clone()))
269 }
270 ReplaceResult::KeepBoth => match op.to.parent() {
271 Some(to_parent) => Ok(ControlFlow::Continue(copy_unique_path(&op.from, to_parent))),
272 None => Err(format!("failed to get parent of {}", op.to.display()).into()),
273 },
274 ReplaceResult::Skip(apply_to_all) => {
275 if apply_to_all {
276 self.replace_result_opt = Some(replace_result);
277 }
278 op.skipped.normal.set(true);
279 Ok(ControlFlow::Break(true))
280 }
281 ReplaceResult::Cancel => Ok(ControlFlow::Break(false)),
282 }
283 }
284}
285
/// Progress snapshot handed to the progress callback.
#[derive(Debug)]
pub struct Progress {
    /// Zero-based index of the op currently being executed.
    pub current_ops: usize,
    /// Total number of planned ops, cleanup ops included.
    pub total_ops: usize,
    /// Bytes transferred so far for the current op.
    pub current_bytes: u64,
    /// Size of the file being transferred, once known.
    pub total_bytes: Option<u64>,
}
293
/// The action a single [`Op`] performs.
#[derive(Debug)]
pub enum OpKind {
    /// Copy the contents of `from` into a new file at `to`.
    Copy,
    /// Hard-link `from` to `to`, falling back to a copy when they are on different devices.
    Move { cross_device_copy: bool },
    /// Create the directory `to` (and any missing parents).
    Mkdir,
    /// Remove the file `from`.
    Remove,
    /// Remove the directory `from`.
    Rmdir,
    /// Create a symlink at `to` pointing at `target`.
    Symlink { target: PathBuf },
}
303
/// Skip flags shared between a transfer op and the cleanup op derived from it.
#[derive(Debug)]
pub struct Skip {
    /// Set when the user chose to skip the conflict; every op sharing this flag becomes a no-op.
    pub normal: Cell<bool>,
    /// Set by a move with `cross_device_copy`; only ops marked `is_cleanup` honour it.
    pub cleanup: Cell<bool>,
}
311
312#[derive(Debug)]
313pub struct Op {
314 pub kind: OpKind,
315 pub from: PathBuf,
316 pub to: PathBuf,
317 pub skipped: Rc<Skip>,
318 pub is_cleanup: bool,
319}
320
321impl Op {
322 fn move_cleanup_op(&self) -> Option<Self> {
323 let kind = match self.kind {
324 OpKind::Copy | OpKind::Move { .. } | OpKind::Symlink { .. } => OpKind::Remove,
325 OpKind::Mkdir => OpKind::Rmdir,
326 OpKind::Remove | OpKind::Rmdir => return None,
327 };
328 Some(Self {
329 kind,
330 from: self.from.clone(),
331 to: self.to.clone(),
333 skipped: self.skipped.clone(),
334 is_cleanup: true,
335 })
336 }
337
338 async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result<bool, Box<dyn Error>> {
339 if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) {
340 return Ok(true);
341 }
342 match self.kind {
343 OpKind::Copy => {
344 crate::operation::actively_writing_add(self.to.clone());
345 let result = self.copy(ctx, progress).await;
346
347 if result.is_err() {
348 _ = compio::fs::remove_file(&self.to).await;
349 }
350
351 crate::operation::actively_writing_remove(&self.to);
352 return result;
353 }
354 OpKind::Move { cross_device_copy } => {
355 if cross_device_copy {
357 self.skipped.cleanup.set(true);
358 }
359
360 if self.to.is_file() {
362 match ctx.replace(self).await? {
363 ControlFlow::Continue(to) => {
364 self.to = to;
365 }
366 ControlFlow::Break(ret) => {
367 return Ok(ret);
368 }
369 }
370 }
371 match compio::fs::hard_link(&self.from, &self.to).await {
373 Ok(()) => {}
374 Err(err) => {
375 #[cfg(windows)]
377 const EXDEV: i32 = 17;
378 #[cfg(unix)]
379 const EXDEV: i32 = libc::EXDEV as _;
380
381 if err.raw_os_error() == Some(EXDEV) {
382 if cross_device_copy {
383 self.skipped.cleanup.set(true);
385 }
386 let mut copy_op = Self {
388 kind: OpKind::Copy,
389 from: self.from.clone(),
390 to: self.to.clone(),
391 skipped: self.skipped.clone(),
392 is_cleanup: self.is_cleanup,
393 };
394 return Box::pin(copy_op.run(ctx, progress)).await;
395 }
396 return Err(err.into());
397 }
398 }
399 }
400 OpKind::Mkdir => {
401 compio::fs::create_dir_all(&self.to).await?;
402 }
403 OpKind::Remove => {
404 compio::fs::remove_file(&self.from).await?;
405 }
406 OpKind::Rmdir => {
407 compio::fs::remove_dir(&self.from).await?;
408 }
409 OpKind::Symlink { ref target } => {
410 if self.to.is_file() {
412 match ctx.replace(self).await? {
413 ControlFlow::Continue(to) => {
414 self.to = to;
415 }
416 ControlFlow::Break(ret) => {
417 return Ok(ret);
418 }
419 }
420 }
421 #[cfg(unix)]
422 {
423 std::os::unix::fs::symlink(target, &self.to)?;
424 }
425 #[cfg(windows)]
426 {
427 if target.is_dir() {
428 std::os::windows::fs::symlink_dir(target, &self.to)?;
429 } else {
430 std::os::windows::fs::symlink_file(target, &self.to)?;
431 }
432 }
433 }
434 }
435 Ok(true)
436 }
437
438 async fn copy(
439 &mut self,
440 ctx: &mut Context,
441 mut progress: Progress,
442 ) -> Result<bool, Box<dyn Error>> {
443 if self.to.is_file() {
445 match ctx.replace(self).await? {
446 ControlFlow::Continue(to) => {
447 self.to = to;
448 }
449 ControlFlow::Break(ret) => {
450 return Ok(ret);
451 }
452 }
453 }
454
455 let (from_file, metadata, to_file) = cosmic::iced::futures::join!(
456 async {
457 compio::fs::OpenOptions::new()
458 .read(true)
459 .open(&self.from)
460 .await
461 .with_context(|| format!("failed to open {} for reading", self.from.display(),))
462 },
463 async { compio::fs::metadata(&self.from).await.ok() },
464 async {
466 compio::fs::OpenOptions::new()
467 .create_new(true)
468 .write(true)
469 .open(&self.to)
470 .await
471 .with_context(|| format!("failed to open {} for writing", self.to.display()))
472 }
473 );
474
475 let from_file = from_file?;
476 let mut to_file = to_file?;
477 progress.total_bytes = metadata.as_ref().map(|m| m.len());
478 (ctx.on_progress)(self, &progress);
479
480 if let Some(metadata) = metadata.as_ref()
481 && let Err(why) = to_file.set_permissions(metadata.permissions()).await
482 {
483 if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
485 tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),);
486 }
487 }
488
489 let mut last_progress_update = Instant::now();
491 let mut buf_in = std::mem::take(&mut ctx.buf);
493 let mut pos = 0;
495
496 loop {
497 let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
498
499 let count = match result {
500 Ok(0) => {
501 buf_in = buf_out;
502 break;
503 }
504 Ok(count) => count,
505 Err(why) => {
506 ctx.buf = buf_out;
507 tracing::error!("failed to read: {:?}", why);
508 _ = futures::future::join(from_file.close(), to_file.close()).await;
509 return Err(why).context("failed to read")?;
510 }
511 };
512
513 let BufResult(result, buf_out_slice) =
514 to_file.write_at(buf_out.slice(..count), pos).await;
515 let buf_out = buf_out_slice.into_inner();
516
517 if let Err(why) = result {
518 #[cfg(feature = "gvfs")]
519 if let std::io::ErrorKind::Unsupported = why.kind() {
520 ctx.buf = buf_out;
521 _ = futures::future::join(from_file.close(), to_file.close()).await;
522 return self
523 .gio_file_copy(ctx, progress)
524 .await
525 .map(|_| true)
526 .map_err(Into::into);
527 }
528
529 tracing::error!("failed to write: {:?}", why);
530 ctx.buf = buf_out;
531 _ = futures::future::join(from_file.close(), to_file.close()).await;
532 return Err(why).context("failed to write")?;
533 }
534
535 progress.current_bytes += count as u64;
536 pos += count as u64;
537
538 let current = Instant::now();
540 if current.duration_since(last_progress_update).as_millis() > 49 {
541 last_progress_update = current;
542 (ctx.on_progress)(self, &progress);
543
544 if let Err(state) = ctx.controller.check().await {
546 ctx.buf = buf_out;
547 tracing::warn!(
548 "operation to copy from {:?} to {:?} cancelled",
549 self.from,
550 self.to
551 );
552 _ = futures::future::join(from_file.close(), to_file.close()).await;
553 return Err(OperationError::from_state(state, &ctx.controller).into());
554 }
555 }
556
557 buf_in = buf_out;
558 }
559
560 ctx.buf = buf_in;
561
562 if let Some(metadata) = metadata.as_ref() {
563 let mut times = fs::FileTimes::new();
564 if let Ok(time) = metadata.modified() {
565 times = times.set_modified(time);
566 }
567 if let Ok(time) = metadata.accessed() {
568 times = times.set_accessed(time);
569 }
570 let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
572 BufResult(file.set_times(times).map(|_| 0), ())
573 });
574 match compio::runtime::submit(op).await.0.map(|_| ()) {
575 Ok(()) => {
576 tracing::info!("set times for {} to {:?}", self.to.display(), times);
577 }
578 Err(why) => {
579 if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
580 tracing::error!(?why, "failed to set times for {}", self.to.display());
581 }
582 }
583 }
584 }
585
586 _ = to_file.close().await;
587
588 Ok(true)
589 }
590
591 #[cfg(feature = "gvfs")]
595 async fn gio_file_copy(
596 &self,
597 ctx: &mut Context,
598 mut progress: Progress,
599 ) -> Result<(), GioCopyError> {
600 _ = compio::fs::remove_file(&self.to).await;
601
602 let from = gio::File::for_path(&self.from);
603 let to = gio::File::for_path(&self.to);
604 let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
605 let (tx, rx) = tokio::sync::oneshot::channel();
606 let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false);
607
608 let task = compio::runtime::spawn_blocking(move || {
609 let glib_context = glib::MainContext::new();
610 let glib_loop = glib::MainLoop::new(Some(&glib_context), false);
611 glib_context.with_thread_default(move || {
612 let glib_loop2 = glib_loop.clone();
613 glib::MainContext::ref_thread_default().spawn_local(async move {
614 let (gio_copy_fut, mut progress_stream) = from.copy_future(
616 &to,
617 gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA,
618 glib::Priority::LOW,
619 );
620
621 let mut copy_fut = gio_copy_fut
622 .map(|result| result.map_err(GioCopyError::GLib))
623 .fuse();
624
625 let progress_fut = std::pin::pin!(async {
626 while let Some((current_bytes, _)) = progress_stream.next().await {
627 _ = progress_tx.send(current_bytes);
628 }
629
630 drop(progress_tx);
631 futures::future::pending::<()>().await;
632 });
633
634 let mut progress_fut = progress_fut.fuse();
635 let mut pause_rx2 = pause_rx.clone();
636
637 loop {
638 let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused));
639 futures::select! {
640 _ = &mut progress_fut => {},
641
642 result = &mut copy_fut => {
643 _ = tx.send(result.map(|_| ()));
644 glib_loop2.quit();
645 return;
646 }
647
648 _ = until_paused.fuse() => {
649 _ = pause_rx2.wait_for(|paused| !*paused).await;
650 }
651 }
652 }
653 });
654
655 glib_loop.run();
656 })
657 });
658
659 let mut last_progress_update = Instant::now();
660 let mut task = task.fuse();
661 let mut rx = rx.fuse();
662
663 loop {
664 let until_paused = std::pin::pin!(ctx.controller.until_paused());
665 futures::select! {
666 value = progress_rx.recv().fuse() => {
667 if let Some(current_bytes) = value {
668 progress.current_bytes = current_bytes as u64;
669 let current = Instant::now();
670 if current.duration_since(last_progress_update).as_millis() > 49 {
671 last_progress_update = current;
672 (ctx.on_progress)(self, &progress);
673 if let Err(state) = ctx.controller.check().await {
675 tracing::warn!(
676 "operation to copy from {:?} to {:?} cancelled",
677 self.from,
678 self.to
679 );
680 return Err::<(), GioCopyError>(GioCopyError::Controller(
681 OperationError::from_state(state, &ctx.controller),
682 ));
683 }
684 }
685 }
686 }
687
688 result = rx => return result.unwrap(),
689
690 _ = task => (),
691
692 _ = until_paused.fuse() => {
693 _ = pause_tx.send(true);
695 ctx.controller.until_unpaused().await;
696 _ = pause_tx.send(false);
697 }
698 }
699 }
700 }
701}