1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{LazyLock, RwLock};
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5use walkdir::WalkDir;
6use serde::{Deserialize, Serialize};
7
8use super::{
9 DESC_MASSILIENSE, MIN_SEQ_ID_IDENTITY, SeqIdHit, SusceptibilityCalls,
10 erm41::{Erm41SusceptibilityCalls, identify_sequence_erm41, is_susceptible_erm41},
11 hsp65::identify_sequence_hsp65,
12 parse_ab1_quality, parse_ab1_sequence,
13 pnca::{PncaSusceptibilityCalls, identify_sequence_pnca, is_susceptible_pnca},
14 rpob::identify_sequence_rpob,
15 rrl::{RrlSusceptibilityCalls, identify_sequence_rrl_ntm, is_susceptible_rrl, is_susceptible_rrl_by_snp_calls_rare},
16 rrs::{RrsSusceptibilityCalls, identify_sequence_16s, is_susceptible_rrs, is_susceptible_rrs_by_snp_calls_rare},
17 trim_to_min_quality,
18};
19
20pub(crate) static AB1_SEQ_CACHE: LazyLock<RwLock<HashMap<PathBuf, Vec<SeqIdHit>>>> =
23 LazyLock::new(|| RwLock::new(HashMap::default()));
24
25#[derive(Clone, Debug, Serialize, Deserialize)]
27pub struct SampleSusceptibilityRecord {
28 pub sample_id: String,
29 pub gene: Option<String>,
30 pub file_name: String,
31 pub file_path: PathBuf,
32 #[serde(with = "super::serde_helpers::option_systemtime_secs")]
33 pub file_created: Option<SystemTime>,
34 pub susceptibility_calls: SusceptibilityCalls,
35 pub species: Option<String>,
36 pub identity: Option<f32>,
37 pub is_susceptible: Option<bool>,
38 #[serde(default)]
41 pub seq_id_hits: Vec<SeqIdHit>,
42}
43
44fn find_sample_id(s: &str) -> Option<&str> {
47 let bytes = s.as_bytes();
48 let mut i = 0;
49 while i + 10 <= bytes.len() {
50 if bytes[i..i + 2] == *b"20" && bytes[i..i + 10].iter().all(|b| b.is_ascii_digit()) {
51 let before_ok = i == 0 || !bytes[i - 1].is_ascii_digit();
52 let after_ok = i + 10 == bytes.len() || !bytes[i + 10].is_ascii_digit();
53 if before_ok && after_ok {
54 return Some(&s[i..i + 10]);
55 }
56 }
57 i += 1;
58 }
59 None
60}
61
62pub fn parse_ab1_filename(name: &str) -> (String, Option<String>) {
68 let stem = std::path::Path::new(name)
69 .file_stem()
70 .and_then(|s| s.to_str())
71 .unwrap_or(name);
72
73 let sample_id = find_sample_id(stem).unwrap_or(stem).to_string();
74
75 let lower_name = name.to_ascii_lowercase();
76 let gene = if lower_name.contains("erm41") || lower_name.contains("erm") {
77 Some("erm(41)".to_string())
78 } else if lower_name.contains("hsp65") || lower_name.contains("65kda") {
79 Some("hsp65".to_string())
80 } else if lower_name.contains("rpob") || lower_name.contains("rpo") {
81 Some("rpoB".to_string())
82 } else if lower_name.contains("mbak14") {
83 Some("16S".to_string())
84 } else if lower_name.contains("rrl") || lower_name.contains("mclr") {
85 Some("rrl".to_string())
86 } else if lower_name.contains("pnca") {
87 Some("pncA".to_string())
88 } else {
89 None
90 };
91
92 (sample_id, gene)
93}
94
95pub fn scan_ab1_directory(
103 scan_path: PathBuf,
104 cache_path: Option<PathBuf>,
105 max_age_days: u32,
106) -> Vec<SampleSusceptibilityRecord> {
107 log::debug!("ab1_scan: starting scan of {} (max_age_days={}, cache_path={:?})", scan_path.display(), max_age_days, cache_path);
108
109 type DiskCache = HashMap<String, (u64, SampleSusceptibilityRecord)>;
111 let mut disk_cache: DiskCache = cache_path
112 .as_ref()
113 .and_then(|p| std::fs::read_to_string(p).ok())
114 .and_then(|s| serde_json::from_str(&s).ok())
115 .unwrap_or_default();
116 log::debug!("ab1_scan: loaded {} disk cache entries", disk_cache.len());
117
118 let now = SystemTime::now();
119 let max_age = (max_age_days > 0)
120 .then(|| Duration::from_secs(u64::from(max_age_days) * 86_400));
121
122 let mut records = Vec::new();
123 let mut cache_dirty = false;
124
125 for entry in WalkDir::new(&scan_path).into_iter().filter_map(Result::ok) {
126 let path = entry.path();
127 if !path.is_file() {
128 continue;
129 }
130 if !path
131 .extension()
132 .map(|e| e.eq_ignore_ascii_case("ab1"))
133 .unwrap_or(false)
134 {
135 continue;
136 }
137
138 let meta = std::fs::metadata(path).ok();
139 let file_created: Option<SystemTime> = meta.as_ref().and_then(|m| m.created().ok());
140
141 if let (Some(max_age), Some(created)) = (&max_age, file_created) {
143 if now.duration_since(created).is_ok_and(|age| age > *max_age) {
144 log::debug!("ab1_scan: skipping old file: {}", path.display());
145 continue;
146 }
147 }
148
149 let mtime_secs: u64 = meta
150 .as_ref()
151 .and_then(|m| m.modified().ok())
152 .and_then(|t| t.duration_since(UNIX_EPOCH).ok())
153 .map(|d| d.as_secs())
154 .unwrap_or(0);
155
156 let path_key = path.to_string_lossy().into_owned();
157
158 let canonical_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
160
161 if let Some((cached_mtime, cached_record)) = disk_cache.get(&path_key) {
163 if *cached_mtime == mtime_secs && !cached_record.seq_id_hits.is_empty() {
164 log::debug!("ab1_scan: disk cache hit for {} ({} hits, canonical={})", path.display(), cached_record.seq_id_hits.len(), canonical_path.display());
165 if let Ok(mut guard) = AB1_SEQ_CACHE.write() {
166 guard.insert(canonical_path, cached_record.seq_id_hits.clone());
167 }
168 records.push(cached_record.clone());
169 continue;
170 } else {
171 log::debug!("ab1_scan: disk cache stale/no-hits for {} (mtime_match={}, hits={})", path.display(), *cached_mtime == mtime_secs, cached_record.seq_id_hits.len());
172 }
173 } else {
174 log::debug!("ab1_scan: no disk cache entry for {}", path.display());
175 }
176
177 let file_name = path
179 .file_name()
180 .and_then(|n| n.to_str())
181 .unwrap_or("")
182 .to_string();
183 let lower_name = file_name.to_ascii_lowercase();
184 let (sample_id, gene) = parse_ab1_filename(&file_name);
185
186 let bytes = match std::fs::read(path) {
187 Ok(b) => b,
188 Err(e) => {
189 log::warn!("ab1 batch scan: failed to read {}: {e}", path.display());
190 continue;
191 }
192 };
193
194 let is_erm41 = lower_name.contains("erm41") || lower_name.contains("erm");
195 let is_hsp65 = lower_name.contains("hsp65") || lower_name.contains("65kda");
196 let is_rpob = lower_name.contains("rpob") || lower_name.contains("rpo");
197 let is_16s = lower_name.contains("mbak14");
198 let is_23s_ntm = lower_name.contains("rrl") || lower_name.contains("mclr");
199 let is_pnca = lower_name.contains("pnca");
200
201 let ab1_seq = parse_ab1_sequence(&bytes);
202 let ab1_qual = parse_ab1_quality(&bytes);
203
204 let seq_id_hits = if let Some(seq) = ab1_seq.as_ref() {
205 let trimmed: &[u8] = match &ab1_qual {
206 Some(qual) => trim_to_min_quality(seq, qual, 20).unwrap_or(seq.as_slice()),
207 None => seq.as_slice(),
208 };
209 if is_erm41 {
210 identify_sequence_erm41(trimmed)
211 } else if is_hsp65 {
212 identify_sequence_hsp65(trimmed)
213 } else if is_rpob {
214 identify_sequence_rpob(trimmed)
215 } else if is_23s_ntm {
216 identify_sequence_rrl_ntm(trimmed)
217 } else if is_16s {
218 identify_sequence_16s(trimmed)
219 } else if is_pnca {
220 identify_sequence_pnca(trimmed)
221 } else {
222 Vec::new()
223 }
224 } else {
225 Vec::new()
226 };
227
228 log::debug!("ab1_scan: alignment done for {} → {} hits (top: {:?}, canonical={})", path.display(), seq_id_hits.len(), seq_id_hits.first().map(|h| (&h.description, h.identity)), canonical_path.display());
230 if let Ok(mut guard) = AB1_SEQ_CACHE.write() {
231 guard.insert(canonical_path, seq_id_hits.clone());
232 }
233
234 let is_susceptible = seq_id_hits.first().and_then(|hit| {
235 let erm41_result = if hit.description == DESC_MASSILIENSE {
236 Some(true)
237 } else {
238 is_susceptible_erm41(hit.erm41_position_28_opt.as_ref(), &hit.erm41_snp_calls)
239 };
240 if erm41_result.is_some() {
241 return erm41_result;
242 }
243 let rrl_result = is_susceptible_rrl(hit.rrl_position_2058_2059_opt.as_ref(), &hit.rrl_snp_calls);
244 if rrl_result.is_some() {
245 return rrl_result;
246 }
247 let rrs_result = is_susceptible_rrs(&hit.rrs_snp_calls);
248 if rrs_result.is_some() {
249 return rrs_result;
250 }
251 is_susceptible_pnca(&hit.pnca_snp_calls)
252 });
253
254 let susceptibility_calls = seq_id_hits
255 .first()
256 .map(|hit| SusceptibilityCalls {
257 erm41: Erm41SusceptibilityCalls {
258 position_28: hit.erm41_position_28_opt,
259 lof_snp_calls: hit.erm41_snp_calls.clone(),
260 is_susceptible: if hit.description == DESC_MASSILIENSE {
261 Some(true)
262 } else {
263 is_susceptible_erm41(hit.erm41_position_28_opt.as_ref(), &hit.erm41_snp_calls)
264 },
265 },
266 rrl: RrlSusceptibilityCalls {
267 position_2058_2059: hit.rrl_position_2058_2059_opt,
268 snp_calls: hit.rrl_snp_calls.clone(),
269 is_susceptible: is_susceptible_rrl(hit.rrl_position_2058_2059_opt.as_ref(), &hit.rrl_snp_calls),
270 is_susceptible_rare: is_susceptible_rrl_by_snp_calls_rare(hit.rrl_position_2058_2059_opt.as_ref(), &hit.rrl_snp_calls),
271 },
272 rrs: RrsSusceptibilityCalls {
273 snp_calls: hit.rrs_snp_calls.clone(),
274 is_susceptible: is_susceptible_rrs(&hit.rrs_snp_calls),
275 is_susceptible_rare: is_susceptible_rrs_by_snp_calls_rare(&hit.rrs_snp_calls),
276 },
277 pnca: PncaSusceptibilityCalls {
278 snp_calls: hit.pnca_snp_calls.clone(),
279 is_susceptible: is_susceptible_pnca(&hit.pnca_snp_calls),
280 },
281 })
282 .unwrap_or_default();
283
284 let record = SampleSusceptibilityRecord {
285 sample_id,
286 gene,
287 file_name,
288 file_path: path.to_path_buf(),
289 file_created,
290 susceptibility_calls,
291 species: seq_id_hits.first().map(|h| h.description.clone()),
292 identity: seq_id_hits.first().map(|h| h.identity),
293 is_susceptible,
294 seq_id_hits: seq_id_hits.iter().take(5).cloned().collect(),
295 };
296
297 disk_cache.insert(path_key, (mtime_secs, record.clone()));
298 cache_dirty = true;
299 records.push(record);
300 }
301
302 if cache_dirty {
304 if let Some(cp) = &cache_path {
305 match serde_json::to_string(&disk_cache) {
306 Ok(json) => {
307 if let Err(e) = std::fs::write(cp, json) {
308 log::warn!("ab1 scan: failed to write disk cache {}: {e}", cp.display());
309 }
310 }
311 Err(e) => log::warn!("ab1 scan: failed to serialise disk cache: {e}"),
312 }
313 }
314 }
315
316 let mem_cache_size = AB1_SEQ_CACHE.read().map(|g| g.len()).unwrap_or(0);
317 log::debug!("ab1_scan: finished: {} records, AB1_SEQ_CACHE has {} entries", records.len(), mem_cache_size);
318
319 records.sort_by(|a, b| b.sample_id.cmp(&a.sample_id));
321 records
322}
323
324pub fn write_ab1_csv(
330 records: &[SampleSusceptibilityRecord],
331 out_path: &std::path::Path,
332) -> Result<(), Box<dyn std::error::Error>> {
333 let file = std::fs::File::create(out_path)?;
334 let mut wtr = csv::Writer::from_writer(file);
335
336 wtr.write_record([
337 "file_name",
338 "sample_id",
339 "gene",
340 "overall_susceptible",
341 "species",
342 "identity_pct",
343 "erm41_position_28",
344 "erm41_lof_snp_calls",
345 "erm41_susceptible",
346 "rrl_position_2058_2059",
347 "rrl_snp_calls",
348 "rrl_susceptible",
349 "rrs_snp_calls",
350 "rrs_susceptible",
351 "pnca_snp_calls",
352 "pnca_susceptible",
353 "file_created",
354 ])?;
355
356 for rec in records {
357 if rec.gene.is_none() {
358 continue;
359 }
360 if rec.identity.is_none_or(|i| i < MIN_SEQ_ID_IDENTITY) {
361 continue;
362 }
363
364 let file_created = rec
365 .file_created
366 .map(system_time_to_iso8601)
367 .unwrap_or_default();
368
369 let erm41_pos = rec
370 .susceptibility_calls
371 .erm41
372 .position_28
373 .map(|p| p.to_string())
374 .unwrap_or_default();
375 let erm41_sus = fmt_susceptible(rec.susceptibility_calls.erm41.is_susceptible);
376 let erm41_lof = snp_calls_str(
377 rec.susceptibility_calls
378 .erm41
379 .lof_snp_calls
380 .iter()
381 .map(|s| (s.ref_pos, s.call_tag())),
382 );
383
384 let rrl_pos = rec
385 .susceptibility_calls
386 .rrl
387 .position_2058_2059
388 .map(|p| p.to_string())
389 .unwrap_or_default();
390 let rrl_sus = fmt_susceptible(rec.susceptibility_calls.rrl.is_susceptible);
391 let rrl_snps = snp_calls_str(
392 rec.susceptibility_calls
393 .rrl
394 .snp_calls
395 .iter()
396 .map(|s| (s.ref_pos, s.call_tag())),
397 );
398
399 let rrs_sus = fmt_susceptible(rec.susceptibility_calls.rrs.is_susceptible);
400 let rrs_snps = snp_calls_str(
401 rec.susceptibility_calls
402 .rrs
403 .snp_calls
404 .iter()
405 .map(|s| (s.ref_pos, s.call_tag())),
406 );
407
408 let pnca_sus = fmt_susceptible(rec.susceptibility_calls.pnca.is_susceptible);
409 let pnca_snps = pnca_snp_calls_str(&rec.susceptibility_calls.pnca.snp_calls);
410
411 let overall = fmt_susceptible(rec.is_susceptible);
412
413 wtr.write_record([
414 rec.file_name.as_str(),
415 rec.sample_id.as_str(),
416 rec.gene.as_deref().unwrap_or(""),
417 overall.as_str(),
418 rec.species.as_deref().unwrap_or(""),
419 &rec.identity.map(|i| format!("{:.1}", i)).unwrap_or_default(),
420 erm41_pos.as_str(),
421 erm41_lof.as_str(),
422 erm41_sus.as_str(),
423 rrl_pos.as_str(),
424 rrl_snps.as_str(),
425 rrl_sus.as_str(),
426 rrs_snps.as_str(),
427 rrs_sus.as_str(),
428 pnca_snps.as_str(),
429 pnca_sus.as_str(),
430 file_created.as_str(),
431 ])?;
432 }
433
434 wtr.flush()?;
435 Ok(())
436}
437
438pub fn write_rare_mutations_csv(
442 records: &[SampleSusceptibilityRecord],
443 out_path: &std::path::Path,
444) -> Result<(), Box<dyn std::error::Error>> {
445 let rare: Vec<&SampleSusceptibilityRecord> = records
446 .iter()
447 .filter(|rec| {
448 rec.gene.is_some()
449 && rec.identity.is_some_and(|i| i >= MIN_SEQ_ID_IDENTITY)
450 && (rec.susceptibility_calls.rrl.is_susceptible_rare == Some(false)
451 || rec.susceptibility_calls.rrs.is_susceptible_rare == Some(false))
452 })
453 .collect();
454
455 if rare.is_empty() {
456 return Ok(());
457 }
458
459 let file = std::fs::File::create(out_path)?;
460 let mut wtr = csv::Writer::from_writer(file);
461
462 wtr.write_record([
463 "file_name",
464 "sample_id",
465 "gene",
466 "overall_susceptible",
467 "species",
468 "identity_pct",
469 "erm41_position_28",
470 "erm41_lof_snp_calls",
471 "erm41_susceptible",
472 "rrl_position_2058_2059",
473 "rrl_snp_calls",
474 "rrl_susceptible",
475 "rrl_susceptible_rare",
476 "rrs_snp_calls",
477 "rrs_susceptible",
478 "rrs_susceptible_rare",
479 "file_created",
480 ])?;
481
482 for rec in &rare {
483 let rrl_rare = rec.susceptibility_calls.rrl.is_susceptible_rare;
484 let rrs_rare = rec.susceptibility_calls.rrs.is_susceptible_rare;
485
486 let file_created = rec
487 .file_created
488 .map(system_time_to_iso8601)
489 .unwrap_or_default();
490
491 let erm41_pos = rec
492 .susceptibility_calls
493 .erm41
494 .position_28
495 .map(|p| p.to_string())
496 .unwrap_or_default();
497 let erm41_sus = fmt_susceptible(rec.susceptibility_calls.erm41.is_susceptible);
498 let erm41_lof = snp_calls_str(
499 rec.susceptibility_calls
500 .erm41
501 .lof_snp_calls
502 .iter()
503 .map(|s| (s.ref_pos, s.call_tag())),
504 );
505
506 let rrl_pos = rec
507 .susceptibility_calls
508 .rrl
509 .position_2058_2059
510 .map(|p| p.to_string())
511 .unwrap_or_default();
512 let rrl_sus = fmt_susceptible(rec.susceptibility_calls.rrl.is_susceptible);
513 let rrl_sus_rare = fmt_susceptible(rrl_rare);
514 let rrl_snps = snp_calls_str(
515 rec.susceptibility_calls
516 .rrl
517 .snp_calls
518 .iter()
519 .map(|s| (s.ref_pos, s.call_tag())),
520 );
521
522 let rrs_sus = fmt_susceptible(rec.susceptibility_calls.rrs.is_susceptible);
523 let rrs_sus_rare = fmt_susceptible(rrs_rare);
524 let rrs_snps = snp_calls_str(
525 rec.susceptibility_calls
526 .rrs
527 .snp_calls
528 .iter()
529 .map(|s| (s.ref_pos, s.call_tag())),
530 );
531
532 let overall = fmt_susceptible(rec.is_susceptible);
533
534 wtr.write_record([
535 rec.file_name.as_str(),
536 rec.sample_id.as_str(),
537 rec.gene.as_deref().unwrap_or(""),
538 overall.as_str(),
539 rec.species.as_deref().unwrap_or(""),
540 &rec.identity.map(|i| format!("{:.1}", i)).unwrap_or_default(),
541 erm41_pos.as_str(),
542 erm41_lof.as_str(),
543 erm41_sus.as_str(),
544 rrl_pos.as_str(),
545 rrl_snps.as_str(),
546 rrl_sus.as_str(),
547 rrl_sus_rare.as_str(),
548 rrs_snps.as_str(),
549 rrs_sus.as_str(),
550 rrs_sus_rare.as_str(),
551 file_created.as_str(),
552 ])?;
553 }
554
555 wtr.flush()?;
556 Ok(())
557}
558
559fn fmt_susceptible(v: Option<bool>) -> String {
560 match v {
561 Some(true) => "susceptible".to_string(),
562 Some(false) => "resistant".to_string(),
563 None => String::new(),
564 }
565}
566
567fn snp_calls_str(calls: impl Iterator<Item = (usize, String)>) -> String {
568 calls
569 .map(|(pos, tag)| format!("pos {}: {}", pos + 1, tag))
570 .collect::<Vec<_>>()
571 .join("; ")
572}
573
574fn pnca_snp_calls_str(calls: &[super::pnca::PncaSnpCall]) -> String {
575 calls
576 .iter()
577 .filter(|c| !c.call_tag().is_empty())
578 .map(|c| format!("{}: {}", c.site_label(), c.call_tag()))
579 .collect::<Vec<_>>()
580 .join("; ")
581}
582
583fn system_time_to_iso8601(t: std::time::SystemTime) -> String {
585 use std::time::UNIX_EPOCH;
586 let Ok(dur) = t.duration_since(UNIX_EPOCH) else {
587 return String::new();
588 };
589 let secs = dur.as_secs();
590
591 let s = secs % 60;
592 let m = (secs / 60) % 60;
593 let h = (secs / 3600) % 24;
594 let days = (secs / 86400) as u32;
595
596 let (year, month, day) = days_to_ymd(days);
598
599 format!("{year:04}-{month:02}-{day:02}T{h:02}:{m:02}:{s:02}Z")
600}
601
602fn days_to_ymd(days: u32) -> (u32, u32, u32) {
603 let jdn = days + 2_440_588;
606 let a = jdn + 32044;
607 let b = (4 * a + 3) / 146097;
608 let c = a - (146097 * b) / 4;
609 let d = (4 * c + 3) / 1461;
610 let e = c - (1461 * d) / 4;
611 let m = (5 * e + 2) / 153;
612 let day = e - (153 * m + 2) / 5 + 1;
613 let month = m + 3 - 12 * (m / 10);
614 let year = 100 * b + d - 4800 + m / 10;
615 (year, month, day)
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621
622 #[test]
623 fn test_parse_ab1_filename() {
624 let (id, gene) = parse_ab1_filename("2026311072 rrl R 2.4.26_MCLR 21R.ab1");
625 assert_eq!(id, "2026311072");
626 assert_eq!(gene.as_deref(), Some("rrl"));
627
628 let (id, gene) = parse_ab1_filename("12345.ab1");
629 assert_eq!(id, "12345");
630 assert_eq!(gene, None);
631 }
632
633 #[test]
634 fn test_system_time_to_iso8601() {
635 use std::time::{Duration, UNIX_EPOCH};
636 let t = UNIX_EPOCH + Duration::from_secs(1_705_276_800);
638 assert_eq!(system_time_to_iso8601(t), "2024-01-15T00:00:00Z");
639 }
640}