1use async_ssh2_tokio::client::{AuthMethod, Client, ServerCheckMethod};
2use russh_sftp::client::SftpSession;
3use russh_sftp::protocol::FileType;
4use url::Url;
5
6use cosmic::{
7 Task,
8 iced::{Subscription, futures::SinkExt, stream},
9 widget,
10};
11use std::{
12 any::TypeId,
13 cell::Cell,
14 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
15 fmt,
16 future::pending,
17 hash::Hash,
18 path::{Path, PathBuf},
19 str::FromStr,
20 sync::Arc,
21};
22use tokio::sync::{Mutex, RwLock, mpsc};
23
24use super::{ClientAuth, ClientItem, ClientItems, ClientMessage, Connector};
25
26use crate::{
27 config::IconSizes,
28 fl,
29 sequencing::tb_data::TbProfilerJson,
30 tab::{self, DirSize, ItemMetadata, ItemThumbnail, Location},
31};
32use mime_guess::MimeGuess;
33use tokio::io::AsyncReadExt;
34use tokio::runtime::Builder;
35
36use crate::config::TBConfig;
37
/// Result files that belong to one TB-Profiler sample, collected while a
/// remote directory is listed.
///
/// A record is created the first time a `<sample>.results.<suffix>` file is
/// seen; the path fields are filled in as sibling files with the `json`,
/// `csv` and `docx` suffixes are encountered.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct SampleFiles {
    /// Remote path of the sample's `.results.json` file, if one was listed.
    json: Option<PathBuf>,
    /// Remote path of the sample's `.results.csv` file, if one was listed.
    csv: Option<PathBuf>,
    /// Remote path of the sample's `.results.docx` file, if one was listed.
    docx: Option<PathBuf>,
    /// Modification time in seconds since the Unix epoch (0 when unknown).
    mtime: u64,
    /// Optional size in bytes.
    size: Option<u64>,
}
53
54use super::SlurmJobId;
55
56fn get_key_files() -> Result<(PathBuf, PathBuf), String> {
57 let home_dir = dirs::home_dir().ok_or_else(|| {
58 "Could not determine the user home directory.\n\
59 This usually indicates that the HOME environment variable is missing or invalid.\n\
60 Please ensure HOME is set correctly before using SSH connections."
61 .to_string()
62 })?;
63
64 let ssh_dir = home_dir.join(".ssh");
65 if !ssh_dir.is_dir() {
66 return Err(format!(
67 "SSH configuration directory not found: {}.\n\
68 Please ensure ~/.ssh exists and contains your SSH keys.",
69 ssh_dir.display()
70 ));
71 }
72 if ssh_dir.join("id_rsa").is_file() {
73 Ok((ssh_dir.join("id_rsa"), ssh_dir.join("id_rsa.pub")))
74 } else if ssh_dir.join("id_ed25519").is_file() {
75 Ok((ssh_dir.join("id_ed25519"), ssh_dir.join("id_ed25519.pub")))
76 } else if ssh_dir.join("id_ecdsa").is_file() {
77 Ok((ssh_dir.join("id_ecdsa"), ssh_dir.join("id_ecdsa.pub")))
78 } else if ssh_dir.join("id_dsa").is_file() {
79 Ok((ssh_dir.join("id_dsa"), ssh_dir.join("id_dsa.pub")))
80 } else {
81 Err(format!(
82 "No SSH key pair found in {}.\n\
83 Expected one of: id_rsa, id_ed25519, id_ecdsa, id_dsa.\n\
84 Please generate a key (e.g., with `ssh-keygen -t ed25519`) and try again.",
85 ssh_dir.display()
86 ))
87 }
88}
89
90pub async fn dir_info(
91 client: &Client,
92 uri: &str,
93) -> Result<(String, String, Option<PathBuf>), String> {
94 let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
95 let resolved_uri = remote_file.uri();
96
97 let channel = client.get_channel().await.map_err(|e| e.to_string())?;
98 channel
99 .request_subsystem(true, "sftp")
100 .await
101 .map_err(|e| e.to_string())?;
102 let sftp = SftpSession::new(channel.into_stream())
103 .await
104 .map_err(|e| e.to_string())?;
105
106 if let Err(err) = sftp.metadata(&remote_file.path).await {
107 return Err(format!(
108 "dir_info(): metadata error for {}: {}",
109 remote_file.path, err
110 ));
111 }
112
113 let display_name = Path::new(&remote_file.path)
115 .file_name()
116 .and_then(|s| s.to_str())
117 .unwrap_or(&remote_file.path)
118 .to_string();
119 let filepath: Option<PathBuf> = Some(PathBuf::from(&remote_file.path));
120
121 Ok((resolved_uri, display_name, filepath))
122}
123
124pub async fn resolve_symlink(
125 client: &Client,
126 remotefile: &RemoteFile,
127) -> Result<RemoteFile, String> {
128 let channel = client.get_channel().await.map_err(|e| e.to_string())?;
129 channel
130 .request_subsystem(true, "sftp")
131 .await
132 .map_err(|e| e.to_string())?;
133 let sftp = SftpSession::new(channel.into_stream())
134 .await
135 .map_err(|e| e.to_string())?;
136
137 match sftp.read_link(&remotefile.path).await {
138 Ok(link_path) => {
139 let new_uri = format!(
141 "ssh://{}{}:{}{}",
142 remotefile
143 .username
144 .clone()
145 .map(|u| u + "@")
146 .unwrap_or_default(),
147 remotefile.host,
148 remotefile.port,
149 link_path
150 );
151 new_uri.parse::<RemoteFile>().map_err(|e| e.to_string())
152 }
153 Err(_) => Ok(remotefile.clone()), }
155}
156
157async fn remote_sftp_list(
158 client: &Client,
159 uri: &str,
160 sizes: IconSizes,
161) -> Result<Vec<tab::Item>, String> {
162 let mut remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
163 let force_dir = uri.starts_with("ssh:///");
164 let path = remote_file.path.clone();
165 let channel = client.get_channel().await.map_err(|e| e.to_string())?;
166 channel
167 .request_subsystem(true, "sftp")
168 .await
169 .map_err(|e| e.to_string())?;
170 let sftp = SftpSession::new(channel.into_stream())
171 .await
172 .map_err(|e| e.to_string())?;
173 let entries = sftp
174 .read_dir(path.clone())
175 .await
176 .map_err(|e| format!("read_dir {path}: {e:?}"))?;
177
178 let mut items = Vec::new();
179 let mut samples: HashMap<String, SampleFiles> = HashMap::new();
180
181 for entry in entries {
185 let child_path = PathBuf::from(entry.file_name());
186 let file_type = entry.file_type();
187 let info = entry.metadata();
188 if info.is_symlink() {
189 remote_file = resolve_symlink(client, &remote_file).await?;
190 }
191 let name = child_path
192 .file_name()
193 .and_then(|s| s.to_str())
194 .unwrap_or("")
195 .to_string();
196 let new_path = PathBuf::from(&path).join(&child_path);
197 let mut url = Url::parse(&format!(
198 "ssh://{}{}:{}",
199 remote_file
200 .username
201 .clone()
202 .map(|u| u + "@")
203 .unwrap_or_default(),
204 remote_file.host,
205 remote_file.port,
206 ))
207 .unwrap();
208 let url_path = format!("{}/{}", path.trim_end_matches('/'), name);
209 url.set_path(&url_path);
210 let child_uri = url.to_string();
211 let location = Location::Remote(child_uri.clone(), name.clone(), Some(new_path.clone()));
212
213 let mut is_tbprofiler_groupable_raw_result_file = false;
217 if file_type == FileType::File
218 && let Some((sample_id, suffix)) = name.split_once(".results.") {
219 let entry = samples.entry(sample_id.to_string()).or_insert(SampleFiles {
220 json: None,
221 csv: None,
222 docx: None,
223 mtime: info
224 .modified()
225 .ok()
226 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
227 .map(|d| d.as_secs())
228 .unwrap_or(0),
229 size: None,
230 });
231
232 match suffix {
233 "json" => entry.json = Some(new_path.clone()),
234 "csv" => entry.csv = Some(new_path.clone()),
235 "docx" => entry.docx = Some(new_path.clone()),
236 _ => {}
237 }
238
239 is_tbprofiler_groupable_raw_result_file = true;
240 }
241
242 let metadata = if !force_dir {
243 let mtime = info
244 .modified()
245 .ok()
246 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
247 .map(|d| d.as_secs())
248 .unwrap_or(0);
249 let is_dir = info.is_dir();
250 let size_opt = (!is_dir).then_some(info.size).flatten();
251 let mut children_opt = None;
252 let is_tbprofiler_json =
253 MimeGuess::from_path(&new_path).first_or_octet_stream() == mime::APPLICATION_JSON;
254 let _is_ab1 = new_path.extension().map(|e| e.eq_ignore_ascii_case("ab1")).unwrap_or(false);
255 let mut tbprofilerjson_opt = None;
256 let mut is_susceptible = None;
257 if is_tbprofiler_json {
258 match load_remote_json(client, &child_uri).await {
259 Ok(json) => {
260 let sus = json.dr_variants.iter().all(|v| v.is_susceptible());
261 tbprofilerjson_opt = Some(json);
262 is_susceptible = Some(sus);
263 }
264 Err(e) => {
265 log::info!("Failed to load JSON for {}: {}", new_path.display(), e);
266 }
267 }
268 }
269 if is_dir {
270 let mut count = 0;
271 match sftp.read_dir(new_path.to_string_lossy().to_string()).await {
272 Ok(mut dir) => {
273 for entry in dir.by_ref() {
274 if entry.file_name() == "." || entry.file_name() == ".." {
275 continue;
276 }
277 count += 1;
278 }
279 }
280 Err(_) => {
281 log::info!(
282 "Could not read directory to count children: {}",
283 new_path.display()
284 );
285 }
286 }
287 children_opt = Some(count);
288 }
289 let is_tbprofiler_result_as_sample = tbprofilerjson_opt.is_some();
290 ItemMetadata::RusshPath {
291 mtime,
292 size_opt,
293 children_opt,
294 is_tbprofiler_json,
295 tbprofilerjson_opt,
296 is_tbprofiler_result_as_sample,
297 is_tbprofiler_groupable_raw_result_file,
298 sample_json_path_opt: None,
299 sample_csv_path_opt: None,
300 sample_docx_path_opt: None,
301 is_susceptible,
302 }
303 } else {
304 ItemMetadata::SimpleDir { entries: 0 }
305 };
306
307 let (mime, icon_handle_grid, icon_handle_list, icon_handle_list_condensed) = {
308 let mime = if metadata.is_dir() {
309 "inode/directory".parse().unwrap()
310 } else {
311 MimeGuess::from_path(&new_path).first_or_octet_stream()
312 };
313 let mime_clone = mime.clone();
314 let file_icon = |size| {
315 widget::icon::from_name(if metadata.is_dir() {
316 "folder"
317 } else if mime_clone.type_() == mime::IMAGE {
318 "image-x-generic"
319 } else if mime_clone == mime::APPLICATION_PDF {
320 "application-pdf"
321 } else {
322 "text-x-generic"
323 })
324 .size(size)
325 .handle()
326 };
327 (
328 mime,
330 file_icon(sizes.grid()),
331 file_icon(sizes.list()),
332 file_icon(sizes.list_condensed()),
333 )
334 };
335
336 let hidden = name.starts_with('.');
338
339 items.push(tab::Item {
340 name: name.clone(),
341 is_mount_point: false,
342 is_client_point: false,
343 display_name: name.clone(),
344 metadata,
345 hidden,
346 location_opt: Some(location),
347 image_dimensions: None,
348 mime,
349 icon_handle_grid,
350 icon_handle_list,
351 icon_handle_list_condensed,
352 thumbnail_opt: Some(ItemThumbnail::NotImage),
353 button_id: widget::Id::unique(),
354 pos_opt: Cell::new(None),
355 rect_opt: Cell::new(None),
356 selected: false,
357 highlighted: false,
358 overlaps_drag_rect: false,
359 dir_size: DirSize::NotDirectory,
360 cut: false,
361 });
362 }
363
364 let sample_susceptibility: HashMap<String, bool> = items
366 .iter()
367 .filter_map(|item| {
368 if let ItemMetadata::RusshPath { is_tbprofiler_groupable_raw_result_file: true, is_susceptible: Some(true), .. } =
369 &item.metadata
370 {
371 let id = item.name.find('.').map(|i| item.name[..i].to_string())?;
372 Some((id, true))
373 } else {
374 None
375 }
376 })
377 .collect();
378
379 for item in &mut items {
382 let name = item.name.clone();
383 if let ItemMetadata::RusshPath { is_tbprofiler_groupable_raw_result_file, is_susceptible, .. } =
384 &mut item.metadata
385 && *is_tbprofiler_groupable_raw_result_file {
386 let sample_id = name.find('.').map(|i| &name[..i]);
387 if sample_id.is_none_or(|id| samples.get(id).is_none_or(|f| f.json.is_none()))
388 {
389 *is_tbprofiler_groupable_raw_result_file = false;
390 } else if let Some(id) = sample_id
391 && let Some(&sus) = sample_susceptibility.get(id) {
392 *is_susceptible = Some(sus);
393 }
394 }
395 }
396
397 for (sample_id, files) in samples {
401 if files.json.is_none() {
402 continue;
403 }
404 let mut tbprofilerjson_opt = None;
405 let mut is_susceptible = None;
406
407 if let Some(json_path) = &files.json {
408 let mut url = Url::parse(&format!(
409 "ssh://{}{}:{}",
410 remote_file
411 .username
412 .clone()
413 .map(|u| u + "@")
414 .unwrap_or_default(),
415 remote_file.host,
416 remote_file.port,
417 ))
418 .unwrap();
419
420 let url_path = json_path.to_string_lossy().replace('\\', "/");
421 url.set_path(&url_path);
422 match load_remote_json(client, url.as_str()).await {
423 Ok(json) => {
424 let sus = json.dr_variants.iter().all(|v| v.is_susceptible());
425 tbprofilerjson_opt = Some(json);
426 is_susceptible = Some(sus);
427 }
428 Err(e) => log::warn!("Failed to load sample JSON: {}", e),
429 }
430 }
431
432 let location = Location::Remote(uri.to_string(), sample_id.clone(), None);
433
434 let metadata = ItemMetadata::RusshPath {
435 mtime: files.mtime,
436 size_opt: None,
437 children_opt: None,
438 is_tbprofiler_json: true,
439 tbprofilerjson_opt,
440 is_tbprofiler_result_as_sample: true,
441 is_tbprofiler_groupable_raw_result_file: false,
442 sample_json_path_opt: files.json.clone(),
443 sample_csv_path_opt: files.csv.clone(),
444 sample_docx_path_opt: files.docx.clone(),
445 is_susceptible,
446 };
447
448 items.push(tab::Item {
449 name: sample_id.clone(),
450 display_name: sample_id.clone(),
451 metadata,
452 hidden: false,
453 location_opt: Some(location),
454 image_dimensions: None,
455 mime: "application/json".parse().unwrap(),
456 icon_handle_grid: widget::icon::from_name("text-x-generic")
457 .size(sizes.grid())
458 .handle(),
459 icon_handle_list: widget::icon::from_name("text-x-generic")
460 .size(sizes.list())
461 .handle(),
462 icon_handle_list_condensed: widget::icon::from_name("text-x-generic")
463 .size(sizes.list_condensed())
464 .handle(),
465 is_mount_point: false,
466 is_client_point: false,
467 thumbnail_opt: Some(ItemThumbnail::NotImage),
468 button_id: widget::Id::unique(),
469 pos_opt: Cell::new(None),
470 rect_opt: Cell::new(None),
471 selected: false,
472 highlighted: false,
473 overlaps_drag_rect: false,
474 dir_size: DirSize::NotDirectory,
475 cut: false,
476 });
477 }
478
479 Ok(items)
480}
481
482async fn remote_sftp_parent(
483 client: &Client,
484 uri: &str,
485 sizes: IconSizes,
486) -> Result<tab::Item, String> {
487 let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
488 let path = remote_file.path.clone();
489 let channel = client.get_channel().await.map_err(|e| e.to_string())?;
490 channel
491 .request_subsystem(true, "sftp")
492 .await
493 .map_err(|e| e.to_string())?;
494 let sftp = SftpSession::new(channel.into_stream())
495 .await
496 .map_err(|e| e.to_string())?;
497 let metadata = sftp
498 .metadata(path.clone())
499 .await
500 .map_err(|e| format!("metadata {path}: {e:?}"))?;
501
502 let child_path = PathBuf::from(remote_file.path.clone());
503 let name = child_path
504 .file_name()
505 .and_then(|s| s.to_str())
506 .unwrap_or("")
507 .to_string();
508 let child_uri = format!(
509 "ssh://{}{}:{}{}",
510 remote_file
511 .username
512 .clone()
513 .map(|u| u + "@")
514 .unwrap_or_default(),
515 remote_file.host,
516 remote_file.port,
517 remote_file.path.clone(),
518 );
519
520 let location = Location::Remote(
521 child_uri.clone(),
522 name.clone(),
523 Some(PathBuf::from(remote_file.path.clone())),
524 );
525
526 let item_metadata = {
527 let mtime = metadata
528 .modified()
529 .ok()
530 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
531 .map(|d| d.as_secs())
532 .unwrap_or(0);
533 let is_dir = metadata.is_dir();
534 let size_opt = (!is_dir).then_some(metadata.size).flatten();
535 let mut children_opt = None;
536 if is_dir {
537 let mut count = 0;
538 match sftp.read_dir(remote_file.path.clone()).await {
539 Ok(mut dir) => {
540 for entry in dir.by_ref() {
541 if entry.file_name() == "." || entry.file_name() == ".." {
542 continue;
543 }
544 count += 1;
545 }
546 }
547 Err(_) => {
548 log::info!(
549 "Could not read directory to count children: {}",
550 remote_file.path.clone()
551 );
552 }
553 }
554 children_opt = Some(count);
555 }
556 ItemMetadata::RusshPath {
557 mtime,
558 size_opt,
559 children_opt,
560 is_tbprofiler_json: false,
561 tbprofilerjson_opt: None,
562 is_tbprofiler_result_as_sample: false,
563 is_tbprofiler_groupable_raw_result_file: false,
564 sample_json_path_opt: None,
565 sample_csv_path_opt: None,
566 sample_docx_path_opt: None,
567 is_susceptible: None,
568 }
569 };
570 let (mime, icon_handle_grid, icon_handle_list, icon_handle_list_condensed) = {
571 let file_icon = |size| {
572 widget::icon::from_name(if metadata.is_dir() {
573 "folder"
574 } else {
575 "text-x-generic"
576 })
577 .size(size)
578 .handle()
579 };
580 (
581 "inode/directory".parse().unwrap(),
582 file_icon(sizes.grid()),
583 file_icon(sizes.list()),
584 file_icon(sizes.list_condensed()),
585 )
586 };
587 let hidden = name.starts_with('.');
588 let item = tab::Item {
589 name: name.clone(),
590 is_mount_point: false,
591 is_client_point: false,
592 display_name: name,
593 metadata: item_metadata,
594 hidden,
595 location_opt: Some(location),
596 image_dimensions: None,
597 mime,
598 icon_handle_grid,
599 icon_handle_list,
600 icon_handle_list_condensed,
601 thumbnail_opt: Some(ItemThumbnail::NotImage),
602 button_id: widget::Id::unique(),
603 pos_opt: Cell::new(None),
604 rect_opt: Cell::new(None),
605 selected: false,
606 highlighted: false,
607 overlaps_drag_rect: false,
608 dir_size: DirSize::NotDirectory,
609 cut: false,
610 };
611 Ok(item)
612}
613
614async fn load_remote_json(client: &Client, uri: &str) -> Result<TbProfilerJson, String> {
615 let remote_file = uri.parse::<RemoteFile>().map_err(|e| e.to_string())?;
616 let channel = client.get_channel().await.map_err(|e| e.to_string())?;
617 channel
618 .request_subsystem(true, "sftp")
619 .await
620 .map_err(|e| e.to_string())?;
621 let sftp = SftpSession::new(channel.into_stream())
622 .await
623 .map_err(|e| e.to_string())?;
624 let mut file = sftp
625 .open(remote_file.path.clone())
626 .await
627 .map_err(|e| e.to_string())?;
628 let mut contents = String::new();
629 file.read_to_string(&mut contents)
630 .await
631 .map_err(|e| e.to_string())?;
632
633 let parsed: TbProfilerJson = serde_json::from_str(&contents).map_err(|e| e.to_string())?;
634 Ok(parsed)
635}
636
637async fn perform_download(
638 client: &Client,
639 paths: Box<[PathBuf]>,
640 to: PathBuf,
641 zip_output: Option<PathBuf>,
642 progress_tx: &tokio::sync::mpsc::UnboundedSender<()>,
643) -> Result<(), anyhow::Error> {
644 let mut reserved = HashSet::new();
645
646 let sq = |s: &str| s.replace('\'', "'\\''");
648
649 let channel = client.get_channel().await?;
651 channel.request_subsystem(true, "sftp").await?;
652 let sftp = SftpSession::new(channel.into_stream()).await?;
653
654 let mut dirs: Vec<&PathBuf> = Vec::new();
656 let mut files: Vec<&PathBuf> = Vec::new();
657 for from in paths.iter() {
658 if from.file_name().is_none() {
659 continue;
660 }
661 let remote = from.to_string_lossy().replace('\\', "/");
662 let is_dir = sftp
663 .metadata(&remote)
664 .await
665 .map(|m| m.is_dir())
666 .unwrap_or(false);
667 if is_dir {
668 dirs.push(from);
669 } else {
670 files.push(from);
671 }
672 }
673
674 if !dirs.is_empty() {
676 let remote_zip = format!(
677 "/tmp/cosmic_files_{}.zip",
678 std::time::SystemTime::now()
679 .duration_since(std::time::UNIX_EPOCH)
680 .unwrap_or_default()
681 .as_millis()
682 );
683
684 let parts: Vec<String> = dirs
687 .iter()
688 .map(|dir| {
689 let parent = dir
690 .parent()
691 .unwrap_or_else(|| Path::new("/"))
692 .to_string_lossy()
693 .replace('\\', "/");
694 let dir_name = dir.file_name().unwrap().to_string_lossy();
695 format!(
696 "(cd '{}' && zip -r '{}' '{}')",
697 sq(&parent),
698 sq(&remote_zip),
699 sq(&dir_name),
700 )
701 })
702 .collect();
703 let zip_cmd = parts.join(" && ");
704
705 let res = client.execute(&zip_cmd).await?;
706 if res.exit_status != 0 {
707 let _ = client
708 .execute(&format!("rm -f '{}'", sq(&remote_zip)))
709 .await;
710 return Err(anyhow::anyhow!(
711 "zip failed: stdout: {} stderr: {}",
712 res.stdout,
713 res.stderr
714 ));
715 }
716
717 let local_target = if let Some(ref out) = zip_output {
719 out.clone()
720 } else {
721 let zip_name = if dirs.len() == 1 {
722 format!("{}.zip", dirs[0].file_name().unwrap().to_string_lossy())
723 } else {
724 "folders.zip".to_string()
725 };
726 let zip_stem_path = PathBuf::from(&zip_name);
727 let mut t = to.join(&zip_name);
728 if t.try_exists().unwrap_or(false) || reserved.contains(&t) {
729 t = download_unique_path(&zip_stem_path, &to, &mut reserved);
730 }
731 t
732 };
733 reserved.insert(local_target.clone());
734
735 let dl_result = client
736 .download_file(remote_zip.clone(), local_target.clone())
737 .await;
738 let _ = client
739 .execute(&format!("rm -f '{}'", sq(&remote_zip)))
740 .await;
741 if let Err(err) = dl_result {
742 return Err(anyhow::anyhow!("Download failed for zip: {}", err));
743 }
744 for _ in dirs.iter() {
746 let _ = progress_tx.send(());
747 }
748 }
749
750 for from in files {
752 let name = from.file_name().unwrap();
753 let remote = from.to_string_lossy().replace('\\', "/");
754 let mut target = to.join(name);
755 if target.try_exists().unwrap_or(false) || reserved.contains(&target) {
756 target = download_unique_path(from, &to, &mut reserved);
757 }
758 reserved.insert(target.clone());
759
760 if let Err(err) = client.download_file(remote.clone(), target.clone()).await {
761 return Err(anyhow::anyhow!("Download failed for {}: {}", remote, err));
762 }
763 let _ = progress_tx.send(());
764 }
765
766 Ok(())
767}
768
769pub fn download_unique_path(from: &Path, to: &Path, reserved: &mut HashSet<PathBuf>) -> PathBuf {
770 const COMPOUND_EXTENSIONS: &[&str] = &[
772 ".tar.gz",
773 ".tar.bz2",
774 ".tar.xz",
775 ".tar.zst",
776 ".tar.lz",
777 ".tar.lzma",
778 ".tar.sz",
779 ".tar.lzo",
780 ".tar.br",
781 ".tar.Z",
782 ".tar.pz",
783 ];
784
785 let to = to.to_owned();
786 let file_name = from.file_name().and_then(|n| n.to_str()).unwrap();
787 let file_name = file_name.to_string();
788
789 let (stem, ext) = COMPOUND_EXTENSIONS
791 .iter()
792 .copied()
793 .find(|&e| file_name.ends_with(e))
794 .map(|e| {
795 (
796 file_name.strip_suffix(e).unwrap().to_string(),
797 Some(e[1..].to_string()),
798 )
799 })
800 .unwrap_or_else(|| {
801 from.file_stem()
802 .and_then(|s| s.to_str())
803 .map_or((file_name.clone(), None), |s| {
804 (
805 s.to_string(),
806 from.extension()
807 .and_then(|e| e.to_str())
808 .map(str::to_string),
809 )
810 })
811 });
812
813 for n in 0.. {
815 let new_name = if n == 0 {
816 file_name.clone()
817 } else {
818 match &ext {
819 Some(ext) => format!("{} ({} {}).{}", stem, fl!("copy_noun"), n, ext),
820 None => format!("{} ({} {})", stem, fl!("copy_noun"), n),
821 }
822 };
823
824 let candidate = to.join(new_name);
825
826 if !candidate.exists() && !reserved.contains(&candidate) {
828 reserved.insert(candidate.clone());
829 return candidate;
830 }
831 }
832
833 unreachable!()
834}
835
836pub async fn run_tbprofiler(
837 client: &Client,
838 paths: Box<[PathBuf]>,
839 tb_config: TBConfig,
840) -> Result<SlurmJobId, anyhow::Error> {
841 if tb_config.script_path.is_empty()
842 || tb_config.out_dir.is_empty()
843 || tb_config.docx_template_path.is_empty()
844 {
845 return Err(anyhow::anyhow!(
846 "Please configure paths under TB profiler settings..."
847 ));
848 }
849 let mut sample_map: BTreeMap<String, BTreeSet<u8>> = BTreeMap::new();
850 for path in paths.iter() {
851 let filename = path
852 .file_name()
853 .and_then(|n| n.to_str())
854 .ok_or_else(|| anyhow::anyhow!("Invalid filename in path: {:?}", path))?;
855 if let Some(sample) = filename.strip_suffix(tb_config.pair1_suffix.as_str()) {
856 sample_map.entry(sample.to_string()).or_default().insert(1);
857 } else if let Some(sample) = filename.strip_suffix(tb_config.pair2_suffix.as_str()) {
858 sample_map.entry(sample.to_string()).or_default().insert(2);
859 } else {
860 return Err(anyhow::anyhow!(
861 "Unexpected FASTQ filename format: {}",
862 filename
863 ));
864 }
865 }
866 if sample_map.is_empty() {
867 return Err(anyhow::anyhow!("No valid FASTQ files provided"));
868 }
869 for (sample, reads) in &sample_map {
870 if reads.len() != 2 {
871 return Err(anyhow::anyhow!(
872 "Sample {} does not have both {} and {} FASTQ files",
873 sample,
874 tb_config.pair1_suffix,
875 tb_config.pair2_suffix
876 ));
877 }
878 }
879 let sample_ids: Vec<String> = sample_map.keys().cloned().collect();
880 let sample_ids_string = sample_ids.join(",");
881 let array_end = sample_ids
882 .len()
883 .checked_sub(1)
884 .ok_or_else(|| anyhow::anyhow!("Need at least 1 sample"))?;
885
886 let raw_sequence_dir_paired = {
887 let first_parent = paths
888 .first()
889 .and_then(|p| p.parent())
890 .ok_or_else(|| anyhow::anyhow!("Could not determine parent directory"))?;
891
892 for p in paths.iter() {
893 let parent = p
894 .parent()
895 .ok_or_else(|| anyhow::anyhow!("Path has no parent: {:?}", p))?;
896 if parent != first_parent {
897 return Err(anyhow::anyhow!(
898 "FASTQ files are in different directories: {:?} vs {:?}",
899 first_parent,
900 parent
901 ));
902 }
903 }
904 first_parent.to_string_lossy().into_owned()
905 };
906
907 let command_run_tbprofiler = format!(
908 "sbatch --parsable --array 0-{} {} \"{}\" {} {} {}",
909 array_end,
910 tb_config.script_path,
911 sample_ids_string,
912 raw_sequence_dir_paired,
913 tb_config.out_dir,
914 tb_config.docx_template_path,
915 );
916 let res = client.execute(&command_run_tbprofiler).await?;
917 if res.exit_status != 0 {
918 return Err(anyhow::anyhow!(
919 "tbprofiler failed (exit {}):\nstdout:\n{}\nstderr:\n{}",
920 res.exit_status,
921 res.stdout,
922 res.stderr
923 ));
924 }
925 if !res.stderr.is_empty() {
926 log::warn!("tbprofiler stderr: {}", res.stderr);
927 }
928 let job_id: usize = res.stdout.split(';').next().unwrap().trim().parse()?;
929 let tasks = array_end
930 .checked_add(1)
931 .ok_or_else(|| anyhow::anyhow!("Overflow when calculating tasks"))?;
932 let job_id = SlurmJobId {
933 array_id: job_id,
934 tasks,
935 running_tasks: tasks,
936 };
937 Ok(job_id)
938}
939
940async fn poll_running_tasks(
941 client: &Client,
942 array_id: usize,
943) -> Result<usize, anyhow::Error> {
944 let cmd = format!(
945 "squeue -j {} -r -h -o \"%T\" 2>/dev/null | grep -c RUNNING || true",
946 array_id
947 );
948 let res = client.execute(&cmd).await?;
949 let count: usize = res.stdout.trim().parse().unwrap_or(0);
950 Ok(count)
951}
952
953pub async fn delete_remote_files(
954 client: &Client,
955 paths: &[PathBuf],
956) -> Result<String, anyhow::Error> {
957 if paths.is_empty() {
958 return Ok(String::new());
959 }
960 let filenames: Vec<String> = paths
961 .iter()
962 .map(|p| {
963 p.file_name()
964 .and_then(|n| n.to_str())
965 .unwrap_or("<invalid>")
966 .to_string()
967 })
968 .collect();
969 let files = paths
970 .iter()
971 .map(|p| {
972 let remote = p.to_string_lossy().replace('\\', "/");
973 format!("'{}'", remote.replace('\'', "'\\''"))
974 })
975 .collect::<Vec<_>>()
976 .join(" ");
977 let command = format!("rm -rf {}", files);
978 let res = client.execute(&command).await?;
979 if res.exit_status != 0 {
980 return Err(anyhow::anyhow!(
981 "Failed to delete files (exit {}):\nstderr:\n{}",
982 res.exit_status,
983 res.stderr
984 ));
985 }
986 Ok(filenames.join("\n"))
987}
988
989pub async fn delete_tbprofiler_results(
990 client: &Client,
991 tb_config: TBConfig,
992) -> Result<String, anyhow::Error> {
993 let command = format!("rm -rf {}/results/*", tb_config.out_dir);
994 let res = client.execute(&command).await?;
995 if res.exit_status != 0 {
996 return Err(anyhow::anyhow!(
997 "Failed to delete remote file (exit {}):\nstderr:\n{}",
998 res.exit_status,
999 res.stderr
1000 ));
1001 }
1002 Ok(res.stdout)
1003}
1004
1005#[allow(dead_code)]
1006fn request_password(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> ClientAuth {
1007 let auth = ClientAuth {
1008 message: String::new(),
1009 username_opt: Some(String::new()),
1010 domain_opt: Some(String::new()),
1011 password_opt: Some(String::new()),
1012 remember_opt: Some(false),
1013 anonymous_opt: Some(false),
1014 };
1015 let (auth_tx, mut auth_rx) = mpsc::channel(1);
1016 event_tx
1017 .send(Event::RemoteAuth(uri.clone(), auth, auth_tx))
1018 .unwrap();
1019
1020 if let Some(auth) = auth_rx.blocking_recv() {
1021 auth
1022 } else {
1023 ClientAuth {
1024 message: "Authentication cancelled".into(),
1025 username_opt: None,
1026 domain_opt: None,
1027 password_opt: None,
1028 remember_opt: None,
1029 anonymous_opt: None,
1030 }
1031 }
1032}
1033
/// Components of an `ssh://[user@]host:port/path` location.
#[derive(Clone, Debug)]
pub struct RemoteFile {
    /// Host name or address of the server.
    pub host: String,
    /// TCP port of the SSH service.
    pub port: u16,
    /// Login name, when the URI carried one.
    pub username: Option<String>,
    /// Path on the remote host.
    pub path: String,
}

impl RemoteFile {
    /// Renders the location as `ssh://[user@]host:port<path>`.
    ///
    /// The components are concatenated verbatim; no escaping is applied.
    pub fn uri(&self) -> String {
        let mut rendered = String::from("ssh://");
        if let Some(user) = self.username.as_deref() {
            rendered.push_str(user);
            rendered.push('@');
        }
        rendered.push_str(&self.host);
        rendered.push(':');
        rendered.push_str(&self.port.to_string());
        rendered.push_str(&self.path);
        rendered
    }
}
1052
/// Reasons why a string could not be turned into a [`RemoteFile`].
#[derive(Debug)]
pub enum RemoteFileError {
    /// The string is not a syntactically valid URL; carries the parser's message.
    InvalidUrl(String),
    /// The URL scheme is not `ssh`; carries the offending scheme.
    UnsupportedScheme(String),
    /// The URL has no host component.
    MissingHost,
}

impl fmt::Display for RemoteFileError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidUrl(reason) => write!(f, "Invalid URL: {reason}"),
            Self::UnsupportedScheme(scheme) => write!(f, "Unsupported scheme: {scheme}"),
            Self::MissingHost => f.write_str("Missing host"),
        }
    }
}
1071
1072impl FromStr for RemoteFile {
1073 type Err = RemoteFileError;
1074
1075 fn from_str(uri: &str) -> Result<Self, Self::Err> {
1076 let url = Url::parse(uri).map_err(|e| RemoteFileError::InvalidUrl(e.to_string()))?;
1077 if url.scheme() != "ssh" {
1078 return Err(RemoteFileError::UnsupportedScheme(url.scheme().to_string()));
1079 }
1080 let host = url
1081 .host_str()
1082 .ok_or(RemoteFileError::MissingHost)?
1083 .to_string();
1084 let port = url.port().unwrap_or(22);
1085 let username = if url.username().is_empty() {
1086 None
1087 } else {
1088 Some(url.username().to_string())
1089 };
1090 let mut path = url.path().to_string();
1091 if path.is_empty() {
1092 path = "/".into();
1093 }
1094 if path.len() > 1 && path.ends_with('/') {
1095 path.pop();
1096 }
1097 Ok(Self {
1098 host,
1099 port,
1100 username,
1101 path,
1102 })
1103 }
1104}
1105
1106enum Cmd {
1107 Items(IconSizes, mpsc::Sender<ClientItems>),
1108 Rescan,
1109 Connect(ClientItem, tokio::sync::oneshot::Sender<anyhow::Result<()>>),
1110 RemoteDrive(String, tokio::sync::oneshot::Sender<anyhow::Result<()>>),
1111 RemoteScan(
1112 String,
1113 IconSizes,
1114 mpsc::Sender<Result<Vec<tab::Item>, String>>,
1115 ),
1116 RemoteParent(String, IconSizes, mpsc::Sender<Result<tab::Item, String>>),
1117 #[allow(clippy::type_complexity)]
1118 DirInfo(
1119 String,
1120 mpsc::Sender<Result<(String, String, Option<PathBuf>), anyhow::Error>>,
1121 ),
1122 Disconnect(ClientItem),
1123 Download(
1124 Box<[PathBuf]>,
1125 Vec<String>,
1126 PathBuf,
1127 Option<PathBuf>,
1128 tokio::sync::oneshot::Sender<anyhow::Result<()>>,
1129 tokio::sync::mpsc::UnboundedSender<()>,
1130 ),
1131 RunTbProfiler(
1132 Box<[PathBuf]>,
1133 Vec<String>,
1134 TBConfig,
1135 tokio::sync::oneshot::Sender<anyhow::Result<SlurmJobId>>,
1136 ),
1137 DeleteRemoteFiles(
1138 Box<[PathBuf]>,
1139 Vec<String>,
1140 tokio::sync::oneshot::Sender<anyhow::Result<String>>,
1141 ),
1142 DeleteTbProfilerResults(
1143 String,
1144 TBConfig,
1145 tokio::sync::oneshot::Sender<anyhow::Result<String>>,
1146 ),
1147 PollJobStatus(usize, String),
1148}
1149
1150enum Event {
1151 Changed,
1152 Items(ClientItems),
1153 ClientResult(ClientItem, Result<bool, String>),
1154 #[allow(dead_code)]
1155 RemoteAuth(String, ClientAuth, mpsc::Sender<ClientAuth>),
1156 RemoteResult(String, Result<bool, String>),
1157 RunTbProfilerResult(String, Result<SlurmJobId, String>),
1158 DeleteRemoteFilesResult(String, Result<String, String>),
1159 JobStatusUpdate(String, usize, usize),
1160}
1161
1162#[derive(Clone, Debug)]
1163pub struct Item {
1164 name: String,
1165 is_connected: bool,
1166 icon_opt: Option<PathBuf>,
1167 icon_symbolic_opt: Option<PathBuf>,
1168 path_opt: Option<PathBuf>,
1169 uri: String,
1170 host: String,
1171 port: u16,
1172 username: String,
1173 auth: AuthMethod,
1174 server_check: ServerCheckMethod,
1175}
1176
1177impl Item {
1178 pub fn name(&self) -> String {
1179 self.name.clone()
1180 }
1181
1182 pub const fn is_connected(&self) -> bool {
1183 self.is_connected
1184 }
1185
1186 pub fn uri(&self) -> String {
1187 self.uri.clone()
1188 }
1189
1190 pub fn host(&self) -> String {
1191 self.host.clone()
1192 }
1193
1194 pub fn icon(&self, symbolic: bool) -> Option<widget::icon::Handle> {
1195 if symbolic {
1196 self.icon_symbolic_opt.as_ref()
1197 } else {
1198 self.icon_opt.as_ref()
1199 }
1200 .map(|icon| widget::icon::from_path(icon.clone()))
1201 }
1202
1203 pub fn path(&self) -> Option<PathBuf> {
1204 self.path_opt.clone()
1205 }
1206}
1207
1208pub struct Russh {
1209 command_tx: mpsc::UnboundedSender<Cmd>,
1210 event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
1211}
1212
1213impl Russh {
1214 pub fn new() -> Self {
1215 let (command_tx, mut command_rx) = mpsc::unbounded_channel();
1216 let (event_tx, event_rx) = mpsc::unbounded_channel();
1217 std::thread::spawn(move || {
1218 let rt = Builder::new_current_thread().enable_all().build().unwrap();
1219 rt.block_on(async move {
1220 let clients = Arc::new(RwLock::new(HashMap::<String, Arc<Client>>::new()));
1221 let mut client_items = Vec::<ClientItem>::new();
1222 while let Some(command) = command_rx.recv().await {
1223 match command {
1224 Cmd::Items(_sizes, items_tx) => {
1225 items_tx.send(client_items.clone()).await.unwrap();
1226 }
1227 Cmd::Rescan => {
1228 event_tx.send(Event::Items(client_items.clone())).unwrap();
1229 }
1230 Cmd::Connect(client_item, result_tx) => {
1231 let client_item_clone = client_item.clone();
1232 let ClientItem::Russh(item) = client_item else {
1233 _ = result_tx.send(Err(anyhow::anyhow!("No client item")));
1234 continue;
1235 };
1236 let event_tx = event_tx.clone();
1237
1238 let res = Client::connect(
1239 (item.host.as_str(), item.port),
1240 item.username.as_str(),
1241 item.auth.clone(),
1242 item.server_check.clone(),
1243 )
1244 .await;
1245 match res {
1246 Ok(client) => {
1247 {
1248 let mut write = clients.write().await;
1249 write.insert(item.host.clone(), Arc::new(client));
1250 }
1251 client_items.push(ClientItem::Russh(Item {
1252 name: item.name.clone(),
1253 is_connected: true,
1254 icon_opt: item.icon_opt.clone(),
1255 icon_symbolic_opt: item.icon_symbolic_opt.clone(),
1256 path_opt: item.path_opt.clone(),
1257 uri: item.uri.clone(),
1258 host: item.host.clone(),
1259 port: item.port,
1260 username: item.username.clone(),
1261 auth: item.auth.clone(),
1262 server_check: item.server_check.clone(),
1263 }));
1264 _ = result_tx.send(Ok(()));
1265 event_tx
1266 .send(Event::ClientResult(
1267 client_item_clone.clone(),
1268 Ok(true),
1269 ))
1270 .unwrap();
1271 }
1272 Err(err) => {
1273 _ = result_tx.send(Err(anyhow::anyhow!("{err:?}")));
1274 event_tx
1275 .send(Event::ClientResult(
1276 client_item_clone,
1277 Err(format!("{err}")),
1278 ))
1279 .unwrap();
1280 }
1281 }
1282 }
1283 Cmd::RemoteDrive(uri, result_tx) => {
1284 let mut result_tx_opt = Some(result_tx);
1285 let event_tx = event_tx.clone();
1286 let remote_file =
1287 match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1288 Ok(rf) => rf,
1289 Err(err) => {
1290 if let Some(result_tx) = result_tx_opt.take() {
1291 _ = result_tx.send(Err(anyhow::anyhow!("{err:?}")));
1292 }
1293 let _ = event_tx
1294 .send(Event::RemoteResult(uri, Err(err.to_string())));
1295 continue;
1296 }
1297 };
1298 let norm_uri = remote_file.uri();
1299 let host = remote_file.host.as_str();
1300 let port = remote_file.port;
1301 let username = match remote_file.username {
1302 Some(u) => u,
1303 None => {
1304 let msg = "No username specified in URI";
1305 if let Some(result_tx) = result_tx_opt.take() {
1306 let _ = result_tx.send(Err(anyhow::anyhow!(msg)));
1307 }
1308 let _ = event_tx.send(Event::RemoteResult(
1309 norm_uri.clone(),
1310 Err(msg.to_string()),
1311 ));
1312 continue;
1313 }
1314 };
1315 let auth = match get_key_files() {
1316 Ok((key_path, _)) => AuthMethod::with_key_file(key_path, None),
1317 Err(err) => {
1318 if let Some(result_tx) = result_tx_opt.take() {
1319 let _ =
1320 result_tx.send(Err(anyhow::anyhow!(err.to_string())));
1321 }
1322 let _ = event_tx.send(Event::RemoteResult(
1323 norm_uri.clone(),
1324 Err(err.to_string()),
1325 ));
1326 continue;
1327 }
1328 };
1329 let existing_client = {
1330 let read = clients.read().await;
1331 read.contains_key(host)
1332 };
1333 let client_item = ClientItem::Russh(Item {
1334 name: host.to_string(),
1335 is_connected: true,
1336 icon_opt: None,
1337 icon_symbolic_opt: None,
1338 path_opt: Some(PathBuf::from(remote_file.path.clone())),
1339 uri: norm_uri.clone(),
1340 host: host.to_string(),
1341 port,
1342 username: username.clone(),
1343 auth: auth.clone(),
1344 server_check: ServerCheckMethod::NoCheck,
1345 });
1346 if existing_client {
1347 let has_item = client_items.iter().any(|item| {
1348 matches!(
1349 item,
1350 ClientItem::Russh(r) if r.host == host
1351 )
1352 });
1353 if !has_item {
1354 client_items.push(client_item);
1355 let _ = event_tx.send(Event::Changed);
1356 }
1357 if let Some(result_tx) = result_tx_opt.take() {
1358 let _ = result_tx.send(Ok(()));
1359 }
1360 let _ =
1361 event_tx.send(Event::RemoteResult(norm_uri.clone(), Ok(true)));
1362 continue;
1363 }
1364 match Client::connect(
1365 (host, port),
1366 username.as_str(),
1367 auth,
1368 ServerCheckMethod::NoCheck,
1369 )
1370 .await
1371 {
1372 Ok(client) => {
1373 {
1374 let mut write = clients.write().await;
1375 write.insert(host.to_string(), Arc::new(client));
1376 }
1377 client_items.push(client_item);
1378 let _ = event_tx.send(Event::Changed);
1379 if let Some(result_tx) = result_tx_opt.take() {
1380 let _ = result_tx.send(Ok(()));
1381 }
1382 let _ = event_tx
1383 .send(Event::RemoteResult(norm_uri.clone(), Ok(true)));
1384 }
1385 Err(err) => {
1386 let msg = format!("Connecting fresh session failed: {}", err);
1387 if let Some(result_tx) = result_tx_opt.take() {
1388 let _ = result_tx.send(Err(anyhow::anyhow!(msg.clone())));
1389 }
1390 let _ = event_tx.send(Event::RemoteResult(
1391 norm_uri.clone(),
1392 Err(msg.clone()),
1393 ));
1394 }
1395 }
1396 }
1397 Cmd::RemoteScan(uri, sizes, items_tx) => {
1398 let remote_file =
1399 match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1400 Ok(rf) => rf,
1401 Err(e) => {
1402 let _ = items_tx.send(Err(e)).await;
1403 continue;
1404 }
1405 };
1406 let norm_uri = remote_file.uri();
1407 let host = remote_file.host.as_str();
1408 let port = remote_file.port;
1409 let username = match remote_file.username {
1410 Some(u) => u,
1411 None => {
1412 event_tx
1413 .send(Event::RemoteResult(
1414 norm_uri,
1415 Err("No username specified in URI".into()),
1416 ))
1417 .unwrap();
1418 continue;
1419 }
1420 };
1421
1422 let existing_client = {
1423 let read = clients.read().await;
1424 read.get(host).cloned()
1425 };
1426 if let Some(client) = existing_client {
1427 let result =
1428 remote_sftp_list(&client, &norm_uri, sizes)
1429 .await;
1430 let _ = items_tx.send(result).await;
1431 } else {
1432 let key_path = match get_key_files() {
1433 Ok(key_pair) => key_pair.0,
1434 Err(e) => {
1435 event_tx
1436 .send(Event::RemoteResult(norm_uri, Err(e)))
1437 .unwrap();
1438 continue;
1439 }
1440 };
1441 let auth = AuthMethod::with_key_file(key_path, None);
1442 match Client::connect(
1443 (host, port),
1444 username.as_str(),
1445 auth.clone(),
1446 ServerCheckMethod::NoCheck,
1447 )
1448 .await
1449 {
1450 Ok(client) => {
1451 {
1452 let mut write = clients.write().await;
1453 write.insert(host.to_string(), Arc::new(client));
1454 }
1455 let client = {
1456 let read = clients.read().await;
1457 Arc::clone(read.get(host).unwrap())
1458 };
1459 let result = remote_sftp_list(
1460 &client,
1461 &norm_uri,
1462 sizes,
1463 )
1464 .await;
1465 let _ = items_tx.send(result).await;
1466 event_tx
1467 .send(Event::RemoteResult(norm_uri, Ok(true)))
1468 .unwrap();
1469 }
1470 Err(err) => {
1471 let msg =
1472 format!("Connecting fresh session failed: {}", err);
1473 event_tx
1474 .send(Event::RemoteResult(norm_uri, Err(msg)))
1475 .unwrap();
1476 }
1477 }
1478 }
1479 }
1480 Cmd::RemoteParent(uri, sizes, items_tx) => {
1481 let remote_file =
1482 match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1483 Ok(rf) => rf,
1484 Err(e) => {
1485 let _ = items_tx.send(Err(e)).await;
1486 continue;
1487 }
1488 };
1489 let norm_uri = remote_file.uri();
1490 let host = remote_file.host.as_str();
1491 let port = remote_file.port;
1492 let username = match remote_file.username {
1493 Some(u) => u,
1494 None => {
1495 event_tx
1496 .send(Event::RemoteResult(
1497 norm_uri,
1498 Err("No username specified in URI".into()),
1499 ))
1500 .unwrap();
1501 continue;
1502 }
1503 };
1504 let existing_client = {
1505 let read = clients.read().await;
1506 read.get(host).cloned()
1507 };
1508 if let Some(client) = existing_client {
1509 let result = remote_sftp_parent(&client, &norm_uri, sizes).await;
1510 let _ = items_tx.send(result).await;
1511 } else {
1512 let key_path = match get_key_files() {
1513 Ok(key_pair) => key_pair.0,
1514 Err(e) => {
1515 event_tx
1516 .send(Event::RemoteResult(norm_uri, Err(e)))
1517 .unwrap();
1518 continue;
1519 }
1520 };
1521 let auth = AuthMethod::with_key_file(key_path, None);
1522 match Client::connect(
1523 (host, port),
1524 username.as_str(),
1525 auth.clone(),
1526 ServerCheckMethod::NoCheck,
1527 )
1528 .await
1529 {
1530 Ok(client) => {
1531 {
1532 let mut write = clients.write().await;
1533 write.insert(host.to_string(), Arc::new(client));
1534 }
1535 let client = {
1536 let read = clients.read().await;
1537 Arc::clone(read.get(host).unwrap())
1538 };
1539 let result =
1540 remote_sftp_parent(&client, &norm_uri, sizes).await;
1541 let _ = items_tx.send(result).await;
1542 event_tx
1543 .send(Event::RemoteResult(norm_uri, Ok(true)))
1544 .unwrap();
1545 }
1546 Err(err) => {
1547 let msg =
1548 format!("Connecting fresh session failed: {}", err);
1549 event_tx
1550 .send(Event::RemoteResult(norm_uri, Err(msg)))
1551 .unwrap();
1552 }
1553 }
1554 }
1555 }
1556 Cmd::DirInfo(uri, result_tx) => {
1557 let remote_file =
1558 match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1559 Ok(rf) => rf,
1560 Err(e) => {
1561 let _ = result_tx.send(Err(anyhow::anyhow!(e))).await;
1562 continue;
1563 }
1564 };
1565 let host = remote_file.host.as_str();
1566 let client = {
1567 let read = clients.read().await;
1568 match read.get(host) {
1569 Some(c) => Arc::clone(c),
1570 None => {
1571 let msg =
1572 format!("No SSH client connected for host: {}", host);
1573 let _ = result_tx.send(Err(anyhow::anyhow!(msg))).await;
1574 continue;
1575 }
1576 }
1577 };
1578 let result = dir_info(&client, &uri)
1579 .await
1580 .map_err(|e| anyhow::anyhow!(e));
1581 result_tx.send(result).await.unwrap();
1582 }
1583 Cmd::Disconnect(client_item) => {
1584 let ClientItem::Russh(item) = client_item else {
1585 continue;
1586 };
1587 {
1588 let mut write = clients.write().await;
1589 write.remove(&item.host);
1590 }
1591 client_items.retain(|ci| {
1592 if let ClientItem::Russh(r) = ci {
1593 r.host != item.host
1594 } else {
1595 true
1596 }
1597 });
1598 let event_tx = event_tx.clone();
1599 let _ = event_tx.send(Event::Changed);
1600 }
1601 Cmd::Download(paths, uris, path, zip_output, result_tx, progress_tx) => {
1602 let result: Result<(), anyhow::Error> = async {
1603 let remote_files: Vec<_> = uris
1604 .iter()
1605 .map(|u| {
1606 u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e))
1607 })
1608 .collect::<Result<_, anyhow::Error>>()?;
1609 let host = remote_files
1610 .first()
1611 .ok_or_else(|| anyhow::anyhow!("No URIs provided"))?
1612 .host
1613 .clone();
1614 if remote_files.iter().any(|rf| rf.host != host) {
1615 return Err(anyhow::anyhow!(
1616 "All download URIs must be from the same host"
1617 ));
1618 }
1619 let client = {
1620 let read = clients.read().await;
1621 read.get(&host).cloned()
1622 }
1623 .ok_or_else(|| anyhow::anyhow!("No client for host {host}"))?;
1624 perform_download(&client, paths.clone(), path.clone(), zip_output.clone(), &progress_tx).await?;
1625 Ok(())
1626 }
1627 .await;
1628 let _ = result_tx.send(result);
1629 }
1630 Cmd::RunTbProfiler(paths, uris, tb_config, result_tx) => {
1631 let uri = uris
1632 .first()
1633 .cloned()
1634 .unwrap_or_else(|| "ssh:///".to_string());
1635 if uris.is_empty() {
1636 let err = anyhow::anyhow!("No URIs provided");
1637 let _ = result_tx.send(Err(err));
1638 continue;
1639 }
1640 let remote_files: Vec<_> = match uris
1641 .iter()
1642 .map(|u| u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e)))
1643 .collect::<Result<_, _>>()
1644 {
1645 Ok(v) => v,
1646 Err(err) => {
1647 let _ = result_tx.send(Err(err));
1648 continue;
1649 }
1650 };
1651 let host = match remote_files.first() {
1652 Some(rf) => rf.host.clone(),
1653 None => {
1654 let _ =
1655 result_tx.send(Err(anyhow::anyhow!("No URIs provided")));
1656 continue;
1657 }
1658 };
1659 if remote_files.iter().any(|rf| rf.host != host) {
1660 let _ = result_tx.send(Err(anyhow::anyhow!(
1661 "All URIs must be from the same host"
1662 )));
1663 continue;
1664 }
1665 let client = {
1666 let read = clients.read().await;
1667 read.get(&host).cloned()
1668 };
1669 let client = match client {
1670 Some(c) => c,
1671 None => {
1672 let err = anyhow::anyhow!("No client for host {}", host);
1673 let _ = result_tx.send(Err(err));
1674 continue;
1675 }
1676 };
1677 let result = run_tbprofiler(&client, paths, tb_config).await;
1678 let event_result: Result<SlurmJobId, String> = result
1679 .as_ref()
1680 .copied()
1681 .map_err(|e| e.to_string());
1682 let _ = result_tx.send(result);
1683 event_tx
1684 .send(Event::RunTbProfilerResult(uri, event_result))
1685 .unwrap();
1686 }
1687 Cmd::DeleteRemoteFiles(paths, uris, result_tx) => {
1688 let uri = uris
1689 .first()
1690 .cloned()
1691 .unwrap_or_else(|| "ssh:///".to_string());
1692 if uris.is_empty() {
1693 let err = anyhow::anyhow!("No URIs provided");
1694 let _ = result_tx.send(Err(err));
1695 continue;
1696 }
1697 let remote_files: Vec<_> = match uris
1698 .iter()
1699 .map(|u| u.parse::<RemoteFile>().map_err(|e| anyhow::anyhow!(e)))
1700 .collect::<Result<_, _>>()
1701 {
1702 Ok(v) => v,
1703 Err(err) => {
1704 let _ = result_tx.send(Err(err));
1705 continue;
1706 }
1707 };
1708 let host = match remote_files.first() {
1709 Some(rf) => rf.host.clone(),
1710 None => {
1711 let _ =
1712 result_tx.send(Err(anyhow::anyhow!("No URIs provided")));
1713 continue;
1714 }
1715 };
1716 if remote_files.iter().any(|rf| rf.host != host) {
1717 let _ = result_tx.send(Err(anyhow::anyhow!(
1718 "All URIs must be from the same host"
1719 )));
1720 continue;
1721 }
1722 let client = {
1723 let read = clients.read().await;
1724 read.get(&host).cloned()
1725 };
1726 let client = match client {
1727 Some(c) => c,
1728 None => {
1729 let err = anyhow::anyhow!("No client for host {}", host);
1730 let _ = result_tx.send(Err(err));
1731 continue;
1732 }
1733 };
1734 let result = delete_remote_files(&client, &paths).await;
1735 let event_result: Result<String, String> = result
1736 .as_ref()
1737 .map(|s| s.clone())
1738 .map_err(|e| e.to_string());
1739 let _ = result_tx.send(result);
1740 event_tx
1741 .send(Event::DeleteRemoteFilesResult(uri, event_result))
1742 .unwrap();
1743 }
1744 Cmd::DeleteTbProfilerResults(uri, tb_config, result_tx) => {
1745 let remote_file =
1746 match uri.parse::<RemoteFile>().map_err(|e| e.to_string()) {
1747 Ok(rf) => rf,
1748 Err(e) => {
1749 let _ = result_tx.send(Err(anyhow::anyhow!(e)));
1750 continue;
1751 }
1752 };
1753 let host = remote_file.host.as_str();
1754 let client = {
1755 let read = clients.read().await;
1756 match read.get(host) {
1757 Some(c) => Arc::clone(c),
1758 None => {
1759 let msg =
1760 format!("No SSH client connected for host: {}", host);
1761 let _ = result_tx.send(Err(anyhow::anyhow!(msg)));
1762 continue;
1763 }
1764 }
1765 };
1766 let result = delete_tbprofiler_results(&client, tb_config).await;
1767 let _ = result_tx.send(result);
1768 }
1769 Cmd::PollJobStatus(array_id, uri) => {
1770 let remote_file = match uri.parse::<RemoteFile>() {
1771 Ok(rf) => rf,
1772 Err(e) => {
1773 log::warn!("PollJobStatus: invalid URI {uri}: {e}");
1774 continue;
1775 }
1776 };
1777 let client = {
1778 let read = clients.read().await;
1779 read.get(remote_file.host.as_str()).cloned()
1780 };
1781 let client = match client {
1782 Some(c) => c,
1783 None => {
1784 log::warn!("PollJobStatus: no client for host {}", remote_file.host);
1785 continue;
1786 }
1787 };
1788 let event_tx = event_tx.clone();
1789 tokio::spawn(async move {
1790 loop {
1791 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1792 let running = match poll_running_tasks(&client, array_id).await {
1793 Ok(n) => n,
1794 Err(e) => {
1795 log::warn!("Failed to poll job {array_id}: {e}");
1796 break;
1797 }
1798 };
1799 if event_tx
1800 .send(Event::JobStatusUpdate(uri.clone(), array_id, running))
1801 .is_err()
1802 {
1803 log::warn!("PollJobStatus: event_tx send failed for job_id={array_id}, stopping poll loop");
1804 break;
1805 }
1806 if running == 0 {
1807 break;
1808 }
1809 }
1810 });
1811 }
1812 }
1813 }
1814 });
1815 });
1816 Self {
1817 command_tx,
1818 event_rx: Arc::new(Mutex::new(event_rx)),
1819 }
1820 }
1821}
1822
1823impl Connector for Russh {
1824 fn items(&self, sizes: IconSizes) -> Option<ClientItems> {
1825 let (items_tx, mut items_rx) = mpsc::channel(1);
1826 self.command_tx.send(Cmd::Items(sizes, items_tx)).unwrap();
1827 items_rx.blocking_recv()
1828 }
1829
1830 fn connect(&self, item: ClientItem) -> Task<()> {
1831 let command_tx = self.command_tx.clone();
1832 Task::perform(
1833 async move {
1834 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1835
1836 command_tx.send(Cmd::Connect(item, res_tx)).unwrap();
1837 res_rx.await
1838 },
1839 |x| {
1840 if let Err(err) = x {
1841 log::error!("{err:?}");
1842 }
1843 },
1844 )
1845 }
1846
1847 fn remote_drive(&self, uri: String) -> Task<()> {
1848 let command_tx = self.command_tx.clone();
1849 Task::perform(
1850 async move {
1851 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1852
1853 command_tx.send(Cmd::RemoteDrive(uri, res_tx)).unwrap();
1854 res_rx.await
1855 },
1856 |x| {
1857 if let Err(err) = x {
1858 log::error!("{err:?}");
1859 }
1860 },
1861 )
1862 }
1863
1864 fn download_file(&self, paths: Box<[PathBuf]>, uris: Vec<String>, to: PathBuf, zip_output: Option<PathBuf>) -> cosmic::Task<super::DownloadEvent> {
1865 let command_tx = self.command_tx.clone();
1866 cosmic::Task::stream(cosmic::iced::stream::channel(
1867 16,
1868 move |mut event_tx: cosmic::iced::futures::channel::mpsc::Sender<super::DownloadEvent>| async move {
1869 use cosmic::iced::futures::SinkExt;
1870 let (res_tx, mut res_rx) = tokio::sync::oneshot::channel();
1871 let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1872
1873 command_tx
1874 .send(Cmd::Download(paths, uris, to, zip_output, res_tx, progress_tx))
1875 .unwrap();
1876
1877 let result = loop {
1878 tokio::select! {
1879 Some(()) = progress_rx.recv() => {
1880 let _ = event_tx.send(super::DownloadEvent::FileCompleted).await;
1881 }
1882 result = &mut res_rx => {
1883 while let Ok(()) = progress_rx.try_recv() {
1884 let _ = event_tx.send(super::DownloadEvent::FileCompleted).await;
1885 }
1886 break result;
1887 }
1888 }
1889 };
1890
1891 let result = result
1892 .unwrap_or_else(|_| Err(anyhow::anyhow!("channel closed")))
1893 .map_err(|e| e.to_string());
1894 let _ = event_tx.send(super::DownloadEvent::Complete(result)).await;
1895 },
1896 ))
1897 }
1898
1899 fn remote_scan(
1900 &self,
1901 uri: &str,
1902 sizes: IconSizes,
1903 ) -> Option<Result<Vec<tab::Item>, String>> {
1904 let (items_tx, mut items_rx) = mpsc::channel(1);
1905
1906 if let Err(e) = self.command_tx.send(Cmd::RemoteScan(
1907 uri.to_string(),
1908 sizes,
1909 items_tx,
1910 )) {
1911 log::error!(
1912 "remote_scan: failed to send Cmd::RemoteScan for uri {}: {}",
1913 uri,
1914 e
1915 );
1916 return Some(Err("command channel closed".into()));
1917 }
1918
1919 items_rx.blocking_recv()
1920 }
1921
1922 fn remote_parent_item(&self, uri: &str, sizes: IconSizes) -> Option<Result<tab::Item, String>> {
1923 let (items_tx, mut items_rx) = mpsc::channel(1);
1924
1925 if let Err(e) = self
1926 .command_tx
1927 .send(Cmd::RemoteParent(uri.to_string(), sizes, items_tx))
1928 {
1929 log::error!(
1930 "remote_parent: failed to send Cmd::RemoteParent for uri {}: {}",
1931 uri,
1932 e
1933 );
1934 return Some(Err("command channel closed".into()));
1935 }
1936
1937 items_rx.blocking_recv()
1938 }
1939
1940 fn dir_info(&self, uri: &str) -> Option<(String, String, Option<PathBuf>)> {
1941 let (result_tx, mut result_rx) = mpsc::channel(1);
1942 self.command_tx
1943 .send(Cmd::DirInfo(uri.to_string(), result_tx))
1944 .unwrap();
1945 result_rx.blocking_recv().and_then(|res| res.ok())
1946 }
1947
1948 fn disconnect(&self, item: ClientItem) -> Task<()> {
1949 let command_tx = self.command_tx.clone();
1950 Task::future(async move {
1951 command_tx.send(Cmd::Disconnect(item)).unwrap();
1952 })
1953 }
1954
1955 fn run_tb_profiler(
1956 &self,
1957 paths: Box<[PathBuf]>,
1958 uris: Vec<String>,
1959 tb_config: TBConfig,
1960 ) -> Task<()> {
1961 let command_tx = self.command_tx.clone();
1962 Task::perform(
1963 async move {
1964 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1965
1966 command_tx
1967 .send(Cmd::RunTbProfiler(paths, uris, tb_config, res_tx))
1968 .unwrap();
1969
1970 res_rx.await
1971 },
1972 |x| match x {
1973 Ok(Ok(job)) => log::info!("TBProfiler started: job_id={}, tasks={}", job.array_id, job.tasks),
1974 Ok(Err(err)) => log::error!("TBProfiler failed: {err}"),
1975 Err(err) => log::error!("Channel error: {err}"),
1976 },
1977 )
1978 }
1979
1980 fn delete_remote_files(&self, paths: Box<[PathBuf]>, uris: Vec<String>) -> Task<()> {
1981 let command_tx = self.command_tx.clone();
1982 Task::perform(
1983 async move {
1984 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
1985
1986 command_tx
1987 .send(Cmd::DeleteRemoteFiles(paths, uris, res_tx))
1988 .unwrap();
1989
1990 res_rx.await
1991 },
1992 |x| match x {
1993 Ok(Ok(msg)) => log::info!("Remote files deleted: {msg}"),
1994 Ok(Err(err)) => log::error!("Remote file deletion failed: {err}"),
1995 Err(err) => log::error!("Channel error: {err}"),
1996 },
1997 )
1998 }
1999
2000 fn poll_job_status(&self, job_id: usize, uri: String) -> Task<()> {
2001 let command_tx = self.command_tx.clone();
2002 Task::future(async move {
2003 command_tx.send(Cmd::PollJobStatus(job_id, uri)).unwrap();
2004 })
2005 }
2006
2007 fn delete_tb_profiler_results(&self, uri: String, tb_config: TBConfig) -> Task<()> {
2008 let command_tx = self.command_tx.clone();
2009 Task::perform(
2010 async move {
2011 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
2012
2013 command_tx
2014 .send(Cmd::DeleteTbProfilerResults(uri, tb_config, res_tx))
2015 .unwrap();
2016
2017 res_rx.await
2018 },
2019 |x| match x {
2020 Ok(Ok(msg)) => log::info!("TBProfiler results deleted: {msg}"),
2021 Ok(Err(err)) => log::error!("TBProfiler deletion failed: {err}"),
2022 Err(err) => log::error!("Channel error: {err}"),
2023 },
2024 )
2025 }
2026
2027 fn subscription(&self) -> Subscription<ClientMessage> {
2028 let command_tx = self.command_tx.clone();
2029 let event_rx = self.event_rx.clone();
2030 struct Wrapper {
2031 command_tx: mpsc::UnboundedSender<Cmd>,
2032 event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
2033 }
2034 impl Hash for Wrapper {
2035 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2036 TypeId::of::<Self>().hash(state);
2037 }
2038 }
2039 Subscription::run_with(
2040 Wrapper {
2041 command_tx,
2042 event_rx,
2043 },
2044 |Wrapper {
2045 command_tx,
2046 event_rx,
2047 }| {
2048 let command_tx = command_tx.clone();
2049 let event_rx = event_rx.clone();
2050 stream::channel(
2051 1,
2052 move |mut output: cosmic::iced::futures::channel::mpsc::Sender<
2053 ClientMessage,
2054 >| async move {
2055 command_tx.send(Cmd::Rescan).unwrap();
2056 while let Some(event) = event_rx.lock().await.recv().await {
2057 match event {
2058 Event::Changed => command_tx.send(Cmd::Rescan).unwrap(),
2059 Event::Items(items) => {
2060 output.send(ClientMessage::Items(items)).await.unwrap()
2061 }
2062 Event::ClientResult(item, res) => output
2063 .send(ClientMessage::ClientResult(item, res))
2064 .await
2065 .unwrap(),
2066 Event::RemoteAuth(uri, auth, auth_tx) => output
2067 .send(ClientMessage::RemoteAuth(uri, auth, auth_tx))
2068 .await
2069 .unwrap(),
2070 Event::RemoteResult(uri, res) => output
2071 .send(ClientMessage::RemoteResult(uri, res))
2072 .await
2073 .unwrap(),
2074 Event::RunTbProfilerResult(uri, res) => output
2075 .send(ClientMessage::RunTbProfilerResult(uri, res))
2076 .await
2077 .unwrap(),
2078 Event::DeleteRemoteFilesResult(uri, res) => output
2079 .send(ClientMessage::DeleteRemoteFilesResult(uri, res))
2080 .await
2081 .unwrap(),
2082 Event::JobStatusUpdate(uri, array_id, running_tasks) => output
2083 .send(ClientMessage::JobStatusUpdate(uri, array_id, running_tasks))
2084 .await
2085 .unwrap(),
2086 }
2087 }
2088 pending().await
2089 },
2090 )
2091 },
2092 )
2093 }
2094}