From 0be68f88aa3c401aa5d5728bdf83421e43e66165 Mon Sep 17 00:00:00 2001 From: Samuel Aroney Date: Wed, 17 Dec 2025 15:27:08 +1000 Subject: [PATCH] implement --low-memory arg for cluster/process uses skani sketch to file and search to avoid having whole db in memory --- docs/preludes/cluster_prelude.md | 2 + src/cluster_argument_parsing.rs | 22 ++++ src/clusterer.rs | 42 ++++++++ src/finch.rs | 17 ++-- src/process_argument_parsing.rs | 10 ++ src/skani.rs | 169 +++++++++++++++++++++++++++++-- tests/test_cmdline.rs | 31 ++++++ tests/test_process.rs | 58 +++++++++++ 8 files changed, 338 insertions(+), 13 deletions(-) diff --git a/docs/preludes/cluster_prelude.md b/docs/preludes/cluster_prelude.md index 4e7ab08..ba6c270 100644 --- a/docs/preludes/cluster_prelude.md +++ b/docs/preludes/cluster_prelude.md @@ -13,6 +13,8 @@ galah cluster --ani 95 --precluster-ani 90 --precluster-method finch --genome-fa # Example: cluster a set of genomes and then their representatives against a set of reference genomes (reduces memory usage against clustering all together) galah cluster --genome-fasta-directory input_genomes/ --output-representative-list genome_reps.txt galah cluster --genome-fasta-list genome_reps.txt --reference-genomes-list reference_genomes.txt --output-cluster-definition clusters.tsv +# Example: cluster a large set of genomes using low-memory mode +galah cluster --low-memory --genome-fasta-directory input_genomes/ --output-representative-fasta-directory output_directory/ ``` ### Precluster ANI diff --git a/src/cluster_argument_parsing.rs b/src/cluster_argument_parsing.rs index 75c93d1..087edad 100644 --- a/src/cluster_argument_parsing.rs +++ b/src/cluster_argument_parsing.rs @@ -128,6 +128,7 @@ pub struct GalahClustererCommandDefinition { pub dereplication_large_contigs_argument: String, pub dereplication_fraglen_argument: String, pub dereplication_cluster_contigs_argument: String, + pub dereplication_low_memory_argument: String, pub dereplication_reference_genomes_argument: String, pub dereplication_reference_genomes_list_argument: String, // pub dereplication_ani_method_argument: String, @@ -153,6 +154,7 @@ lazy_static! { dereplication_large_contigs_argument: "large-contigs".to_string(), dereplication_fraglen_argument: "fragment-length".to_string(), dereplication_cluster_contigs_argument: "cluster-contigs".to_string(), + dereplication_low_memory_argument: "low-memory".to_string(), dereplication_reference_genomes_argument: "reference-genomes".to_string(), dereplication_reference_genomes_list_argument: "reference-genomes-list".to_string(), // dereplication_ani_method_argument: "ani-method".to_string(), @@ -430,6 +432,14 @@ pub fn add_dereplication_clustering_parameters_to_section( .help("Do not use small-genomes settings in skani when clustering contigs. \ Recommended for contigs >= 20kb. Mutually exclusive with --small-contigs."), ) + .flag( + Flag::new() + .long(&format!( + "--{}", + definition.dereplication_low_memory_argument + )) + .help("Reduce memory use by sketching to file and searching it instead."), + ) .option( Opt::new("PATH ...") .long(&format!( @@ -1279,6 +1289,8 @@ pub fn generate_galah_clusterer<'a>( }), num_kmers: 1000, kmer_length: 21, + low_memory: clap_matches + .get_flag(&argument_definition.dereplication_low_memory_argument), }), "skani" => Preclusterer::Skani(SkaniPreclusterer { threshold: { @@ -1350,6 +1362,8 @@ pub fn generate_galah_clusterer<'a>( }), small_genomes, threads, + low_memory: clap_matches + .get_flag(&argument_definition.dereplication_low_memory_argument), }), _ => panic!("Programming error"), }, @@ -1663,15 +1677,23 @@ pub fn add_cluster_subcommand(app: clap::Command) -> clap::Command { .action(clap::ArgAction::SetTrue) .requires(&*GALAH_COMMAND_DEFINITION.dereplication_cluster_contigs_argument) .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_small_contigs_argument)) + .arg(Arg::new(&*GALAH_COMMAND_DEFINITION.dereplication_low_memory_argument) + .long(&*GALAH_COMMAND_DEFINITION.dereplication_low_memory_argument) + .help("Reduce memory by sketching all genomes and searching instead of triangle") + .action(clap::ArgAction::SetTrue) + .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_argument) + .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument)) .arg(Arg::new(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_argument) .long("reference-genomes") .help("Reference genomes to cluster against. These should be representatives already clustered. Galah will only form clusters across the two groups, never within. Uses less memory than clustering together.") .value_delimiter(' ') .num_args(1..) + .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_low_memory_argument) .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument)) .arg(Arg::new(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument) .long("reference-genomes-list") .help("File containing paths to reference genomes (one per line). These should be representatives already clustered. Galah will only form clusters across the two groups, never within. Uses less memory than clustering together.") + .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_low_memory_argument) .conflicts_with(&*GALAH_COMMAND_DEFINITION.dereplication_reference_genomes_argument)) .arg(Arg::new("threads") .short('t') diff --git a/src/clusterer.rs b/src/clusterer.rs index b9430e4..34ebc75 100644 --- a/src/clusterer.rs +++ b/src/clusterer.rs @@ -548,6 +548,7 @@ mod tests { min_ani: 0.9, num_kmers: 1000, kmer_length: 21, + low_memory: false, }, &crate::fastani::FastaniClusterer { threshold: 95.0, @@ -578,6 +579,7 @@ mod tests { min_ani: 0.9, num_kmers: 1000, kmer_length: 21, + low_memory: false, }, &crate::fastani::FastaniClusterer { threshold: 98.0, @@ -608,6 +610,7 @@ mod tests { min_ani: 0.9, num_kmers: 1000, kmer_length: 21, + low_memory: false, }, &crate::fastani::FastaniClusterer { threshold: 98.0, @@ -638,6 +641,7 @@ mod tests { min_ani: 0.9, num_kmers: 1000, kmer_length: 21, + low_memory: false, }, &crate::skani::SkaniClusterer { threshold: 95.0, @@ -668,6 +672,7 @@ mod tests { min_ani: 0.9, num_kmers: 1000, kmer_length: 21, + low_memory: false, }, &crate::skani::SkaniClusterer { threshold: 99.0, @@ -699,6 +704,7 @@ mod tests { min_aligned_threshold: 0.2, small_genomes: false, threads: 1, + low_memory: false, }, &crate::skani::SkaniClusterer { threshold: 99.0, @@ -731,6 +737,41 @@ mod tests { min_aligned_threshold: 0.2, small_genomes: false, threads: 1, + low_memory: false, + }, + &crate::skani::SkaniClusterer { + threshold: 99.0, + min_aligned_threshold: 0.2, + small_genomes: false, + }, + false, + None, + None, + ); + for cluster in clusters.iter_mut() { + cluster.sort_unstable(); + } + clusters.sort_unstable(); + assert_eq!(vec![vec![0, 1, 3], vec![2], vec![4]], clusters) + } + + #[test] + fn test_skani_skani_low_memory() { + init(); + let mut clusters = cluster( + &[ + "tests/data/abisko4/73.20120800_S1X.13.fna", + "tests/data/abisko4/73.20120600_S2D.19.fna", + "tests/data/abisko4/73.20120700_S3X.12.fna", + "tests/data/abisko4/73.20110800_S2D.13.fna", + "tests/data/antonio_mags/BE_RX_R2_MAG52.fna", + ], + &crate::skani::SkaniPreclusterer { + threshold: 90.0, + min_aligned_threshold: 0.2, + small_genomes: false, + threads: 1, + low_memory: true, }, &crate::skani::SkaniClusterer { threshold: 99.0, @@ -758,6 +799,7 @@ mod tests { min_aligned_threshold: 0.2, small_genomes: false, threads: 1, + low_memory: false, }, &crate::skani::SkaniClusterer { threshold: 99.0, diff --git a/src/finch.rs b/src/finch.rs index 663d2da..8997840 100644 --- a/src/finch.rs +++ b/src/finch.rs @@ -6,16 +6,21 @@ pub struct FinchPreclusterer { pub min_ani: f32, pub num_kmers: usize, pub kmer_length: u8, + pub low_memory: bool, } impl PreclusterDistanceFinder for FinchPreclusterer { fn distances(&self, genome_fasta_paths: &[&str]) -> SortedPairGenomeDistanceCache { - distances( - genome_fasta_paths, - self.min_ani, - self.num_kmers, - self.kmer_length, - ) + if self.low_memory { + panic!("Low-memory clustering currently only supported with skani preclusterer"); + } else { + distances( + genome_fasta_paths, + self.min_ani, + self.num_kmers, + self.kmer_length, + ) + } } fn distances_contigs( diff --git a/src/process_argument_parsing.rs b/src/process_argument_parsing.rs index 80def9f..9400a62 100644 --- a/src/process_argument_parsing.rs +++ b/src/process_argument_parsing.rs @@ -26,6 +26,7 @@ lazy_static! { dereplication_small_contigs_argument: "small-contigs".to_string(), dereplication_large_contigs_argument: "large-contigs".to_string(), dereplication_fraglen_argument: "fragment-length".to_string(), + dereplication_low_memory_argument: "low-memory".to_string(), dereplication_reference_genomes_argument: "reference-genomes".to_string(), dereplication_reference_genomes_list_argument: "reference-genomes-list".to_string(), dereplication_output_cluster_definition_file: "output-cluster-definition" @@ -203,15 +204,23 @@ pub fn add_process_subcommand(app: clap::Command) -> clap::Command { .action(clap::ArgAction::SetTrue) .requires(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_cluster_contigs_argument) .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_small_contigs_argument)) + .arg(Arg::new(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_low_memory_argument) + .long(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_low_memory_argument) + .help("Reduce memory by sketching all genomes and searching instead of triangle") + .action(clap::ArgAction::SetTrue) + .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_argument) + .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument)) .arg(Arg::new(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_argument) .long("reference-genomes") .help("Reference genomes to cluster against. These should be representatives already clustered. Galah will only form clusters across the two groups, never within. Uses less memory than clustering together.") .value_delimiter(' ') .num_args(1..) + .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_low_memory_argument) .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument)) .arg(Arg::new(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_list_argument) .long("reference-genomes-list") .help("File containing paths to reference genomes (one per line). These should be representatives already clustered. Galah will only form clusters across the two groups, never within. Uses less memory than clustering together.") + .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_low_memory_argument) .conflicts_with(&*PROCESS_CLUSTER_COMMAND_DEFINITION.dereplication_reference_genomes_argument)) .arg(Arg::new("threads") .short('t') @@ -385,6 +394,7 @@ pub fn process_full_help(program_basename: &str, program_version: &str) -> Manua dereplication_large_contigs_argument: "large-contigs".to_string(), dereplication_fraglen_argument: "fragment-length".to_string(), dereplication_cluster_contigs_argument: "cluster-contigs".to_string(), + dereplication_low_memory_argument: "low-memory".to_string(), dereplication_reference_genomes_argument: "reference-genomes".to_string(), dereplication_reference_genomes_list_argument: "reference-genomes-list".to_string(), dereplication_output_cluster_definition_file: "output-cluster-definition".to_string(), diff --git a/src/skani.rs b/src/skani.rs index 6117bfb..49d1dce 100644 --- a/src/skani.rs +++ b/src/skani.rs @@ -14,17 +14,28 @@ pub struct SkaniPreclusterer { pub min_aligned_threshold: f32, pub small_genomes: bool, pub threads: u16, + pub low_memory: bool, } impl PreclusterDistanceFinder for SkaniPreclusterer { fn distances(&self, genome_fasta_paths: &[&str]) -> SortedPairGenomeDistanceCache { - precluster_skani( - genome_fasta_paths, - self.threshold, - self.min_aligned_threshold, - self.small_genomes, - self.threads, - ) + if self.low_memory { + precluster_skani_lowmem( + genome_fasta_paths, + self.threshold, + self.min_aligned_threshold, + self.small_genomes, + self.threads, + ) + } else { + precluster_skani( + genome_fasta_paths, + self.threshold, + self.min_aligned_threshold, + self.small_genomes, + self.threads, + ) + } } fn distances_contigs( @@ -172,6 +183,150 @@ fn precluster_skani( distances } +/// Create preclusters using skani in low-memory mode by sketching all genomes +/// then searching all genomes against the sketch database. +fn precluster_skani_lowmem( + genome_fasta_paths: &[&str], + threshold: f32, + min_aligned_threshold: f32, + small_genomes: bool, + threads: u16, +) -> SortedPairGenomeDistanceCache { + if threshold < 85.0 { + panic!( + "Error: skani produces inaccurate results with ANI less than 85%. Provided: {}", + threshold + ); + } + + if small_genomes { + panic!("Error: skani does not support small genomes with low-memory preclustering"); + } + + // Create a tempfile to list all the fasta file paths + let mut tf = tempfile::Builder::new() + .prefix("galah-input-genomes") + .suffix(".txt") + .tempfile() + .expect("Failed to open temporary file to run skani"); + + for fasta in genome_fasta_paths { + writeln!(tf, "{fasta}").expect("Failed to write genome fasta paths to tempfile for skani"); + } + + // Create a tempdir to store all genome sketches + let db_dir = + tempfile::TempDir::new().expect("Failed to create temporary directory for skani sketches"); + + info!("Running skani to sketch genomes for low-memory mode .."); + let mut cmd_sketch = std::process::Command::new("skani"); + cmd_sketch + .arg("sketch") + .arg("-t") + .arg(format!("{threads}")) + .arg("-l") + .arg(tf.path().to_str().unwrap()) + .arg("-o") + .arg(db_dir.path().join("galah-skani").to_str().unwrap()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()); + debug!("Running skani command: {:?}", &cmd_sketch); + + let mut process_sketch = cmd_sketch + .spawn() + .unwrap_or_else(|_| panic!("Failed to spawn {}", "skani")); + + // Wait for sketching to complete and check the result + process_sketch + .wait() + .expect("Failed to wait for skani sketch"); + + info!("Running skani search to get distances .."); + let mut cmd = std::process::Command::new("skani"); + cmd.arg("search") + .arg("-t") + .arg(format!("{threads}")) + .arg("--min-af") + .arg(format!("{}", min_aligned_threshold * 100.0)) + .arg("--ql") + .arg(tf.path().to_str().unwrap()) + .arg("-d") + .arg(db_dir.path().join("galah-skani").to_str().unwrap()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::null()); + debug!("Running skani command: {:?}", &cmd); + + // Parse the distances + // Ref_file Query_file ANI Align_fraction_ref Align_fraction_query Ref_name Query_name + let mut process = cmd + .spawn() + .unwrap_or_else(|_| panic!("Failed to spawn {}", "skani")); + let stdout = process.stdout.as_mut().unwrap(); + let stdout_reader = BufReader::new(stdout); + + let mut rdr = csv::ReaderBuilder::new() + .delimiter(b'\t') + .has_headers(true) + .from_reader(stdout_reader); + + let mut distances = SortedPairGenomeDistanceCache::new(); + + // Order is likely not conserved, so need to keep track + for record_res in rdr.records() { + match record_res { + Ok(record) => { + debug!("Found skani record {:?}", record); + + if &record[0] == &record[1] { + // Ignore self matches + continue; + } + + // Get index of genomes within combined_genomes + let genome_id1 = genome_fasta_paths + .iter() + .position(|&x| x == &record[0]) + .unwrap_or_else(|| { + panic!( + "Failed to find genome fasta path in genome_fasta_paths: {}", + &record[0] + ) + }); + let genome_id2 = genome_fasta_paths + .iter() + .position(|&x| x == &record[1]) + .unwrap_or_else(|| { + panic!( + "Failed to find genome fasta path in genome_fasta_paths: {}", + &record[1] + ) + }); + + let ani: f32 = record[2].parse().unwrap_or_else(|_| { + panic!("Failed to convert skani ANI to float value: {}", &record[2]) + }); + trace!("Found ANI {}", ani); + if ani >= threshold { + trace!("Accepting ANI since it passed threshold"); + distances.insert((genome_id1, genome_id2), Some(ani)) + } + } + Err(e) => { + error!("Error parsing skani output: {}", e); + std::process::exit(1); + } + } + } + + finish_command_safely(process, "skani") + .wait() + .expect("Unexpected wait failure outside bird_tool_utils for skani"); + debug!("Found skani distances (low-memory): {:#?}", distances); + info!("Finished skani low-memory search."); + + distances +} + fn precluster_skani_contigs( genome_fasta_paths: &[&str], threshold: f32, diff --git a/tests/test_cmdline.rs b/tests/test_cmdline.rs index d2a20fc..cb59845 100644 --- a/tests/test_cmdline.rs +++ b/tests/test_cmdline.rs @@ -629,6 +629,37 @@ mod tests { // .unwrap(); // } + #[test] + fn test_clustering_low_memory() { + Assert::main_binary() + .with_args(&[ + "cluster", + "--genome-fasta-files", + "tests/data/set1/1mbp.fna", + "tests/data/set1/500kb.fna", + "tests/data/abisko4/73.20120600_S2D.19.fna", + "tests/data/abisko4/73.20120800_S1X.13.fna", + "--low-memory", + "--precluster-method", + "skani", + "--cluster-method", + "skani", + "--precluster-ani", + "90", + "--ani", + "95", + "--output-cluster-definition", + "/dev/stdout"]) + .succeeds() + .stdout() + .is("\ + tests/data/set1/1mbp.fna tests/data/set1/1mbp.fna\n\ + tests/data/set1/1mbp.fna tests/data/set1/500kb.fna\n\ + tests/data/abisko4/73.20120600_S2D.19.fna tests/data/abisko4/73.20120600_S2D.19.fna\n\ + tests/data/abisko4/73.20120600_S2D.19.fna tests/data/abisko4/73.20120800_S1X.13.fna\n") + .unwrap(); + } + #[test] fn test_reference_genomes_argument() { Assert::main_binary() diff --git a/tests/test_process.rs b/tests/test_process.rs index b31d7f7..174a9ae 100644 --- a/tests/test_process.rs +++ b/tests/test_process.rs @@ -213,6 +213,64 @@ mod tests { assert!(output_quality.exists()); } + #[test] + fn test_process_mock_low_memory() { + let tmpdir = tempdir().unwrap(); + let output_mimag = tmpdir.path().join("mimag_summary.tsv"); + let output_quality = tmpdir.path().join("quality_report.tsv"); + + setup_mock_bin( + tmpdir.path(), + &[ + (String::from("73.20120800_S1D.21"), 95.0, 2.0, 1, 1, 1, 20), + (String::from("73.20110800_S2M.16"), 90.0, 5.0, 1, 1, 1, 20), + (String::from("1mbp"), 85.0, 3.0, 1, 1, 1, 15), + (String::from("500kb"), 80.0, 4.0, 0, 1, 0, 10), + ], + ); + let path = env::var("PATH").unwrap(); + let new_path = format!("{}:{}", tmpdir.path().display(), path); + + Assert::main_binary() + .with_env(&[("PATH", &new_path)]) + .with_args(&[ + "process", + "--genome-fasta-files", + "tests/data/set1/1mbp.fna", + "tests/data/set1/500kb.fna", + "tests/data/abisko4/73.20120800_S1D.21.fna", + "tests/data/abisko4/73.20110800_S2M.16.fna", + "--low-memory", + "--output-cluster-definition", + "/dev/stdout", + "--output-mimag-summary", + output_mimag.to_str().unwrap(), + "--output-quality-report", + output_quality.to_str().unwrap(), + ]) + .succeeds() + .stdout() + .is("\ + tests/data/abisko4/73.20120800_S1D.21.fna\ttests/data/abisko4/73.20120800_S1D.21.fna\n\ + tests/data/abisko4/73.20120800_S1D.21.fna\ttests/data/abisko4/73.20110800_S2M.16.fna\n\ + tests/data/set1/1mbp.fna\ttests/data/set1/1mbp.fna\n\ + tests/data/set1/1mbp.fna\ttests/data/set1/500kb.fna\n") + .unwrap(); + + // Verify analyse outputs were created properly + assert!(output_mimag.exists()); + let content = fs::read_to_string(&output_mimag).unwrap(); + let expected = "\ + genome\tcompleteness\tcontamination\trRNA_5S\trRNA_16S\trRNA_23S\ttRNAs\tMIMAG_quality\n\ + tests/data/set1/1mbp.fna\t85.00\t3.00\t1\t1\t1\t15\tMedium quality\n\ + tests/data/set1/500kb.fna\t80.00\t4.00\t0\t1\t0\t10\tMedium quality\n\ + tests/data/abisko4/73.20120800_S1D.21.fna\t95.00\t2.00\t1\t1\t1\t20\tHigh quality\n\ + tests/data/abisko4/73.20110800_S2M.16.fna\t90.00\t5.00\t1\t1\t1\t20\tMedium quality\n"; + assert_eq!(content, expected); + + assert!(output_quality.exists()); + } + #[test] fn test_process_mock_invert() { let tmpdir = tempdir().unwrap();