Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 74 additions & 35 deletions src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use datafusion::{
common::JoinType,
functions_aggregate::expr_fn::first_value,
prelude::{
DataFrame, cast, col, concat, concat_ws, encode, left, length, lit, nullif, sha256, to_hex,
cast, col, concat, concat_ws, encode, left, length, lit, nullif, sha256, to_hex, DataFrame, SessionContext
},
};
use noodles::{
Expand Down Expand Up @@ -47,6 +47,15 @@ mod flip;
mod report;
mod union;

enum MergeType {
Exact,
Locus,
ExactBreakend,
Approx,
Near,
Flip,
}

pub async fn merge_vcfs(
out: &str,
vcf: &Vec<String>,
Expand Down Expand Up @@ -85,46 +94,30 @@ pub async fn merge_vcfs(
acc = Some(df)
}
}
let orig = acc.unwrap();
let orig = orig
let results = acc.unwrap();
let mut results = results
.with_column("row_key", cast(col("row_id"), DataType::UInt32))?
.with_column("vix_count", lit(1))?
.with_column("vix_set", col("vix"))?
.with_column("flip", lit(false))?
.with_column("criteria", nullif(lit(""), lit("")))?;

let mut results = orig.clone();

if true {
log::info!("looking for exact matches on indel type variants");
let join = full_exact_indel_join(results.clone(), n)?;
results = merge_with(results, join, &ctx, "exact").await?;
}
if true {
log::info!("looking for almost exact matches on insertions");
let join = full_exact_locus_ins_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "locus").await?;
}
if true {
log::info!("looking for exact matches on breakends");
let join = full_exact_bnd(results.clone(), n)?;
results = merge_with(results, join, &ctx, "exact").await?;
}
if true {
log::info!("looking for approximate matches on breakends");
let join = approx_bnd_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "approx").await?;
}
if true {
log::info!("looking for nearby matches on indel type variants");
let join = approx_near_join(results.clone(), n, &options, &ctx).await?;
results = merge_with(results, join, &ctx, "near").await?;
}
if options.allow_breakend_flipping {
log::info!("looking for breakends we can flip");
let join = approx_flipped_bnd_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "flip").await?;
}
let enabled_merges = vec![
MergeType::Exact,
MergeType::Locus,
MergeType::ExactBreakend,
MergeType::Approx,
MergeType::Near,
MergeType::Flip,
];

results = apply_enabled_merges(
results,
&enabled_merges,
n,
&options,
&ctx,
).await?;

let mut reference = None;
if let Some(reference_filename) = &options.reference {
Expand Down Expand Up @@ -382,6 +375,52 @@ pub async fn merge_vcfs(
Ok(())
}

async fn apply_enabled_merges(
mut results: DataFrame,
enabled_merges: &[MergeType],
n: usize,
options: &MergeOptions,
ctx: &SessionContext,
) -> std::io::Result<DataFrame> {
for m in enabled_merges {
match m {
MergeType::Exact => {
log::info!("looking for exact matches on indel type variants");
let join = full_exact_indel_join(results.clone(), n)?;
results = merge_with(results, join, &ctx, "exact").await?;
}
MergeType::Locus => {
log::info!("looking for almost exact matches on insertions");
let join = full_exact_locus_ins_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "locus").await?;
}
MergeType::ExactBreakend => {
log::info!("looking for exact matches on breakends");
let join = full_exact_bnd(results.clone(), n)?;
results = merge_with(results, join, &ctx, "exact").await?;
}
MergeType::Approx =>{
log::info!("looking for approximate matches on breakends");
let join = approx_bnd_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "approx").await?;
}
MergeType::Near => {
log::info!("looking for nearby matches on indel type variants");
let join = approx_near_join(results.clone(), n, &options, &ctx).await?;
results = merge_with(results, join, &ctx, "near").await?;
}
MergeType::Flip => {
if options.allow_breakend_flipping {
log::info!("looking for breakends we can flip");
let join = approx_flipped_bnd_join(results.clone(), n, &options)?;
results = merge_with(results, join, &ctx, "flip").await?;
}
}
}
}
Ok(results)
}

fn load_chroms(path: &str) -> std::io::Result<ChromSet> {
let reader = autocompress::autodetect_open(path)?;
let mut reader: vcf::io::Reader<Box<dyn BufRead>> =
Expand Down