cosmic_files/operation/
recursive.rs

1// Copyright 2023 System76 <info@system76.com>
2// SPDX-License-Identifier: GPL-3.0-only
3
4use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
5use crate::operation::{OperationError, sync_to_disk};
6use anyhow::Context as AnyhowContext;
7use compio::BufResult;
8use compio::buf::{IntoInner, IoBuf};
9use compio::driver::ToSharedFd;
10use compio::driver::op::AsyncifyFd;
11use compio::io::{AsyncReadAt, AsyncWriteAt};
12use cosmic::iced::futures;
13use std::cell::Cell;
14use std::error::Error;
15use std::fs;
16use std::future::Future;
17use std::ops::ControlFlow;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::rc::Rc;
21use std::time::Instant;
22use walkdir::WalkDir;
23
24#[cfg(feature = "gvfs")]
25use gio::prelude::FileExtManual;
26
27#[derive(thiserror::Error, Debug)]
28pub enum GioCopyError {
29    #[error("controller state")]
30    Controller(OperationError),
31    #[cfg(feature = "gvfs")]
32    #[error("gio copy failed")]
33    GLib(#[from] glib::Error),
34}
35
36pub enum Method {
37    Copy,
38    Move { cross_device_copy: bool },
39}
40
41pub struct Context {
42    buf: Vec<u8>,
43    controller: Controller,
44    on_progress: Box<dyn OnProgress>,
45    on_replace: Pin<Box<dyn OnReplace>>,
46    pub(crate) op_sel: OperationSelection,
47    replace_result_opt: Option<ReplaceResult>,
48    remaining_conflicts: usize,
49}
50
51pub trait OnProgress: Fn(&Op, &Progress) + 'static {}
52impl<F> OnProgress for F where F: Fn(&Op, &Progress) + 'static {}
53
54pub trait OnReplace:
55    for<'a> Fn(&'a Op, usize) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
56{
57}
58impl<F> OnReplace for F where
59    F: for<'a> Fn(&'a Op, usize) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
60{
61}
62
63impl Context {
64    pub fn new(controller: Controller) -> Self {
65        Self {
66            // 128K is the optimal upper size of a buffer.
67            buf: vec![0u8; 128 * 1024],
68            controller,
69            on_progress: Box::new(|_op, _progress| {}),
70            on_replace: Box::pin(|_op, _count| Box::pin(async { ReplaceResult::Cancel })),
71            op_sel: OperationSelection::default(),
72            replace_result_opt: None,
73            remaining_conflicts: 0,
74        }
75    }
76
77    pub async fn recursive_copy_or_move(
78        &mut self,
79        from_to_pairs: impl IntoIterator<Item = (PathBuf, PathBuf)>,
80        method: Method,
81    ) -> Result<bool, OperationError> {
82        let mut ops = Vec::new();
83        let mut cleanup_ops = Vec::new();
84        let mut written_files = Vec::new();
85        let mut target_dirs = std::collections::HashSet::new();
86        for (from_parent, to_parent) in from_to_pairs {
87            self.controller
88                .check()
89                .await
90                .map_err(|s| OperationError::from_state(s, &self.controller))?;
91
92            if from_parent == to_parent {
93                // Skip matching source and destination
94                continue;
95            }
96
97            for entry in WalkDir::new(&from_parent) {
98                self.controller
99                    .check()
100                    .await
101                    .map_err(|s| OperationError::from_state(s, &self.controller))?;
102
103                let entry = entry.map_err(|err| {
104                    OperationError::from_err(
105                        format!(
106                            "failed to walk directory {}: {}",
107                            from_parent.display(),
108                            err
109                        ),
110                        &self.controller,
111                    )
112                })?;
113                let file_type = entry.file_type();
114                let from = entry.into_path();
115                let kind = if file_type.is_dir() {
116                    OpKind::Mkdir
117                } else if file_type.is_file() {
118                    match method {
119                        Method::Copy => OpKind::Copy,
120                        Method::Move { cross_device_copy } => OpKind::Move { cross_device_copy },
121                    }
122                } else if file_type.is_symlink() {
123                    let target = fs::read_link(&from).map_err(|err| {
124                        OperationError::from_err(
125                            format!("failed to read link {}: {}", from_parent.display(), err),
126                            &self.controller,
127                        )
128                    })?;
129                    OpKind::Symlink { target }
130                } else {
131                    //TODO: present dialog and allow continue
132                    return Err(OperationError::from_err(
133                        format!("{} is not a known file type", from.display()),
134                        &self.controller,
135                    ));
136                };
137                let to = if from == from_parent {
138                    // When copying a file, from matches from_parent, and to_parent must be used
139                    to_parent.clone()
140                } else {
141                    let relative = from.strip_prefix(&from_parent).map_err(|err| {
142                        OperationError::from_err(
143                            format!(
144                                "failed to remove prefix {} from {}: {}",
145                                from_parent.display(),
146                                from.display(),
147                                err
148                            ),
149                            &self.controller,
150                        )
151                    })?;
152                    //TODO: ensure to is inside of to_parent?
153                    to_parent.join(relative)
154                };
155                let op = Op {
156                    kind,
157                    from,
158                    to,
159                    skipped: Rc::new(Skip {
160                        normal: Cell::new(false),
161                        cleanup: Cell::new(false),
162                    }),
163                    is_cleanup: false,
164                };
165                if matches!(method, Method::Move { .. })
166                    && let Some(cleanup_op) = op.move_cleanup_op()
167                {
168                    cleanup_ops.push(cleanup_op);
169                }
170                if let Some(parent) = op.to.parent() {
171                    target_dirs.insert(parent.to_path_buf());
172                }
173                ops.push(op);
174            }
175
176            self.op_sel.ignored.push(from_parent);
177        }
178
179        // Add cleanup ops after standard ops, in reverse
180        cleanup_ops.reverse();
181        ops.append(&mut cleanup_ops);
182
183        // Count potential conflicts (files that would need replacement)
184        self.remaining_conflicts = ops
185            .iter()
186            .filter(|op| {
187                matches!(
188                    op.kind,
189                    OpKind::Copy | OpKind::Move { .. } | OpKind::Symlink { .. }
190                ) && op.to.is_file()
191            })
192            .count();
193
194        let total_ops = ops.len();
195        for (current_ops, mut op) in ops.into_iter().enumerate() {
196            self.controller
197                .check()
198                .await
199                .map_err(|s| OperationError::from_state(s, &self.controller))?;
200
201            let progress = Progress {
202                current_ops,
203                total_ops,
204                current_bytes: 0,
205                total_bytes: None,
206            };
207            (self.on_progress)(&op, &progress);
208            if op.run(self, progress).await.map_err(|err| {
209                OperationError::from_err(
210                    format!(
211                        "failed to {:?} {} to {}: {}",
212                        op.kind,
213                        op.from.display(),
214                        op.to.display(),
215                        err
216                    ),
217                    &self.controller,
218                )
219            })? {
220                if matches!(
221                    op.kind,
222                    OpKind::Copy
223                        | OpKind::Move {
224                            cross_device_copy: true
225                        }
226                ) {
227                    written_files.push(op.to.clone());
228                }
229                // The from path is ignored in the operation selection if it is a top level item
230                if self.op_sel.ignored.contains(&op.from) {
231                    // So add the to path to the selection
232                    self.op_sel.selected.push(op.to);
233                }
234            } else {
235                // Cancelled
236                return Ok(false);
237            }
238        }
239
240        // Flush files to disk
241        sync_to_disk(written_files, target_dirs).await;
242
243        Ok(true)
244    }
245
246    pub fn on_progress<F: OnProgress>(mut self, f: F) -> Self {
247        self.on_progress = Box::new(f);
248        self
249    }
250
251    pub fn on_replace(mut self, f: impl OnReplace + 'static) -> Self {
252        self.on_replace = Box::pin(f);
253        self
254    }
255
256    async fn replace(&mut self, op: &Op) -> Result<ControlFlow<bool, PathBuf>, Box<dyn Error>> {
257        let replace_result = match self.replace_result_opt {
258            Some(result) => result,
259            None => (self.on_replace)(op, self.remaining_conflicts).await,
260        };
261
262        match replace_result {
263            ReplaceResult::Replace(apply_to_all) => {
264                if apply_to_all {
265                    self.replace_result_opt = Some(replace_result);
266                }
267                compio::fs::remove_file(&op.to).await?;
268                Ok(ControlFlow::Continue(op.to.clone()))
269            }
270            ReplaceResult::KeepBoth => match op.to.parent() {
271                Some(to_parent) => Ok(ControlFlow::Continue(copy_unique_path(&op.from, to_parent))),
272                None => Err(format!("failed to get parent of {}", op.to.display()).into()),
273            },
274            ReplaceResult::Skip(apply_to_all) => {
275                if apply_to_all {
276                    self.replace_result_opt = Some(replace_result);
277                }
278                op.skipped.normal.set(true);
279                Ok(ControlFlow::Break(true))
280            }
281            ReplaceResult::Cancel => Ok(ControlFlow::Break(false)),
282        }
283    }
284}
285
286#[derive(Debug)]
287pub struct Progress {
288    pub current_ops: usize,
289    pub total_ops: usize,
290    pub current_bytes: u64,
291    pub total_bytes: Option<u64>,
292}
293
294#[derive(Debug)]
295pub enum OpKind {
296    Copy,
297    Move { cross_device_copy: bool },
298    Mkdir,
299    Remove,
300    Rmdir,
301    Symlink { target: PathBuf },
302}
303
304#[derive(Debug)]
305pub struct Skip {
306    /// Normal operation should be skipped
307    pub normal: Cell<bool>,
308    /// Cleanup operation should be skipped
309    pub cleanup: Cell<bool>,
310}
311
312#[derive(Debug)]
313pub struct Op {
314    pub kind: OpKind,
315    pub from: PathBuf,
316    pub to: PathBuf,
317    pub skipped: Rc<Skip>,
318    pub is_cleanup: bool,
319}
320
321impl Op {
322    fn move_cleanup_op(&self) -> Option<Self> {
323        let kind = match self.kind {
324            OpKind::Copy | OpKind::Move { .. } | OpKind::Symlink { .. } => OpKind::Remove,
325            OpKind::Mkdir => OpKind::Rmdir,
326            OpKind::Remove | OpKind::Rmdir => return None,
327        };
328        Some(Self {
329            kind,
330            from: self.from.clone(),
331            //TODO: it is strange to have `to` here
332            to: self.to.clone(),
333            skipped: self.skipped.clone(),
334            is_cleanup: true,
335        })
336    }
337
338    async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result<bool, Box<dyn Error>> {
339        if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) {
340            return Ok(true);
341        }
342        match self.kind {
343            OpKind::Copy => {
344                crate::operation::actively_writing_add(self.to.clone());
345                let result = self.copy(ctx, progress).await;
346
347                if result.is_err() {
348                    _ = compio::fs::remove_file(&self.to).await;
349                }
350
351                crate::operation::actively_writing_remove(&self.to);
352                return result;
353            }
354            OpKind::Move { cross_device_copy } => {
355                // Do not clean up if cross_device_copy is set
356                if cross_device_copy {
357                    self.skipped.cleanup.set(true);
358                }
359
360                // Remove `to` if overwriting and it is an existing file
361                if self.to.is_file() {
362                    match ctx.replace(self).await? {
363                        ControlFlow::Continue(to) => {
364                            self.to = to;
365                        }
366                        ControlFlow::Break(ret) => {
367                            return Ok(ret);
368                        }
369                    }
370                }
371                // This is atomic and ensures `to` is not created by any other process
372                match compio::fs::hard_link(&self.from, &self.to).await {
373                    Ok(()) => {}
374                    Err(err) => {
375                        // https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_NOT_SAME_DEVICE.html
376                        #[cfg(windows)]
377                        const EXDEV: i32 = 17;
378                        #[cfg(unix)]
379                        const EXDEV: i32 = libc::EXDEV as _;
380
381                        if err.raw_os_error() == Some(EXDEV) {
382                            if cross_device_copy {
383                                // Do not clean up if cross_device_copy is set
384                                self.skipped.cleanup.set(true);
385                            }
386                            // Try standard copy if hard link fails with cross device error
387                            let mut copy_op = Self {
388                                kind: OpKind::Copy,
389                                from: self.from.clone(),
390                                to: self.to.clone(),
391                                skipped: self.skipped.clone(),
392                                is_cleanup: self.is_cleanup,
393                            };
394                            return Box::pin(copy_op.run(ctx, progress)).await;
395                        }
396                        return Err(err.into());
397                    }
398                }
399            }
400            OpKind::Mkdir => {
401                compio::fs::create_dir_all(&self.to).await?;
402            }
403            OpKind::Remove => {
404                compio::fs::remove_file(&self.from).await?;
405            }
406            OpKind::Rmdir => {
407                compio::fs::remove_dir(&self.from).await?;
408            }
409            OpKind::Symlink { ref target } => {
410                // Remove `to` if overwriting and it is an existing file
411                if self.to.is_file() {
412                    match ctx.replace(self).await? {
413                        ControlFlow::Continue(to) => {
414                            self.to = to;
415                        }
416                        ControlFlow::Break(ret) => {
417                            return Ok(ret);
418                        }
419                    }
420                }
421                #[cfg(unix)]
422                {
423                    std::os::unix::fs::symlink(target, &self.to)?;
424                }
425                #[cfg(windows)]
426                {
427                    if target.is_dir() {
428                        std::os::windows::fs::symlink_dir(target, &self.to)?;
429                    } else {
430                        std::os::windows::fs::symlink_file(target, &self.to)?;
431                    }
432                }
433            }
434        }
435        Ok(true)
436    }
437
438    async fn copy(
439        &mut self,
440        ctx: &mut Context,
441        mut progress: Progress,
442    ) -> Result<bool, Box<dyn Error>> {
443        // Remove `to` if overwriting and it is an existing file
444        if self.to.is_file() {
445            match ctx.replace(self).await? {
446                ControlFlow::Continue(to) => {
447                    self.to = to;
448                }
449                ControlFlow::Break(ret) => {
450                    return Ok(ret);
451                }
452            }
453        }
454
455        let (from_file, metadata, to_file) = cosmic::iced::futures::join!(
456            async {
457                compio::fs::OpenOptions::new()
458                    .read(true)
459                    .open(&self.from)
460                    .await
461                    .with_context(|| format!("failed to open {} for reading", self.from.display(),))
462            },
463            async { compio::fs::metadata(&self.from).await.ok() },
464            // This is atomic and ensures `to` is not created by any other process
465            async {
466                compio::fs::OpenOptions::new()
467                    .create_new(true)
468                    .write(true)
469                    .open(&self.to)
470                    .await
471                    .with_context(|| format!("failed to open {} for writing", self.to.display()))
472            }
473        );
474
475        let from_file = from_file?;
476        let mut to_file = to_file?;
477        progress.total_bytes = metadata.as_ref().map(|m| m.len());
478        (ctx.on_progress)(self, &progress);
479
480        if let Some(metadata) = metadata.as_ref()
481            && let Err(why) = to_file.set_permissions(metadata.permissions()).await
482        {
483            // This error is not propagated upwards as some filesystems do not support setting permissions
484            if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
485                tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),);
486            }
487        }
488
489        // Prevent spamming the progress callbacks.
490        let mut last_progress_update = Instant::now();
491        // io_uring/IOCP requires transferring ownership of the buffer to the kernel.
492        let mut buf_in = std::mem::take(&mut ctx.buf);
493        // Track where the current read/write position is at.
494        let mut pos = 0;
495
496        loop {
497            let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
498
499            let count = match result {
500                Ok(0) => {
501                    buf_in = buf_out;
502                    break;
503                }
504                Ok(count) => count,
505                Err(why) => {
506                    ctx.buf = buf_out;
507                    tracing::error!("failed to read: {:?}", why);
508                    _ = futures::future::join(from_file.close(), to_file.close()).await;
509                    return Err(why).context("failed to read")?;
510                }
511            };
512
513            let BufResult(result, buf_out_slice) =
514                to_file.write_at(buf_out.slice(..count), pos).await;
515            let buf_out = buf_out_slice.into_inner();
516
517            if let Err(why) = result {
518                #[cfg(feature = "gvfs")]
519                if let std::io::ErrorKind::Unsupported = why.kind() {
520                    ctx.buf = buf_out;
521                    _ = futures::future::join(from_file.close(), to_file.close()).await;
522                    return self
523                        .gio_file_copy(ctx, progress)
524                        .await
525                        .map(|_| true)
526                        .map_err(Into::into);
527                }
528
529                tracing::error!("failed to write: {:?}", why);
530                ctx.buf = buf_out;
531                _ = futures::future::join(from_file.close(), to_file.close()).await;
532                return Err(why).context("failed to write")?;
533            }
534
535            progress.current_bytes += count as u64;
536            pos += count as u64;
537
538            // Avoid spamming progress messages too early.
539            let current = Instant::now();
540            if current.duration_since(last_progress_update).as_millis() > 49 {
541                last_progress_update = current;
542                (ctx.on_progress)(self, &progress);
543
544                // Also check if the progress was cancelled.
545                if let Err(state) = ctx.controller.check().await {
546                    ctx.buf = buf_out;
547                    tracing::warn!(
548                        "operation to copy from {:?} to {:?} cancelled",
549                        self.from,
550                        self.to
551                    );
552                    _ = futures::future::join(from_file.close(), to_file.close()).await;
553                    return Err(OperationError::from_state(state, &ctx.controller).into());
554                }
555            }
556
557            buf_in = buf_out;
558        }
559
560        ctx.buf = buf_in;
561
562        if let Some(metadata) = metadata.as_ref() {
563            let mut times = fs::FileTimes::new();
564            if let Ok(time) = metadata.modified() {
565                times = times.set_modified(time);
566            }
567            if let Ok(time) = metadata.accessed() {
568                times = times.set_accessed(time);
569            }
570            //TODO: upstream set_times implementation to compio?
571            let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
572                BufResult(file.set_times(times).map(|_| 0), ())
573            });
574            match compio::runtime::submit(op).await.0.map(|_| ()) {
575                Ok(()) => {
576                    tracing::info!("set times for {} to {:?}", self.to.display(), times);
577                }
578                Err(why) => {
579                    if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
580                        tracing::error!(?why, "failed to set times for {}", self.to.display());
581                    }
582                }
583            }
584        }
585
586        _ = to_file.close().await;
587
588        Ok(true)
589    }
590
591    /// Fallback mechanism in the event that unsupported I/O error errors occur.
592    /// Fixes unsupported errors when copying large files over MTP.
593    /// TODO: Find what Gio.File does to work around this.
594    #[cfg(feature = "gvfs")]
595    async fn gio_file_copy(
596        &self,
597        ctx: &mut Context,
598        mut progress: Progress,
599    ) -> Result<(), GioCopyError> {
600        _ = compio::fs::remove_file(&self.to).await;
601
602        let from = gio::File::for_path(&self.from);
603        let to = gio::File::for_path(&self.to);
604        let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
605        let (tx, rx) = tokio::sync::oneshot::channel();
606        let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false);
607
608        let task = compio::runtime::spawn_blocking(move || {
609            let glib_context = glib::MainContext::new();
610            let glib_loop = glib::MainLoop::new(Some(&glib_context), false);
611            glib_context.with_thread_default(move || {
612                let glib_loop2 = glib_loop.clone();
613                glib::MainContext::ref_thread_default().spawn_local(async move {
614                    // Create a future for copying the file with `gio::File`. This also creates a progress stream.
615                    let (gio_copy_fut, mut progress_stream) = from.copy_future(
616                        &to,
617                        gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA,
618                        glib::Priority::LOW,
619                    );
620
621                    let mut copy_fut = gio_copy_fut
622                        .map(|result| result.map_err(GioCopyError::GLib))
623                        .fuse();
624
625                    let progress_fut = std::pin::pin!(async {
626                        while let Some((current_bytes, _)) = progress_stream.next().await {
627                            _ = progress_tx.send(current_bytes);
628                        }
629
630                        drop(progress_tx);
631                        futures::future::pending::<()>().await;
632                    });
633
634                    let mut progress_fut = progress_fut.fuse();
635                    let mut pause_rx2 = pause_rx.clone();
636
637                    loop {
638                        let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused));
639                        futures::select! {
640                            _ = &mut progress_fut => {},
641
642                            result = &mut copy_fut => {
643                                _ = tx.send(result.map(|_| ()));
644                                glib_loop2.quit();
645                                return;
646                            }
647
648                            _ = until_paused.fuse() => {
649                                _ = pause_rx2.wait_for(|paused| !*paused).await;
650                            }
651                        }
652                    }
653                });
654
655                glib_loop.run();
656            })
657        });
658
659        let mut last_progress_update = Instant::now();
660        let mut task = task.fuse();
661        let mut rx = rx.fuse();
662
663        loop {
664            let until_paused = std::pin::pin!(ctx.controller.until_paused());
665            futures::select! {
666                value = progress_rx.recv().fuse() => {
667                    if let Some(current_bytes) = value {
668                        progress.current_bytes = current_bytes as u64;
669                        let current = Instant::now();
670                        if current.duration_since(last_progress_update).as_millis() > 49 {
671                            last_progress_update = current;
672                            (ctx.on_progress)(self, &progress);
673                            // Also check if the progress was cancelled.
674                            if let Err(state) = ctx.controller.check().await {
675                                tracing::warn!(
676                                    "operation to copy from {:?} to {:?} cancelled",
677                                    self.from,
678                                    self.to
679                                );
680                                return Err::<(), GioCopyError>(GioCopyError::Controller(
681                                    OperationError::from_state(state, &ctx.controller),
682                                ));
683                            }
684                        }
685                    }
686                }
687
688                result = rx => return result.unwrap(),
689
690                _ = task => (),
691
692                _ = until_paused.fuse() => {
693                    // Pauses an active copy while the controller state is paused.
694                    _ = pause_tx.send(true);
695                    ctx.controller.until_unpaused().await;
696                    _ = pause_tx.send(false);
697                }
698            }
699        }
700    }
701}