cosmic_files/russh/
client.rs

1use async_ssh2_tokio::client::{AuthMethod, Client, ServerCheckMethod};
2use russh_sftp::client::SftpSession;
3use russh_sftp::protocol::FileType;
4use url::Url;
5
6use cosmic::{
7    Task,
8    iced::{Subscription, futures::SinkExt, stream},
9    widget,
10};
11use std::{
12    any::TypeId,
13    cell::Cell,
14    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
15    fmt,
16    future::pending,
17    hash::Hash,
18    path::{Path, PathBuf},
19    str::FromStr,
20    sync::Arc,
21};
22use tokio::sync::{Mutex, RwLock, mpsc};
23
24use super::{ClientAuth, ClientItem, ClientItems, ClientMessage, Connector};
25
26use crate::{
27    config::IconSizes,
28    fl,
29    sequencing::tb_data::TbProfilerJson,
30    tab::{self, DirSize, ItemMetadata, ItemThumbnail, Location},
31};
32use mime_guess::MimeGuess;
33use tokio::io::AsyncReadExt;
34use tokio::runtime::Builder;
35
36use crate::config::TBConfig;
37
38/// Collects the relevant output files produced by a single TB-Profiler sample run.
39///
40/// Each field holds the path to the corresponding file type on the remote host.
41/// Fields are `None` when the file was not found for that sample.
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43struct SampleFiles {
44    /// Path to the TBProfiler JSON report (`<sample>.results.json`).
45    json: Option<PathBuf>,
46    /// Path to the CSV summary file (`<sample>.results.csv`), if present.
47    csv: Option<PathBuf>,
48    /// Path to the DOCX report (`<sample>.results.docx`), if present.
49    docx: Option<PathBuf>,
50    mtime: u64,
51    size: Option<u64>,
52}
53
54use super::SlurmJobId;
55
56fn get_key_files() -> Result<(PathBuf, PathBuf), String> {
57    let home_dir = dirs::home_dir().ok_or_else(|| {
58        "Could not determine the user home directory.\n\
59        This usually indicates that the HOME environment variable is missing or invalid.\n\
60        Please ensure HOME is set correctly before using SSH connections."
61            .to_string()
62    })?;
63
64    let ssh_dir = home_dir.join(".ssh");
65    if !ssh_dir.is_dir() {
66        return Err(format!(
67            "SSH configuration directory not found: {}.\n\
68            Please ensure ~/.ssh exists and contains your SSH keys.",
69            ssh_dir.display()
70        ));
71    }
72    if ssh_dir.join("id_rsa").is_file() {
73        Ok((ssh_dir.join("id_rsa"), ssh_dir.join("id_rsa.pub")))
74    } else if ssh_dir.join("id_ed25519").is_file() {
75        Ok((ssh_dir.join("id_ed25519"), ssh_dir.join("id_ed25519.pub")))
76    } else if ssh_dir.join("id_ecdsa").is_file() {
77        Ok((ssh_dir.join("id_ecdsa"), ssh_dir.join("id_ecdsa.pub")))
78    } else if ssh_dir.join("id_dsa").is_file() {
79        Ok((ssh_dir.join("id_dsa"), ssh_dir.join("id_dsa.pub")))
80    } else {
81        Err(format!(
82            "No SSH key pair found in {}.\n\
83            Expected one of: id_rsa, id_ed25519, id_ecdsa, id_dsa.\n\
84            Please generate a key (e.g., with `ssh-keygen -t ed25519`) and try again.",
85            ssh_dir.display()
86        ))
87    }
88}
89
90pub async fn dir_info(
91    client: &Client,
92    uri: &str,
93) -> Result<(String, String, Option<PathBuf>), String> {
94    let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
95    let resolved_uri = remote_file.uri();
96
97    let channel = client.get_channel().await.map_err(|e| e.to_string())?;
98    channel
99        .request_subsystem(true, "sftp")
100        .await
101        .map_err(|e| e.to_string())?;
102    let sftp = SftpSession::new(channel.into_stream())
103        .await
104        .map_err(|e| e.to_string())?;
105
106    if let Err(err) = sftp.metadata(&remote_file.path).await {
107        return Err(format!(
108            "dir_info(): metadata error for {}: {}",
109            remote_file.path, err
110        ));
111    }
112
113    // Display name = last component of `remote_file.path`
114    let display_name = Path::new(&remote_file.path)
115        .file_name()
116        .and_then(|s| s.to_str())
117        .unwrap_or(&remote_file.path)
118        .to_string();
119    let filepath: Option<PathBuf> = Some(PathBuf::from(&remote_file.path));
120
121    Ok((resolved_uri, display_name, filepath))
122}
123
124pub async fn resolve_symlink(
125    client: &Client,
126    remotefile: &RemoteFile,
127) -> Result<RemoteFile, String> {
128    let channel = client.get_channel().await.map_err(|e| e.to_string())?;
129    channel
130        .request_subsystem(true, "sftp")
131        .await
132        .map_err(|e| e.to_string())?;
133    let sftp = SftpSession::new(channel.into_stream())
134        .await
135        .map_err(|e| e.to_string())?;
136
137    match sftp.read_link(&remotefile.path).await {
138        Ok(link_path) => {
139            // Build new URI using same host/username but new path
140            let new_uri = format!(
141                "ssh://{}{}:{}{}",
142                remotefile
143                    .username
144                    .clone()
145                    .map(|u| u + "@")
146                    .unwrap_or_default(),
147                remotefile.host,
148                remotefile.port,
149                link_path
150            );
151            new_uri.parse::<RemoteFile>().map_err(|e| e.to_string())
152        }
153        Err(_) => Ok(remotefile.clone()), // not a symlink → keep original
154    }
155}
156
157async fn remote_sftp_list(
158    client: &Client,
159    uri: &str,
160    sizes: IconSizes,
161) -> Result<Vec<tab::Item>, String> {
162    let mut remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
163    let force_dir = uri.starts_with("ssh:///");
164    let path = remote_file.path.clone();
165    let channel = client.get_channel().await.map_err(|e| e.to_string())?;
166    channel
167        .request_subsystem(true, "sftp")
168        .await
169        .map_err(|e| e.to_string())?;
170    let sftp = SftpSession::new(channel.into_stream())
171        .await
172        .map_err(|e| e.to_string())?;
173    let entries = sftp
174        .read_dir(path.clone())
175        .await
176        .map_err(|e| format!("read_dir {path}: {e:?}"))?;
177
178    let mut items = Vec::new();
179    let mut samples: HashMap<String, SampleFiles> = HashMap::new();
180
181    // ------------------------------------------------------------
182    // First pass — collect samples & normal files
183    // ------------------------------------------------------------
184    for entry in entries {
185        let child_path = PathBuf::from(entry.file_name());
186        let file_type = entry.file_type();
187        let info = entry.metadata();
188        if info.is_symlink() {
189            remote_file = resolve_symlink(client, &remote_file).await?;
190        }
191        let name = child_path
192            .file_name()
193            .and_then(|s| s.to_str())
194            .unwrap_or("")
195            .to_string();
196        let new_path = PathBuf::from(&path).join(&child_path);
197        let mut url = Url::parse(&format!(
198            "ssh://{}{}:{}",
199            remote_file
200                .username
201                .clone()
202                .map(|u| u + "@")
203                .unwrap_or_default(),
204            remote_file.host,
205            remote_file.port,
206        ))
207        .unwrap();
208        let url_path = format!("{}/{}", path.trim_end_matches('/'), name);
209        url.set_path(&url_path);
210        let child_uri = url.to_string();
211        let location = Location::Remote(child_uri.clone(), name.clone(), Some(new_path.clone()));
212
213        // Always register .results.* files in the samples map for grouped items,
214        // but also add them individually (is_tbprofiler_groupable_raw_result_file = true) so the
215        // display layer can toggle without a rescan.
216        let mut is_tbprofiler_groupable_raw_result_file = false;
217        if file_type == FileType::File
218            && let Some((sample_id, suffix)) = name.split_once(".results.") {
219                let entry = samples.entry(sample_id.to_string()).or_insert(SampleFiles {
220                    json: None,
221                    csv: None,
222                    docx: None,
223                    mtime: info
224                        .modified()
225                        .ok()
226                        .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
227                        .map(|d| d.as_secs())
228                        .unwrap_or(0),
229                    size: None,
230                });
231
232                match suffix {
233                    "json" => entry.json = Some(new_path.clone()),
234                    "csv" => entry.csv = Some(new_path.clone()),
235                    "docx" => entry.docx = Some(new_path.clone()),
236                    _ => {}
237                }
238
239                is_tbprofiler_groupable_raw_result_file = true;
240            }
241
242        let metadata = if !force_dir {
243            let mtime = info
244                .modified()
245                .ok()
246                .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
247                .map(|d| d.as_secs())
248                .unwrap_or(0);
249            let is_dir = info.is_dir();
250            let size_opt = (!is_dir).then_some(info.size).flatten();
251            let mut children_opt = None;
252            let is_tbprofiler_json =
253                MimeGuess::from_path(&new_path).first_or_octet_stream() == mime::APPLICATION_JSON;
254            let _is_ab1 = new_path.extension().map(|e| e.eq_ignore_ascii_case("ab1")).unwrap_or(false);
255            let mut tbprofilerjson_opt = None;
256            let mut is_susceptible = None;
257            if is_tbprofiler_json {
258                match load_remote_json(client, &child_uri).await {
259                    Ok(json) => {
260                        let sus = json.dr_variants.iter().all(|v| v.is_susceptible());
261                        tbprofilerjson_opt = Some(json);
262                        is_susceptible = Some(sus);
263                    }
264                    Err(e) => {
265                        log::info!("Failed to load JSON for {}: {}", new_path.display(), e);
266                    }
267                }
268            }
269            if is_dir {
270                let mut count = 0;
271                match sftp.read_dir(new_path.to_string_lossy().to_string()).await {
272                    Ok(mut dir) => {
273                        for entry in dir.by_ref() {
274                            if entry.file_name() == "." || entry.file_name() == ".." {
275                                continue;
276                            }
277                            count += 1;
278                        }
279                    }
280                    Err(_) => {
281                        log::info!(
282                            "Could not read directory to count children: {}",
283                            new_path.display()
284                        );
285                    }
286                }
287                children_opt = Some(count);
288            }
289            let is_tbprofiler_result_as_sample = tbprofilerjson_opt.is_some();
290            ItemMetadata::RusshPath {
291                mtime,
292                size_opt,
293                children_opt,
294                is_tbprofiler_json,
295                tbprofilerjson_opt,
296                is_tbprofiler_result_as_sample,
297                is_tbprofiler_groupable_raw_result_file,
298                sample_json_path_opt: None,
299                sample_csv_path_opt: None,
300                sample_docx_path_opt: None,
301                is_susceptible,
302            }
303        } else {
304            ItemMetadata::SimpleDir { entries: 0 }
305        };
306
307        let (mime, icon_handle_grid, icon_handle_list, icon_handle_list_condensed) = {
308            let mime = if metadata.is_dir() {
309                "inode/directory".parse().unwrap()
310            } else {
311                MimeGuess::from_path(&new_path).first_or_octet_stream()
312            };
313            let mime_clone = mime.clone();
314            let file_icon = |size| {
315                widget::icon::from_name(if metadata.is_dir() {
316                    "folder"
317                } else if mime_clone.type_() == mime::IMAGE {
318                    "image-x-generic"
319                } else if mime_clone == mime::APPLICATION_PDF {
320                    "application-pdf"
321                } else {
322                    "text-x-generic"
323                })
324                .size(size)
325                .handle()
326            };
327            (
328                // TODO: get mime from content_type?
329                mime,
330                file_icon(sizes.grid()),
331                file_icon(sizes.list()),
332                file_icon(sizes.list_condensed()),
333            )
334        };
335
336        // Check if item is hidden
337        let hidden = name.starts_with('.');
338
339        items.push(tab::Item {
340            name: name.clone(),
341            is_mount_point: false,
342            is_client_point: false,
343            display_name: name.clone(),
344            metadata,
345            hidden,
346            location_opt: Some(location),
347            image_dimensions: None,
348            mime,
349            icon_handle_grid,
350            icon_handle_list,
351            icon_handle_list_condensed,
352            thumbnail_opt: Some(ItemThumbnail::NotImage),
353            button_id: widget::Id::unique(),
354            pos_opt: Cell::new(None),
355            rect_opt: Cell::new(None),
356            selected: false,
357            highlighted: false,
358            overlaps_drag_rect: false,
359            dir_size: DirSize::NotDirectory,
360            cut: false,
361        });
362    }
363
364    // Build susceptibility map from JSON items (already loaded via load_remote_json)
365    let sample_susceptibility: HashMap<String, bool> = items
366        .iter()
367        .filter_map(|item| {
368            if let ItemMetadata::RusshPath { is_tbprofiler_groupable_raw_result_file: true, is_susceptible: Some(true), .. } =
369                &item.metadata
370            {
371                let id = item.name.find('.').map(|i| item.name[..i].to_string())?;
372                Some((id, true))
373            } else {
374                None
375            }
376        })
377        .collect();
378
379    // Reset is_tbprofiler_groupable_raw_result_file for items whose sample group has no JSON,
380    // and propagate susceptibility to non-JSON raw files
381    for item in &mut items {
382        let name = item.name.clone();
383        if let ItemMetadata::RusshPath { is_tbprofiler_groupable_raw_result_file, is_susceptible, .. } =
384            &mut item.metadata
385            && *is_tbprofiler_groupable_raw_result_file {
386                let sample_id = name.find('.').map(|i| &name[..i]);
387                if sample_id.is_none_or(|id| samples.get(id).is_none_or(|f| f.json.is_none()))
388                {
389                    *is_tbprofiler_groupable_raw_result_file = false;
390                } else if let Some(id) = sample_id
391                    && let Some(&sus) = sample_susceptibility.get(id) {
392                        *is_susceptible = Some(sus);
393                    }
394            }
395    }
396
397    // ------------------------------------------------------------
398    // Second pass — build grouped sample items
399    // ------------------------------------------------------------
400    for (sample_id, files) in samples {
401        if files.json.is_none() {
402            continue;
403        }
404        let mut tbprofilerjson_opt = None;
405        let mut is_susceptible = None;
406
407        if let Some(json_path) = &files.json {
408            let mut url = Url::parse(&format!(
409                "ssh://{}{}:{}",
410                remote_file
411                    .username
412                    .clone()
413                    .map(|u| u + "@")
414                    .unwrap_or_default(),
415                remote_file.host,
416                remote_file.port,
417            ))
418            .unwrap();
419
420            let url_path = json_path.to_string_lossy().replace('\\', "/");
421            url.set_path(&url_path);
422            match load_remote_json(client, url.as_str()).await {
423                Ok(json) => {
424                    let sus = json.dr_variants.iter().all(|v| v.is_susceptible());
425                    tbprofilerjson_opt = Some(json);
426                    is_susceptible = Some(sus);
427                }
428                Err(e) => log::warn!("Failed to load sample JSON: {}", e),
429            }
430        }
431
432        let location = Location::Remote(uri.to_string(), sample_id.clone(), None);
433
434        let metadata = ItemMetadata::RusshPath {
435            mtime: files.mtime,
436            size_opt: None,
437            children_opt: None,
438            is_tbprofiler_json: true,
439            tbprofilerjson_opt,
440            is_tbprofiler_result_as_sample: true,
441            is_tbprofiler_groupable_raw_result_file: false,
442            sample_json_path_opt: files.json.clone(),
443            sample_csv_path_opt: files.csv.clone(),
444            sample_docx_path_opt: files.docx.clone(),
445            is_susceptible,
446        };
447
448        items.push(tab::Item {
449            name: sample_id.clone(),
450            display_name: sample_id.clone(),
451            metadata,
452            hidden: false,
453            location_opt: Some(location),
454            image_dimensions: None,
455            mime: "application/json".parse().unwrap(),
456            icon_handle_grid: widget::icon::from_name("text-x-generic")
457                .size(sizes.grid())
458                .handle(),
459            icon_handle_list: widget::icon::from_name("text-x-generic")
460                .size(sizes.list())
461                .handle(),
462            icon_handle_list_condensed: widget::icon::from_name("text-x-generic")
463                .size(sizes.list_condensed())
464                .handle(),
465            is_mount_point: false,
466            is_client_point: false,
467            thumbnail_opt: Some(ItemThumbnail::NotImage),
468            button_id: widget::Id::unique(),
469            pos_opt: Cell::new(None),
470            rect_opt: Cell::new(None),
471            selected: false,
472            highlighted: false,
473            overlaps_drag_rect: false,
474            dir_size: DirSize::NotDirectory,
475            cut: false,
476        });
477    }
478
479    Ok(items)
480}
481
482async fn remote_sftp_parent(
483    client: &Client,
484    uri: &str,
485    sizes: IconSizes,
486) -> Result<tab::Item, String> {
487    let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
488    let path = remote_file.path.clone();
489    let channel = client.get_channel().await.map_err(|e| e.to_string())?;
490    channel
491        .request_subsystem(true, "sftp")
492        .await
493        .map_err(|e| e.to_string())?;
494    let sftp = SftpSession::new(channel.into_stream())
495        .await
496        .map_err(|e| e.to_string())?;
497    let metadata = sftp
498        .metadata(path.clone())
499        .await
500        .map_err(|e| format!("metadata {path}: {e:?}"))?;
501
502    let child_path = PathBuf::from(remote_file.path.clone());
503    let name = child_path
504        .file_name()
505        .and_then(|s| s.to_str())
506        .unwrap_or("")
507        .to_string();
508    let child_uri = format!(
509        "ssh://{}{}:{}{}",
510        remote_file
511            .username
512            .clone()
513            .map(|u| u + "@")
514            .unwrap_or_default(),
515        remote_file.host,
516        remote_file.port,
517        remote_file.path.clone(),
518    );
519
520    let location = Location::Remote(
521        child_uri.clone(),
522        name.clone(),
523        Some(PathBuf::from(remote_file.path.clone())),
524    );
525
526    let item_metadata = {
527        let mtime = metadata
528            .modified()
529            .ok()
530            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
531            .map(|d| d.as_secs())
532            .unwrap_or(0);
533        let is_dir = metadata.is_dir();
534        let size_opt = (!is_dir).then_some(metadata.size).flatten();
535        let mut children_opt = None;
536        if is_dir {
537            let mut count = 0;
538            match sftp.read_dir(remote_file.path.clone()).await {
539                Ok(mut dir) => {
540                    for entry in dir.by_ref() {
541                        if entry.file_name() == "." || entry.file_name() == ".." {
542                            continue;
543                        }
544                        count += 1;
545                    }
546                }
547                Err(_) => {
548                    log::info!(
549                        "Could not read directory to count children: {}",
550                        remote_file.path.clone()
551                    );
552                }
553            }
554            children_opt = Some(count);
555        }
556        ItemMetadata::RusshPath {
557            mtime,
558            size_opt,
559            children_opt,
560            is_tbprofiler_json: false,
561            tbprofilerjson_opt: None,
562            is_tbprofiler_result_as_sample: false,
563            is_tbprofiler_groupable_raw_result_file: false,
564            sample_json_path_opt: None,
565            sample_csv_path_opt: None,
566            sample_docx_path_opt: None,
567            is_susceptible: None,
568        }
569    };
570    let (mime, icon_handle_grid, icon_handle_list, icon_handle_list_condensed) = {
571        let file_icon = |size| {
572            widget::icon::from_name(if metadata.is_dir() {
573                "folder"
574            } else {
575                "text-x-generic"
576            })
577            .size(size)
578            .handle()
579        };
580        (
581            "inode/directory".parse().unwrap(),
582            file_icon(sizes.grid()),
583            file_icon(sizes.list()),
584            file_icon(sizes.list_condensed()),
585        )
586    };
587    let hidden = name.starts_with('.');
588    let item = tab::Item {
589        name: name.clone(),
590        is_mount_point: false,
591        is_client_point: false,
592        display_name: name,
593        metadata: item_metadata,
594        hidden,
595        location_opt: Some(location),
596        image_dimensions: None,
597        mime,
598        icon_handle_grid,
599        icon_handle_list,
600        icon_handle_list_condensed,
601        thumbnail_opt: Some(ItemThumbnail::NotImage),
602        button_id: widget::Id::unique(),
603        pos_opt: Cell::new(None),
604        rect_opt: Cell::new(None),
605        selected: false,
606        highlighted: false,
607        overlaps_drag_rect: false,
608        dir_size: DirSize::NotDirectory,
609        cut: false,
610    };
611    Ok(item)
612}
613
614async fn load_remote_json(client: &Client, uri: &str) -> Result<TbProfilerJson, String> {
615    let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
616    let channel = client.get_channel().await.map_err(|e| e.to_string())?;
617    channel
618        .request_subsystem(true, "sftp")
619        .await
620        .map_err(|e| e.to_string())?;
621    let sftp = SftpSession::new(channel.into_stream())
622        .await
623        .map_err(|e| e.to_string())?;
624    let mut file = sftp
625        .open(remote_file.path.clone())
626        .await
627        .map_err(|e| e.to_string())?;
628    let mut contents = String::new();
629    file.read_to_string(&mut contents)
630        .await
631        .map_err(|e| e.to_string())?;
632
633    let parsed: TbProfilerJson = serde_json::from_str(&contents).map_err(|e| e.to_string())?;
634    Ok(parsed)
635}
636
637async fn perform_download(
638    client: &Client,
639    paths: Box<[PathBuf]>,
640    to: PathBuf,
641    zip_output: Option<PathBuf>,
642    progress_tx: &tokio::sync::mpsc::UnboundedSender<()>,
643) -> Result<(), anyhow::Error> {
644    let mut reserved = HashSet::new();
645
646    // Shell-safe single-quote escaping: replace ' with '\''
647    let sq = |s: &str| s.replace('\'', "'\\''");
648
649    // Open one SFTP session to check whether each path is a directory.
650    let channel = client.get_channel().await?;
651    channel.request_subsystem(true, "sftp").await?;
652    let sftp = SftpSession::new(channel.into_stream()).await?;
653
654    // Partition into directories and plain files.
655    let mut dirs: Vec<&PathBuf> = Vec::new();
656    let mut files: Vec<&PathBuf> = Vec::new();
657    for from in paths.iter() {
658        if from.file_name().is_none() {
659            continue;
660        }
661        let remote = from.to_string_lossy().replace('\\', "/");
662        let is_dir = sftp
663            .metadata(&remote)
664            .await
665            .map(|m| m.is_dir())
666            .unwrap_or(false);
667        if is_dir {
668            dirs.push(from);
669        } else {
670            files.push(from);
671        }
672    }
673
674    // --- Multiple (or single) directories → one combined zip ---
675    if !dirs.is_empty() {
676        let remote_zip = format!(
677            "/tmp/cosmic_files_{}.zip",
678            std::time::SystemTime::now()
679                .duration_since(std::time::UNIX_EPOCH)
680                .unwrap_or_default()
681                .as_millis()
682        );
683
684        // Each dir may live in a different parent, so cd into the right parent
685        // per entry and append to the same archive with `zip -r`.
686        let parts: Vec<String> = dirs
687            .iter()
688            .map(|dir| {
689                let parent = dir
690                    .parent()
691                    .unwrap_or_else(|| Path::new("/"))
692                    .to_string_lossy()
693                    .replace('\\', "/");
694                let dir_name = dir.file_name().unwrap().to_string_lossy();
695                format!(
696                    "(cd '{}' && zip -r '{}' '{}')",
697                    sq(&parent),
698                    sq(&remote_zip),
699                    sq(&dir_name),
700                )
701            })
702            .collect();
703        let zip_cmd = parts.join(" && ");
704
705        let res = client.execute(&zip_cmd).await?;
706        if res.exit_status != 0 {
707            let _ = client
708                .execute(&format!("rm -f '{}'", sq(&remote_zip)))
709                .await;
710            return Err(anyhow::anyhow!(
711                "zip failed: stdout: {} stderr: {}",
712                res.stdout,
713                res.stderr
714            ));
715        }
716
717        // Use the user-chosen path when provided (SaveFile dialog), otherwise derive a name.
718        let local_target = if let Some(ref out) = zip_output {
719            out.clone()
720        } else {
721            let zip_name = if dirs.len() == 1 {
722                format!("{}.zip", dirs[0].file_name().unwrap().to_string_lossy())
723            } else {
724                "folders.zip".to_string()
725            };
726            let zip_stem_path = PathBuf::from(&zip_name);
727            let mut t = to.join(&zip_name);
728            if t.try_exists().unwrap_or(false) || reserved.contains(&t) {
729                t = download_unique_path(&zip_stem_path, &to, &mut reserved);
730            }
731            t
732        };
733        reserved.insert(local_target.clone());
734
735        let dl_result = client
736            .download_file(remote_zip.clone(), local_target.clone())
737            .await;
738        let _ = client
739            .execute(&format!("rm -f '{}'", sq(&remote_zip)))
740            .await;
741        if let Err(err) = dl_result {
742            return Err(anyhow::anyhow!("Download failed for zip: {}", err));
743        }
744        // Each directory in dirs counts as one completed item
745        for _ in dirs.iter() {
746            let _ = progress_tx.send(());
747        }
748    }
749
750    // --- Plain files downloaded individually ---
751    for from in files {
752        let name = from.file_name().unwrap();
753        let remote = from.to_string_lossy().replace('\\', "/");
754        let mut target = to.join(name);
755        if target.try_exists().unwrap_or(false) || reserved.contains(&target) {
756            target = download_unique_path(from, &to, &mut reserved);
757        }
758        reserved.insert(target.clone());
759
760        if let Err(err) = client.download_file(remote.clone(), target.clone()).await {
761            return Err(anyhow::anyhow!("Download failed for {}: {}", remote, err));
762        }
763        let _ = progress_tx.send(());
764    }
765
766    Ok(())
767}
768
769pub fn download_unique_path(from: &Path, to: &Path, reserved: &mut HashSet<PathBuf>) -> PathBuf {
770    // List of compound extensions to check
771    const COMPOUND_EXTENSIONS: &[&str] = &[
772        ".tar.gz",
773        ".tar.bz2",
774        ".tar.xz",
775        ".tar.zst",
776        ".tar.lz",
777        ".tar.lzma",
778        ".tar.sz",
779        ".tar.lzo",
780        ".tar.br",
781        ".tar.Z",
782        ".tar.pz",
783    ];
784
785    let to = to.to_owned();
786    let file_name = from.file_name().and_then(|n| n.to_str()).unwrap();
787    let file_name = file_name.to_string();
788
789    // --- split into stem/ext correctly ---
790    let (stem, ext) = COMPOUND_EXTENSIONS
791        .iter()
792        .copied()
793        .find(|&e| file_name.ends_with(e))
794        .map(|e| {
795            (
796                file_name.strip_suffix(e).unwrap().to_string(),
797                Some(e[1..].to_string()),
798            )
799        })
800        .unwrap_or_else(|| {
801            from.file_stem()
802                .and_then(|s| s.to_str())
803                .map_or((file_name.clone(), None), |s| {
804                    (
805                        s.to_string(),
806                        from.extension()
807                            .and_then(|e| e.to_str())
808                            .map(str::to_string),
809                    )
810                })
811        });
812
813    // --- find free name ---
814    for n in 0.. {
815        let new_name = if n == 0 {
816            file_name.clone()
817        } else {
818            match &ext {
819                Some(ext) => format!("{} ({} {}).{}", stem, fl!("copy_noun"), n, ext),
820                None => format!("{} ({} {})", stem, fl!("copy_noun"), n),
821            }
822        };
823
824        let candidate = to.join(new_name);
825
826        // IMPORTANT: check both filesystem AND reserved names
827        if !candidate.exists() && !reserved.contains(&candidate) {
828            reserved.insert(candidate.clone());
829            return candidate;
830        }
831    }
832
833    unreachable!()
834}
835
836pub async fn run_tbprofiler(
837    client: &Client,
838    paths: Box<[PathBuf]>,
839    tb_config: TBConfig,
840) -> Result<SlurmJobId, anyhow::Error> {
841    if tb_config.script_path.is_empty()
842        || tb_config.out_dir.is_empty()
843        || tb_config.docx_template_path.is_empty()
844    {
845        return Err(anyhow::anyhow!(
846            "Please configure paths under TB profiler settings..."
847        ));
848    }
849    let mut sample_map: BTreeMap<String, BTreeSet<u8>> = BTreeMap::new();
850    for path in paths.iter() {
851        let filename = path
852            .file_name()
853            .and_then(|n| n.to_str())
854            .ok_or_else(|| anyhow::anyhow!("Invalid filename in path: {:?}", path))?;
855        if let Some(sample) = filename.strip_suffix(tb_config.pair1_suffix.as_str()) {
856            sample_map.entry(sample.to_string()).or_default().insert(1);
857        } else if let Some(sample) = filename.strip_suffix(tb_config.pair2_suffix.as_str()) {
858            sample_map.entry(sample.to_string()).or_default().insert(2);
859        } else {
860            return Err(anyhow::anyhow!(
861                "Unexpected FASTQ filename format: {}",
862                filename
863            ));
864        }
865    }
866    if sample_map.is_empty() {
867        return Err(anyhow::anyhow!("No valid FASTQ files provided"));
868    }
869    for (sample, reads) in &sample_map {
870        if reads.len() != 2 {
871            return Err(anyhow::anyhow!(
872                "Sample {} does not have both {} and {} FASTQ files",
873                sample,
874                tb_config.pair1_suffix,
875                tb_config.pair2_suffix
876            ));
877        }
878    }
879    let sample_ids: Vec<String> = sample_map.keys().cloned().collect();
880    let sample_ids_string = sample_ids.join(",");
881    let array_end = sample_ids
882        .len()
883        .checked_sub(1)
884        .ok_or_else(|| anyhow::anyhow!("Need at least 1 sample"))?;
885
886    let raw_sequence_dir_paired = {
887        let first_parent = paths
888            .first()
889            .and_then(|p| p.parent())
890            .ok_or_else(|| anyhow::anyhow!("Could not determine parent directory"))?;
891
892        for p in paths.iter() {
893            let parent = p
894                .parent()
895                .ok_or_else(|| anyhow::anyhow!("Path has no parent: {:?}", p))?;
896            if parent != first_parent {
897                return Err(anyhow::anyhow!(
898                    "FASTQ files are in different directories: {:?} vs {:?}",
899                    first_parent,
900                    parent
901                ));
902            }
903        }
904        first_parent.to_string_lossy().into_owned()
905    };
906
907    let command_run_tbprofiler = format!(
908        "sbatch --parsable --array 0-{} {} \"{}\" {} {} {}",
909        array_end,
910        tb_config.script_path,
911        sample_ids_string,
912        raw_sequence_dir_paired,
913        tb_config.out_dir,
914        tb_config.docx_template_path,
915    );
916    let res = client.execute(&command_run_tbprofiler).await?;
917    if res.exit_status != 0 {
918        return Err(anyhow::anyhow!(
919            "tbprofiler failed (exit {}):\nstdout:\n{}\nstderr:\n{}",
920            res.exit_status,
921            res.stdout,
922            res.stderr
923        ));
924    }
925    if !res.stderr.is_empty() {
926        log::warn!("tbprofiler stderr: {}", res.stderr);
927    }
928    let job_id: usize = res.stdout.split(';').next().unwrap().trim().parse()?;
929    let tasks = array_end
930        .checked_add(1)
931        .ok_or_else(|| anyhow::anyhow!("Overflow when calculating tasks"))?;
932    let job_id = SlurmJobId {
933        array_id: job_id,
934        tasks,
935        running_tasks: tasks,
936    };
937    Ok(job_id)
938}
939
940async fn poll_running_tasks(
941    client: &Client,
942    array_id: usize,
943) -> Result<usize, anyhow::Error> {
944    let cmd = format!(
945        "squeue -j {} -r -h -o \"%T\" 2>/dev/null | grep -c RUNNING || true",
946        array_id
947    );
948    let res = client.execute(&cmd).await?;
949    let count: usize = res.stdout.trim().parse().unwrap_or(0);
950    Ok(count)
951}
952
953pub async fn delete_remote_files(
954    client: &Client,
955    paths: &[PathBuf],
956) -> Result<String, anyhow::Error> {
957    if paths.is_empty() {
958        return Ok(String::new());
959    }
960    let filenames: Vec<String> = paths
961        .iter()
962        .map(|p| {
963            p.file_name()
964                .and_then(|n| n.to_str())
965                .unwrap_or("<invalid>")
966                .to_string()
967        })
968        .collect();
969    let files = paths
970        .iter()
971        .map(|p| {
972            let remote = p.to_string_lossy().replace('\\', "/");
973            format!("'{}'", remote.replace('\'', "'\\''"))
974        })
975        .collect::<Vec<_>>()
976        .join(" ");
977    let command = format!("rm -rf {}", files);
978    let res = client.execute(&command).await?;
979    if res.exit_status != 0 {
980        return Err(anyhow::anyhow!(
981            "Failed to delete files (exit {}):\nstderr:\n{}",
982            res.exit_status,
983            res.stderr
984        ));
985    }
986    Ok(filenames.join("\n"))
987}
988
989pub async fn delete_tbprofiler_results(
990    client: &Client,
991    tb_config: TBConfig,
992) -> Result<String, anyhow::Error> {
993    let command = format!("rm -rf {}/results/*", tb_config.out_dir);
994    let res = client.execute(&command).await?;
995    if res.exit_status != 0 {
996        return Err(anyhow::anyhow!(
997            "Failed to delete remote file (exit {}):\nstderr:\n{}",
998            res.exit_status,
999            res.stderr
1000        ));
1001    }
1002    Ok(res.stdout)
1003}
1004
1005#[allow(dead_code)]
1006fn request_password(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> ClientAuth {
1007    let auth = ClientAuth {
1008        message: String::new(),
1009        username_opt: Some(String::new()),
1010        domain_opt: Some(String::new()),
1011        password_opt: Some(String::new()),
1012        remember_opt: Some(false),
1013        anonymous_opt: Some(false),
1014    };
1015    let (auth_tx, mut auth_rx) = mpsc::channel(1);
1016    event_tx
1017        .send(Event::RemoteAuth(uri.clone(), auth, auth_tx))
1018        .unwrap();
1019
1020    if let Some(auth) = auth_rx.blocking_recv() {
1021        auth
1022    } else {
1023        ClientAuth {
1024            message: "Authentication cancelled".into(),
1025            username_opt: None,
1026            domain_opt: None,
1027            password_opt: None,
1028            remember_opt: None,
1029            anonymous_opt: None,
1030        }
1031    }
1032}
1033
1034#[derive(Debug, Clone)]
1035pub struct RemoteFile {
1036    pub host: String,
1037    pub port: u16,
1038    pub username: Option<String>,
1039    pub path: String,
1040}
1041
1042impl RemoteFile {
1043    pub fn uri(&self) -> String {
1044        let userpart = if let Some(user) = &self.username {
1045            format!("{}@", user)
1046        } else {
1047            "".into()
1048        };
1049        format!("ssh://{}{}:{}{}", userpart, self.host, self.port, self.path)
1050    }
1051}
1052
1053#[derive(Debug)]
1054pub enum RemoteFileError {
1055    InvalidUrl(String),
1056    UnsupportedScheme(String),
1057    MissingHost,
1058}
1059
1060impl fmt::Display for RemoteFileError {
1061    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1062        match self {
1063            RemoteFileError::InvalidUrl(e) => write!(f, "Invalid URL: {e}"),
1064            RemoteFileError::UnsupportedScheme(s) => {
1065                write!(f, "Unsupported scheme: {s}")
1066            }
1067            RemoteFileError::MissingHost => write!(f, "Missing host"),
1068        }
1069    }
1070}
1071
1072impl FromStr for RemoteFile {
1073    type Err = RemoteFileError;
1074
1075    fn from_str(uri: &str) -> Result<Self, Self::Err> {
1076        let url = Url::parse(uri).map_err(|e| RemoteFileError::InvalidUrl(e.to_string()))?;
1077        if url.scheme() != "ssh" {
1078            return Err(RemoteFileError::UnsupportedScheme(url.scheme().to_string()));
1079        }
1080        let host = url
1081            .host_str()
1082            .ok_or(RemoteFileError::MissingHost)?
1083            .to_string();
1084        let port = url.port().unwrap_or(22);
1085        let username = if url.username().is_empty() {
1086            None
1087        } else {
1088            Some(url.username().to_string())
1089        };
1090        let mut path = url.path().to_string();
1091        if path.is_empty() {
1092            path = "/".into();
1093        }
1094        if path.len() > 1 && path.ends_with('/') {
1095            path.pop();
1096        }
1097        Ok(Self {
1098            host,
1099            port,
1100            username,
1101            path,
1102        })
1103    }
1104}
1105
1106enum Cmd {
1107    Items(IconSizes, mpsc::Sender<ClientItems>),
1108    Rescan,
1109    Connect(ClientItem, tokio::sync::oneshot::Sender<anyhow::Result<()>>),
1110    RemoteDrive(String, tokio::sync::oneshot::Sender<anyhow::Result<()>>),
1111    RemoteScan(
1112        String,
1113        IconSizes,
1114        mpsc::Sender<Result<Vec<tab::Item>, String>>,
1115    ),
1116    RemoteParent(String, IconSizes, mpsc::Sender<Result<tab::Item, String>>),
1117    #[allow(clippy::type_complexity)]
1118    DirInfo(
1119        String,
1120        mpsc::Sender<Result<(String, String, Option<PathBuf>), anyhow::Error>>,
1121    ),
1122    Disconnect(ClientItem),
1123    Download(
1124        Box<[PathBuf]>,
1125        Vec<String>,
1126        PathBuf,
1127        Option<PathBuf>,
1128        tokio::sync::oneshot::Sender<anyhow::Result<()>>,
1129        tokio::sync::mpsc::UnboundedSender<()>,
1130    ),
1131    RunTbProfiler(
1132        Box<[PathBuf]>,
1133        Vec<String>,
1134        TBConfig,
1135        tokio::sync::oneshot::Sender<anyhow::Result<SlurmJobId>>,
1136    ),
1137    DeleteRemoteFiles(
1138        Box<[PathBuf]>,
1139        Vec<String>,
1140        tokio::sync::oneshot::Sender<anyhow::Result<String>>,
1141    ),
1142    DeleteTbProfilerResults(
1143        String,
1144        TBConfig,
1145        tokio::sync::oneshot::Sender<anyhow::Result<String>>,
1146    ),
1147    PollJobStatus(usize, String),
1148}
1149
1150enum Event {
1151    Changed,
1152    Items(ClientItems),
1153    ClientResult(ClientItem, Result<bool, String>),
1154    #[allow(dead_code)]
1155    RemoteAuth(String, ClientAuth, mpsc::Sender<ClientAuth>),
1156    RemoteResult(String, Result<bool, String>),
1157    RunTbProfilerResult(String, Result<SlurmJobId, String>),
1158    DeleteRemoteFilesResult(String, Result<String, String>),
1159    JobStatusUpdate(String, usize, usize),
1160}
1161
1162#[derive(Clone, Debug)]
1163pub struct Item {
1164    name: String,
1165    is_connected: bool,
1166    icon_opt: Option<PathBuf>,
1167    icon_symbolic_opt: Option<PathBuf>,
1168    path_opt: Option<PathBuf>,
1169    uri: String,
1170    host: String,
1171    port: u16,
1172    username: String,
1173    auth: AuthMethod,
1174    server_check: ServerCheckMethod,
1175}
1176
1177impl Item {
1178    pub fn name(&self) -> String {
1179        self.name.clone()
1180    }
1181
1182    pub const fn is_connected(&self) -> bool {
1183        self.is_connected
1184    }
1185
1186    pub fn uri(&self) -> String {
1187        self.uri.clone()
1188    }
1189
1190    pub fn host(&self) -> String {
1191        self.host.clone()
1192    }
1193
1194    pub fn icon(&self, symbolic: bool) -> Option<widget::icon::Handle> {
1195        if symbolic {
1196            self.icon_symbolic_opt.as_ref()
1197        } else {
1198            self.icon_opt.as_ref()
1199        }
1200        .map(|icon| widget::icon::from_path(icon.clone()))
1201    }
1202
1203    pub fn path(&self) -> Option<PathBuf> {
1204        self.path_opt.clone()
1205    }
1206}
1207
1208pub struct Russh {
1209    command_tx: mpsc::UnboundedSender<Cmd>,
1210    event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
1211}
1212
1213impl Russh {
1214    pub fn new() -> Self {
1215        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
1216        let (event_tx, event_rx) = mpsc::unbounded_channel();
1217        std::thread::spawn(move || {
1218            let rt = Builder::new_current_thread().enable_all().build().unwrap();
1219            rt.block_on(async move {
1220                let clients = Arc::new(RwLock::new(HashMap::<String, Arc<Client>>::new()));
1221                let mut client_items = Vec::<ClientItem>::new();
1222                while let Some(command) = command_rx.recv().await {
1223                    match command {
1224                        Cmd::Items(_sizes, items_tx) => {
1225                            items_tx.send(client_items.clone()).await.unwrap();
1226                        }
1227                        Cmd::Rescan => {
1228                            event_tx.send(Event::Items(client_items.clone())).unwrap();
1229                        }
1230                        Cmd::Connect(client_item, result_tx) => {
1231                            let client_item_clone = client_item.clone();
1232                            let ClientItem::Russh(item) = client_item else {
1233                                _ = result_tx.send(Err(anyhow::anyhow!("No client item")));
1234                                continue;
1235                            };
1236                            let event_tx = event_tx.clone();
1237
1238                            let res = Client::connect(
1239                                (item.host.as_str(), item.port),
1240                                item.username.as_str(),
1241                                item.auth.clone(),
1242                                item.server_check.clone(),
1243                            )
1244                            .await;
1245                            match res {
1246                                Ok(client) => {
1247                                    {
1248                                        let mut write = clients.write().await;
1249                                        write.insert(item.host.clone(), Arc::new(client));
1250                                    }
1251                                    client_items.push(ClientItem::Russh(Item {
1252                                        name: item.name.clone(),
1253                                        is_connected: true,
1254                                        icon_opt: item.icon_opt.clone(),
1255                                        icon_symbolic_opt: item.icon_symbolic_opt.clone(),
1256                                        path_opt: item.path_opt.clone(),
1257                                        uri: item.uri.clone(),
1258                                        host: item.host.clone(),
1259                                        port: item.port,
1260                                        username: item.username.clone(),
1261                                        auth: item.auth.clone(),
1262                                        server_check: item.server_check.clone(),
1263                                    }));
1264                                    _ = result_tx.send(Ok(()));
1265                                    event_tx
1266                                        .send(Event::ClientResult(
1267                                            client_item_clone.clone(),
1268                                            Ok(true),
1269                                        ))
1270                                        .unwrap();
1271                                }
1272                                Err(err) => {
1273                                    _ = result_tx.send(Err(anyhow::anyhow!("{err:?}")));
1274                                    event_tx
1275                                        .send(Event::ClientResult(
1276                                            client_item_clone,
1277                                            Err(format!("{err}")),
1278                                        ))
1279                                        .unwrap();
1280                                }
1281                            }
1282                        }
1283                        Cmd::RemoteDrive(uri, result_tx) => {
1284                            let mut result_tx_opt = Some(result_tx);
1285                            let event_tx = event_tx.clone();
1286                            let remote_file =
1287                                match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1288                                    Ok(rf) => rf,
1289                                    Err(err) => {
1290                                        if let Some(result_tx) = result_tx_opt.take() {
1291                                            _ = result_tx.send(Err(anyhow::anyhow!("{err:?}")));
1292                                        }
1293                                        let _ = event_tx
1294                                            .send(Event::RemoteResult(uri, Err(err.to_string())));
1295                                        continue;
1296                                    }
1297                                };
1298                            let norm_uri = remote_file.uri();
1299                            let host = remote_file.host.as_str();
1300                            let port = remote_file.port;
1301                            let username = match remote_file.username {
1302                                Some(u) => u,
1303                                None => {
1304                                    let msg = "No username specified in URI";
1305                                    if let Some(result_tx) = result_tx_opt.take() {
1306                                        let _ = result_tx.send(Err(anyhow::anyhow!(msg)));
1307                                    }
1308                                    let _ = event_tx.send(Event::RemoteResult(
1309                                        norm_uri.clone(),
1310                                        Err(msg.to_string()),
1311                                    ));
1312                                    continue;
1313                                }
1314                            };
1315                            let auth = match get_key_files() {
1316                                Ok((key_path, _)) => AuthMethod::with_key_file(key_path, None),
1317                                Err(err) => {
1318                                    if let Some(result_tx) = result_tx_opt.take() {
1319                                        let _ =
1320                                            result_tx.send(Err(anyhow::anyhow!(err.to_string())));
1321                                    }
1322                                    let _ = event_tx.send(Event::RemoteResult(
1323                                        norm_uri.clone(),
1324                                        Err(err.to_string()),
1325                                    ));
1326                                    continue;
1327                                }
1328                            };
1329                            let existing_client = {
1330                                let read = clients.read().await;
1331                                read.contains_key(host)
1332                            };
1333                            let client_item = ClientItem::Russh(Item {
1334                                name: host.to_string(),
1335                                is_connected: true,
1336                                icon_opt: None,
1337                                icon_symbolic_opt: None,
1338                                path_opt: Some(PathBuf::from(remote_file.path.clone())),
1339                                uri: norm_uri.clone(),
1340                                host: host.to_string(),
1341                                port,
1342                                username: username.clone(),
1343                                auth: auth.clone(),
1344                                server_check: ServerCheckMethod::NoCheck,
1345                            });
1346                            if existing_client {
1347                                let has_item = client_items.iter().any(|item| {
1348                                    matches!(
1349                                        item,
1350                                        ClientItem::Russh(r) if r.host == host
1351                                    )
1352                                });
1353                                if !has_item {
1354                                    client_items.push(client_item);
1355                                    let _ = event_tx.send(Event::Changed);
1356                                }
1357                                if let Some(result_tx) = result_tx_opt.take() {
1358                                    let _ = result_tx.send(Ok(()));
1359                                }
1360                                let _ =
1361                                    event_tx.send(Event::RemoteResult(norm_uri.clone(), Ok(true)));
1362                                continue;
1363                            }
1364                            match Client::connect(
1365                                (host, port),
1366                                username.as_str(),
1367                                auth,
1368                                ServerCheckMethod::NoCheck,
1369                            )
1370                            .await
1371                            {
1372                                Ok(client) => {
1373                                    {
1374                                        let mut write = clients.write().await;
1375                                        write.insert(host.to_string(), Arc::new(client));
1376                                    }
1377                                    client_items.push(client_item);
1378                                    let _ = event_tx.send(Event::Changed);
1379                                    if let Some(result_tx) = result_tx_opt.take() {
1380                                        let _ = result_tx.send(Ok(()));
1381                                    }
1382                                    let _ = event_tx
1383                                        .send(Event::RemoteResult(norm_uri.clone(), Ok(true)));
1384                                }
1385                                Err(err) => {
1386                                    let msg = format!("Connecting fresh session failed: {}", err);
1387                                    if let Some(result_tx) = result_tx_opt.take() {
1388                                        let _ = result_tx.send(Err(anyhow::anyhow!(msg.clone())));
1389                                    }
1390                                    let _ = event_tx.send(Event::RemoteResult(
1391                                        norm_uri.clone(),
1392                                        Err(msg.clone()),
1393                                    ));
1394                                }
1395                            }
1396                        }
1397                        Cmd::RemoteScan(uri, sizes, items_tx) => {
1398                            let remote_file =
1399                                match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1400                                    Ok(rf) => rf,
1401                                    Err(e) => {
1402                                        let _ = items_tx.send(Err(e)).await;
1403                                        continue;
1404                                    }
1405                                };
1406                            let norm_uri = remote_file.uri();
1407                            let host = remote_file.host.as_str();
1408                            let port = remote_file.port;
1409                            let username = match remote_file.username {
1410                                Some(u) => u,
1411                                None => {
1412                                    event_tx
1413                                        .send(Event::RemoteResult(
1414                                            norm_uri,
1415                                            Err("No username specified in URI".into()),
1416                                        ))
1417                                        .unwrap();
1418                                    continue;
1419                                }
1420                            };
1421
1422                            let existing_client = {
1423                                let read = clients.read().await;
1424                                read.get(host).cloned()
1425                            };
1426                            if let Some(client) = existing_client {
1427                                let result =
1428                                    remote_sftp_list(&client, &norm_uri, sizes)
1429                                        .await;
1430                                let _ = items_tx.send(result).await;
1431                            } else {
1432                                let key_path = match get_key_files() {
1433                                    Ok(key_pair) => key_pair.0,
1434                                    Err(e) => {
1435                                        event_tx
1436                                            .send(Event::RemoteResult(norm_uri, Err(e)))
1437                                            .unwrap();
1438                                        continue;
1439                                    }
1440                                };
1441                                let auth = AuthMethod::with_key_file(key_path, None);
1442                                match Client::connect(
1443                                    (host, port),
1444                                    username.as_str(),
1445                                    auth.clone(),
1446                                    ServerCheckMethod::NoCheck,
1447                                )
1448                                .await
1449                                {
1450                                    Ok(client) => {
1451                                        {
1452                                            let mut write = clients.write().await;
1453                                            write.insert(host.to_string(), Arc::new(client));
1454                                        }
1455                                        let client = {
1456                                            let read = clients.read().await;
1457                                            Arc::clone(read.get(host).unwrap())
1458                                        };
1459                                        let result = remote_sftp_list(
1460                                            &client,
1461                                            &norm_uri,
1462                                            sizes,
1463                                        )
1464                                        .await;
1465                                        let _ = items_tx.send(result).await;
1466                                        event_tx
1467                                            .send(Event::RemoteResult(norm_uri, Ok(true)))
1468                                            .unwrap();
1469                                    }
1470                                    Err(err) => {
1471                                        let msg =
1472                                            format!("Connecting fresh session failed: {}", err);
1473                                        event_tx
1474                                            .send(Event::RemoteResult(norm_uri, Err(msg)))
1475                                            .unwrap();
1476                                    }
1477                                }
1478                            }
1479                        }
1480                        Cmd::RemoteParent(uri, sizes, items_tx) => {
1481                            let remote_file =
1482                                match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1483                                    Ok(rf) => rf,
1484                                    Err(e) => {
1485                                        let _ = items_tx.send(Err(e)).await;
1486                                        continue;
1487                                    }
1488                                };
1489                            let norm_uri = remote_file.uri();
1490                            let host = remote_file.host.as_str();
1491                            let port = remote_file.port;
1492                            let username = match remote_file.username {
1493                                Some(u) => u,
1494                                None => {
1495                                    event_tx
1496                                        .send(Event::RemoteResult(
1497                                            norm_uri,
1498                                            Err("No username specified in URI".into()),
1499                                        ))
1500                                        .unwrap();
1501                                    continue;
1502                                }
1503                            };
1504                            let existing_client = {
1505                                let read = clients.read().await;
1506                                read.get(host).cloned()
1507                            };
1508                            if let Some(client) = existing_client {
1509                                let result = remote_sftp_parent(&client, &norm_uri, sizes).await;
1510                                let _ = items_tx.send(result).await;
1511                            } else {
1512                                let key_path = match get_key_files() {
1513                                    Ok(key_pair) => key_pair.0,
1514                                    Err(e) => {
1515                                        event_tx
1516                                            .send(Event::RemoteResult(norm_uri, Err(e)))
1517                                            .unwrap();
1518                                        continue;
1519                                    }
1520                                };
1521                                let auth = AuthMethod::with_key_file(key_path, None);
1522                                match Client::connect(
1523                                    (host, port),
1524                                    username.as_str(),
1525                                    auth.clone(),
1526                                    ServerCheckMethod::NoCheck,
1527                                )
1528                                .await
1529                                {
1530                                    Ok(client) => {
1531                                        {
1532                                            let mut write = clients.write().await;
1533                                            write.insert(host.to_string(), Arc::new(client));
1534                                        }
1535                                        let client = {
1536                                            let read = clients.read().await;
1537                                            Arc::clone(read.get(host).unwrap())
1538                                        };
1539                                        let result =
1540                                            remote_sftp_parent(&client, &norm_uri, sizes).await;
1541                                        let _ = items_tx.send(result).await;
1542                                        event_tx
1543                                            .send(Event::RemoteResult(norm_uri, Ok(true)))
1544                                            .unwrap();
1545                                    }
1546                                    Err(err) => {
1547                                        let msg =
1548                                            format!("Connecting fresh session failed: {}", err);
1549                                        event_tx
1550                                            .send(Event::RemoteResult(norm_uri, Err(msg)))
1551                                            .unwrap();
1552                                    }
1553                                }
1554                            }
1555                        }
1556                        Cmd::DirInfo(uri, result_tx) => {
1557                            let remote_file =
1558                                match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1559                                    Ok(rf) => rf,
1560                                    Err(e) => {
1561                                        let _ = result_tx.send(Err(anyhow::anyhow!(e))).await;
1562                                        continue;
1563                                    }
1564                                };
1565                            let host = remote_file.host.as_str();
1566                            let client = {
1567                                let read = clients.read().await;
1568                                match read.get(host) {
1569                                    Some(c) => Arc::clone(c),
1570                                    None => {
1571                                        let msg =
1572                                            format!("No SSH client connected for host: {}", host);
1573                                        let _ = result_tx.send(Err(anyhow::anyhow!(msg))).await;
1574                                        continue;
1575                                    }
1576                                }
1577                            };
1578                            let result = dir_info(&client, &uri)
1579                                .await
1580                                .map_err(|e| anyhow::anyhow!(e));
1581                            result_tx.send(result).await.unwrap();
1582                        }
1583                        Cmd::Disconnect(client_item) => {
1584                            let ClientItem::Russh(item) = client_item else {
1585                                continue;
1586                            };
1587                            {
1588                                let mut write = clients.write().await;
1589                                write.remove(&item.host);
1590                            }
1591                            client_items.retain(|ci| {
1592                                if let ClientItem::Russh(r) = ci {
1593                                    r.host != item.host
1594                                } else {
1595                                    true
1596                                }
1597                            });
1598                            let event_tx = event_tx.clone();
1599                            let _ = event_tx.send(Event::Changed);
1600                        }
1601                        Cmd::Download(paths, uris, path, zip_output, result_tx, progress_tx) => {
1602                            let result: Result<(), anyhow::Error> = async {
1603                                let remote_files: Vec<_> = uris
1604                                    .iter()
1605                                    .map(|u| {
1606                                        u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e))
1607                                    })
1608                                    .collect::<Result<_, anyhow::Error>>()?;
1609                                let host = remote_files
1610                                    .first()
1611                                    .ok_or_else(|| anyhow::anyhow!("No URIs provided"))?
1612                                    .host
1613                                    .clone();
1614                                if remote_files.iter().any(|rf| rf.host != host) {
1615                                    return Err(anyhow::anyhow!(
1616                                        "All download URIs must be from the same host"
1617                                    ));
1618                                }
1619                                let client = {
1620                                    let read = clients.read().await;
1621                                    read.get(&host).cloned()
1622                                }
1623                                .ok_or_else(|| anyhow::anyhow!("No client for host {host}"))?;
1624                                perform_download(&client, paths.clone(), path.clone(), zip_output.clone(), &progress_tx).await?;
1625                                Ok(())
1626                            }
1627                            .await;
1628                            let _ = result_tx.send(result);
1629                        }
1630                        Cmd::RunTbProfiler(paths, uris, tb_config, result_tx) => {
1631                            let uri = uris
1632                                .first()
1633                                .cloned()
1634                                .unwrap_or_else(|| "ssh:///".to_string());
1635                            if uris.is_empty() {
1636                                let err = anyhow::anyhow!("No URIs provided");
1637                                let _ = result_tx.send(Err(err));
1638                                continue;
1639                            }
1640                            let remote_files: Vec<_> = match uris
1641                                .iter()
1642                                .map(|u| u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e)))
1643                                .collect::<Result<_, _>>()
1644                            {
1645                                Ok(v) => v,
1646                                Err(err) => {
1647                                    let _ = result_tx.send(Err(err));
1648                                    continue;
1649                                }
1650                            };
1651                            let host = match remote_files.first() {
1652                                Some(rf) => rf.host.clone(),
1653                                None => {
1654                                    let _ =
1655                                        result_tx.send(Err(anyhow::anyhow!("No URIs provided")));
1656                                    continue;
1657                                }
1658                            };
1659                            if remote_files.iter().any(|rf| rf.host != host) {
1660                                let _ = result_tx.send(Err(anyhow::anyhow!(
1661                                    "All URIs must be from the same host"
1662                                )));
1663                                continue;
1664                            }
1665                            let client = {
1666                                let read = clients.read().await;
1667                                read.get(&host).cloned()
1668                            };
1669                            let client = match client {
1670                                Some(c) => c,
1671                                None => {
1672                                    let err = anyhow::anyhow!("No client for host {}", host);
1673                                    let _ = result_tx.send(Err(err));
1674                                    continue;
1675                                }
1676                            };
1677                            let result = run_tbprofiler(&client, paths, tb_config).await;
1678                            let event_result: Result<SlurmJobId, String> = result
1679                                .as_ref()
1680                                .copied()
1681                                .map_err(|e| e.to_string());
1682                            let _ = result_tx.send(result);
1683                            event_tx
1684                                .send(Event::RunTbProfilerResult(uri, event_result))
1685                                .unwrap();
1686                        }
1687                        Cmd::DeleteRemoteFiles(paths, uris, result_tx) => {
1688                            let uri = uris
1689                                .first()
1690                                .cloned()
1691                                .unwrap_or_else(|| "ssh:///".to_string());
1692                            if uris.is_empty() {
1693                                let err = anyhow::anyhow!("No URIs provided");
1694                                let _ = result_tx.send(Err(err));
1695                                continue;
1696                            }
1697                            let remote_files: Vec<_> = match uris
1698                                .iter()
1699                                .map(|u| u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e)))
1700                                .collect::<Result<_, _>>()
1701                            {
1702                                Ok(v) => v,
1703                                Err(err) => {
1704                                    let _ = result_tx.send(Err(err));
1705                                    continue;
1706                                }
1707                            };
1708                            let host = match remote_files.first() {
1709                                Some(rf) => rf.host.clone(),
1710                                None => {
1711                                    let _ =
1712                                        result_tx.send(Err(anyhow::anyhow!("No URIs provided")));
1713                                    continue;
1714                                }
1715                            };
1716                            if remote_files.iter().any(|rf| rf.host != host) {
1717                                let _ = result_tx.send(Err(anyhow::anyhow!(
1718                                    "All URIs must be from the same host"
1719                                )));
1720                                continue;
1721                            }
1722                            let client = {
1723                                let read = clients.read().await;
1724                                read.get(&host).cloned()
1725                            };
1726                            let client = match client {
1727                                Some(c) => c,
1728                                None => {
1729                                    let err = anyhow::anyhow!("No client for host {}", host);
1730                                    let _ = result_tx.send(Err(err));
1731                                    continue;
1732                                }
1733                            };
1734                            let result = delete_remote_files(&client, &paths).await;
1735                            let event_result: Result<String, String> = result
1736                                .as_ref()
1737                                .map(|s| s.clone())
1738                                .map_err(|e| e.to_string());
1739                            let _ = result_tx.send(result);
1740                            event_tx
1741                                .send(Event::DeleteRemoteFilesResult(uri, event_result))
1742                                .unwrap();
1743                        }
1744                        Cmd::DeleteTbProfilerResults(uri, tb_config, result_tx) => {
1745                            let remote_file =
1746                                match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1747                                    Ok(rf) => rf,
1748                                    Err(e) => {
1749                                        let _ = result_tx.send(Err(anyhow::anyhow!(e)));
1750                                        continue;
1751                                    }
1752                                };
1753                            let host = remote_file.host.as_str();
1754                            let client = {
1755                                let read = clients.read().await;
1756                                match read.get(host) {
1757                                    Some(c) => Arc::clone(c),
1758                                    None => {
1759                                        let msg =
1760                                            format!("No SSH client connected for host: {}", host);
1761                                        let _ = result_tx.send(Err(anyhow::anyhow!(msg)));
1762                                        continue;
1763                                    }
1764                                }
1765                            };
1766                            let result = delete_tbprofiler_results(&client, tb_config).await;
1767                            let _ = result_tx.send(result);
1768                        }
1769                        Cmd::PollJobStatus(array_id, uri) => {
1770                            let remote_file = match uri.parse::<RemoteFile>() {
1771                                Ok(rf) => rf,
1772                                Err(e) => {
1773                                    log::warn!("PollJobStatus: invalid URI {uri}: {e}");
1774                                    continue;
1775                                }
1776                            };
1777                            let client = {
1778                                let read = clients.read().await;
1779                                read.get(remote_file.host.as_str()).cloned()
1780                            };
1781                            let client = match client {
1782                                Some(c) => c,
1783                                None => {
1784                                    log::warn!("PollJobStatus: no client for host {}", remote_file.host);
1785                                    continue;
1786                                }
1787                            };
1788                            let event_tx = event_tx.clone();
1789                            tokio::spawn(async move {
1790                                loop {
1791                                    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1792                                    let running = match poll_running_tasks(&client, array_id).await {
1793                                        Ok(n) => n,
1794                                        Err(e) => {
1795                                            log::warn!("Failed to poll job {array_id}: {e}");
1796                                            break;
1797                                        }
1798                                    };
1799                                    if event_tx
1800                                        .send(Event::JobStatusUpdate(uri.clone(), array_id, running))
1801                                        .is_err()
1802                                    {
1803                                        log::warn!("PollJobStatus: event_tx send failed for job_id={array_id}, stopping poll loop");
1804                                        break;
1805                                    }
1806                                    if running == 0 {
1807                                        break;
1808                                    }
1809                                }
1810                            });
1811                        }
1812                    }
1813                }
1814            });
1815        });
1816        Self {
1817            command_tx,
1818            event_rx: Arc::new(Mutex::new(event_rx)),
1819        }
1820    }
1821}
1822
1823impl Connector for Russh {
1824    fn items(&self, sizes: IconSizes) -> Option<ClientItems> {
1825        let (items_tx, mut items_rx) = mpsc::channel(1);
1826        self.command_tx.send(Cmd::Items(sizes, items_tx)).unwrap();
1827        items_rx.blocking_recv()
1828    }
1829
1830    fn connect(&self, item: ClientItem) -> Task<()> {
1831        let command_tx = self.command_tx.clone();
1832        Task::perform(
1833            async move {
1834                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1835
1836                command_tx.send(Cmd::Connect(item, res_tx)).unwrap();
1837                res_rx.await
1838            },
1839            |x| {
1840                if let Err(err) = x {
1841                    log::error!("{err:?}");
1842                }
1843            },
1844        )
1845    }
1846
1847    fn remote_drive(&self, uri: String) -> Task<()> {
1848        let command_tx = self.command_tx.clone();
1849        Task::perform(
1850            async move {
1851                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1852
1853                command_tx.send(Cmd::RemoteDrive(uri, res_tx)).unwrap();
1854                res_rx.await
1855            },
1856            |x| {
1857                if let Err(err) = x {
1858                    log::error!("{err:?}");
1859                }
1860            },
1861        )
1862    }
1863
1864    fn download_file(&self, paths: Box<[PathBuf]>, uris: Vec<String>, to: PathBuf, zip_output: Option<PathBuf>) -> cosmic::Task<super::DownloadEvent> {
1865        let command_tx = self.command_tx.clone();
1866        cosmic::Task::stream(cosmic::iced::stream::channel(
1867            16,
1868            move |mut event_tx: cosmic::iced::futures::channel::mpsc::Sender<super::DownloadEvent>| async move {
1869                use cosmic::iced::futures::SinkExt;
1870                let (res_tx, mut res_rx) = tokio::sync::oneshot::channel();
1871                let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1872
1873                command_tx
1874                    .send(Cmd::Download(paths, uris, to, zip_output, res_tx, progress_tx))
1875                    .unwrap();
1876
1877                let result = loop {
1878                    tokio::select! {
1879                        Some(()) = progress_rx.recv() => {
1880                            let _ = event_tx.send(super::DownloadEvent::FileCompleted).await;
1881                        }
1882                        result = &mut res_rx => {
1883                            while let Ok(()) = progress_rx.try_recv() {
1884                                let _ = event_tx.send(super::DownloadEvent::FileCompleted).await;
1885                            }
1886                            break result;
1887                        }
1888                    }
1889                };
1890
1891                let result = result
1892                    .unwrap_or_else(|_| Err(anyhow::anyhow!("channel closed")))
1893                    .map_err(|e| e.to_string());
1894                let _ = event_tx.send(super::DownloadEvent::Complete(result)).await;
1895            },
1896        ))
1897    }
1898
1899    fn remote_scan(
1900        &self,
1901        uri: &str,
1902        sizes: IconSizes,
1903    ) -> Option<Result<Vec<tab::Item>, String>> {
1904        let (items_tx, mut items_rx) = mpsc::channel(1);
1905
1906        if let Err(e) = self.command_tx.send(Cmd::RemoteScan(
1907            uri.to_string(),
1908            sizes,
1909            items_tx,
1910        )) {
1911            log::error!(
1912                "remote_scan: failed to send Cmd::RemoteScan for uri {}: {}",
1913                uri,
1914                e
1915            );
1916            return Some(Err("command channel closed".into()));
1917        }
1918
1919        items_rx.blocking_recv()
1920    }
1921
1922    fn remote_parent_item(&self, uri: &str, sizes: IconSizes) -> Option<Result<tab::Item, String>> {
1923        let (items_tx, mut items_rx) = mpsc::channel(1);
1924
1925        if let Err(e) = self
1926            .command_tx
1927            .send(Cmd::RemoteParent(uri.to_string(), sizes, items_tx))
1928        {
1929            log::error!(
1930                "remote_parent: failed to send Cmd::RemoteParent for uri {}: {}",
1931                uri,
1932                e
1933            );
1934            return Some(Err("command channel closed".into()));
1935        }
1936
1937        items_rx.blocking_recv()
1938    }
1939
1940    fn dir_info(&self, uri: &str) -> Option<(String, String, Option<PathBuf>)> {
1941        let (result_tx, mut result_rx) = mpsc::channel(1);
1942        self.command_tx
1943            .send(Cmd::DirInfo(uri.to_string(), result_tx))
1944            .unwrap();
1945        result_rx.blocking_recv().and_then(|res| res.ok())
1946    }
1947
1948    fn disconnect(&self, item: ClientItem) -> Task<()> {
1949        let command_tx = self.command_tx.clone();
1950        Task::future(async move {
1951            command_tx.send(Cmd::Disconnect(item)).unwrap();
1952        })
1953    }
1954
1955    fn run_tb_profiler(
1956        &self,
1957        paths: Box<[PathBuf]>,
1958        uris: Vec<String>,
1959        tb_config: TBConfig,
1960    ) -> Task<()> {
1961        let command_tx = self.command_tx.clone();
1962        Task::perform(
1963            async move {
1964                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1965
1966                command_tx
1967                    .send(Cmd::RunTbProfiler(paths, uris, tb_config, res_tx))
1968                    .unwrap();
1969
1970                res_rx.await
1971            },
1972            |x| match x {
1973                Ok(Ok(job)) => log::info!("TBProfiler started: job_id={}, tasks={}", job.array_id, job.tasks),
1974                Ok(Err(err)) => log::error!("TBProfiler failed: {err}"),
1975                Err(err) => log::error!("Channel error: {err}"),
1976            },
1977        )
1978    }
1979
1980    fn delete_remote_files(&self, paths: Box<[PathBuf]>, uris: Vec<String>) -> Task<()> {
1981        let command_tx = self.command_tx.clone();
1982        Task::perform(
1983            async move {
1984                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1985
1986                command_tx
1987                    .send(Cmd::DeleteRemoteFiles(paths, uris, res_tx))
1988                    .unwrap();
1989
1990                res_rx.await
1991            },
1992            |x| match x {
1993                Ok(Ok(msg)) => log::info!("Remote files deleted: {msg}"),
1994                Ok(Err(err)) => log::error!("Remote file deletion failed: {err}"),
1995                Err(err) => log::error!("Channel error: {err}"),
1996            },
1997        )
1998    }
1999
2000    fn poll_job_status(&self, job_id: usize, uri: String) -> Task<()> {
2001        let command_tx = self.command_tx.clone();
2002        Task::future(async move {
2003            command_tx.send(Cmd::PollJobStatus(job_id, uri)).unwrap();
2004        })
2005    }
2006
2007    fn delete_tb_profiler_results(&self, uri: String, tb_config: TBConfig) -> Task<()> {
2008        let command_tx = self.command_tx.clone();
2009        Task::perform(
2010            async move {
2011                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
2012
2013                command_tx
2014                    .send(Cmd::DeleteTbProfilerResults(uri, tb_config, res_tx))
2015                    .unwrap();
2016
2017                res_rx.await
2018            },
2019            |x| match x {
2020                Ok(Ok(msg)) => log::info!("TBProfiler results deleted: {msg}"),
2021                Ok(Err(err)) => log::error!("TBProfiler deletion failed: {err}"),
2022                Err(err) => log::error!("Channel error: {err}"),
2023            },
2024        )
2025    }
2026
2027    fn subscription(&self) -> Subscription<ClientMessage> {
2028        let command_tx = self.command_tx.clone();
2029        let event_rx = self.event_rx.clone();
2030        struct Wrapper {
2031            command_tx: mpsc::UnboundedSender<Cmd>,
2032            event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
2033        }
2034        impl Hash for Wrapper {
2035            fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2036                TypeId::of::<Self>().hash(state);
2037            }
2038        }
2039        Subscription::run_with(
2040            Wrapper {
2041                command_tx,
2042                event_rx,
2043            },
2044            |Wrapper {
2045                 command_tx,
2046                 event_rx,
2047             }| {
2048                let command_tx = command_tx.clone();
2049                let event_rx = event_rx.clone();
2050                stream::channel(
2051                    1,
2052                    move |mut output: cosmic::iced::futures::channel::mpsc::Sender<
2053                        ClientMessage,
2054                    >| async move {
2055                        command_tx.send(Cmd::Rescan).unwrap();
2056                        while let Some(event) = event_rx.lock().await.recv().await {
2057                            match event {
2058                                Event::Changed => command_tx.send(Cmd::Rescan).unwrap(),
2059                                Event::Items(items) => {
2060                                    output.send(ClientMessage::Items(items)).await.unwrap()
2061                                }
2062                                Event::ClientResult(item, res) => output
2063                                    .send(ClientMessage::ClientResult(item, res))
2064                                    .await
2065                                    .unwrap(),
2066                                Event::RemoteAuth(uri, auth, auth_tx) => output
2067                                    .send(ClientMessage::RemoteAuth(uri, auth, auth_tx))
2068                                    .await
2069                                    .unwrap(),
2070                                Event::RemoteResult(uri, res) => output
2071                                    .send(ClientMessage::RemoteResult(uri, res))
2072                                    .await
2073                                    .unwrap(),
2074                                Event::RunTbProfilerResult(uri, res) => output
2075                                    .send(ClientMessage::RunTbProfilerResult(uri, res))
2076                                    .await
2077                                    .unwrap(),
2078                                Event::DeleteRemoteFilesResult(uri, res) => output
2079                                    .send(ClientMessage::DeleteRemoteFilesResult(uri, res))
2080                                    .await
2081                                    .unwrap(),
2082                                Event::JobStatusUpdate(uri, array_id, running_tasks) => output
2083                                    .send(ClientMessage::JobStatusUpdate(uri, array_id, running_tasks))
2084                                    .await
2085                                    .unwrap(),
2086                            }
2087                        }
2088                        pending().await
2089                    },
2090                )
2091            },
2092        )
2093    }
2094}