Skip to content
This repository was archived by the owner on Nov 28, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion performance/bdg_perf/bdg_perf_sequila.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ss.sql(s"""
|OPTIONS (path "${bedPath}", delimiter "\t")""".stripMargin)

ss.sqlContext.setConf("spark.biodatageeks.bam.predicatePushdown","true")
ss.sqlContext.setConf("spark.biodatageeks.window.optimization", "true")

val queries = Array(
BDGQuery("bdg_seq_count_NA12878","SELECT COUNT(*) FROM reads WHERE sampleId='NA12878'"),
Expand All @@ -48,7 +49,8 @@ val queries = Array(
| )
| GROUP BY targets.contigName,targets.start,targets.end
""".stripMargin),
BDGQuery("bdg_cov_window_fix_length_100_count_NA12878","SELECT COUNT(*) FROM bdg_coverage ('reads','NA12878', 'bases','500')")
BDGQuery("bdg_cov_window_fix_length_100_count_NA12878","SELECT COUNT(*) FROM bdg_coverage ('reads','NA12878', 'blocks','500')"),
BDGQuery("bdg_cov_window_bed_file_targets_count_NA12878", "SELECT COUNT(*) FROM bdg_coverage ('reads', 'NA12878', 'blocks', 'targets')")
)

BDGPerfRunner.run(ss,queries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ object ResolveTableValuedFunctionsSeq extends Rule[LogicalPlan] {
tvf("table" -> StringType, "sampleId" -> StringType, "result" -> StringType, "target" -> StringType)
{ case Seq(table: Any,sampleId:Any, result:Any, target: Any) =>
BDGCoverage(table.toString,sampleId.toString,result.toString, Some(target.toString))
}),
}
),
"range" -> Map(
/* range(end) */
tvf("end" -> LongType) { case Seq(end: Long) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package org.biodatageeks.preprocessing.coverage



import htsjdk.samtools.{BAMFileReader, CigarOperator, SamReaderFactory, ValidationStringency}
import htsjdk.samtools.{CigarOperator, _}
import org.apache.log4j.Logger
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.biodatageeks.utils.BDGInternalParams

import scala.collection.mutable
import htsjdk.samtools._
import org.apache.spark.broadcast.Broadcast
import org.apache.log4j.Logger


abstract class AbstractCovRecord {
Expand Down Expand Up @@ -171,7 +173,6 @@ object CoverageMethodsMos {
val winLen = windowLength.get
val windowStart = ( (i+posShift) / winLen) * windowLength.get
val windowEnd = windowStart + winLen - 1
// val lastWindowLength = (i + posShift) % winLen // (i + posShift -1) % winLen // -1 current
val lastWindowLength = (i + posShift) % winLen - 1 // HACK to fix last window (omit last element)
sum -= cov // HACK to fix last window (substract last element)

Expand All @@ -181,7 +182,26 @@ object CoverageMethodsMos {
indexShift
}

def eventsToCoverage(sampleId:String, events: RDD[(String,(Array[Short],Int,Int,Int))],
/**
  * Emits the final (possibly partial) fixed-length window for a contig when the
  * scan position has reached the contig's maximum covered position.
  *
  * @param contig       contig name the window belongs to
  * @param contigMax    last covered position of this contig (from contigMinMap)
  * @param windowLength fixed window length; must be defined when this is called
  * @param posShift     offset of index 0 of the events array within the contig
  * @param i            current index in the per-partition coverage array
  * @param covSum       running coverage sum accumulated for the open window
  * @param cov          running cumulative coverage at position i
  * @param ind          next free slot in `result`
  * @param result       output buffer of coverage records
  * @return the updated next-free index into `result` (ind, or ind + 1 if a window was emitted)
  */
@inline def addLastOptimizedWindow(contig: String, contigMax: Int, windowLength: Option[Int], posShift: Int, i:Int, covSum: Int, cov: Int, ind: Int, result: Array[AbstractCovRecord]) = {
  // Only act when the scan has reached the very end of the contig's coverage.
  if ((i + posShift - 1) != contigMax) {
    ind
  } else {
    val winLen = windowLength.get
    val windowStart = ((i + posShift) / winLen) * winLen
    val windowEnd = windowStart + winLen
    // HACK to fix last window (omit last element) — length excludes the final position
    val lastWindowLength = (i + posShift) % winLen - 1
    // HACK to fix last window (substract last element) — drop the last position's coverage
    val adjustedSum = covSum - cov
    result(ind) = CovRecordWindow(contig, windowStart, windowEnd, adjustedSum / lastWindowLength.toFloat, Some(lastWindowLength))
    ind + 1
  }
}

def eventsToCoverage(sampleId:String, events: RDD[(String, (Array[Short], Int, Int, Int, mutable.HashMap[Int, Int]))],
contigMinMap: mutable.HashMap [String,(Int,Int)],
blocksResult:Boolean, allPos: Boolean, windowLength: Option[Int], targetsTable:Option[String]) : RDD[AbstractCovRecord] = {
events
Expand Down Expand Up @@ -209,8 +229,12 @@ object CoverageMethodsMos {
var prevCov = 0
var blockLength = 0

val session = SparkSession.builder().getOrCreate()
val optimizeWindow = session.sqlContext.getConf(BDGInternalParams.OptimizationWindow,"false")

val targetsTab = targetsTable.getOrElse(None)

if (windowLength.isEmpty) { // BLOCKS & BASES (NON-WINDOW) COVERAGE CALCULATIONS
if (windowLength.isEmpty && targetsTable.isEmpty) { // BLOCKS & BASES (NON-WINDOW) COVERAGE CALCULATIONS
ind = addFirstBlock(contig, contigMinMap(contig)._1, posShift, blocksResult, allPos, ind, result) // add first block if necessary (if current positionshift is equal to the earliest read in the contig)

while (i < covArrayLength) {
Expand All @@ -237,7 +261,7 @@ object CoverageMethodsMos {

result.take(ind).iterator

} else { // FIXED - WINDOW COVERAGE CALCULATIONS
} else if (windowLength.isDefined && targetsTable.isEmpty && optimizeWindow == "false") { // FIXED - WINDOW COVERAGE CALCULATIONS
while (i < covArrayLength) {
cov += r._2._1(i)

Expand All @@ -249,6 +273,7 @@ object CoverageMethodsMos {
val winLen = windowLength.get
val windowStart = (((i + posShift) / winLen) - 1) * winLen
val windowEnd = windowStart + winLen - 1

result(ind) = CovRecordWindow(contig, windowStart, windowEnd, covSum / length.toFloat, Some(length))
covSum = 0
ind += 1
Expand All @@ -261,6 +286,197 @@ object CoverageMethodsMos {
ind = addLastWindow(contig, windowLength, posShift, i, covSum, cov, ind, result)

result.take(ind).iterator
} else if (windowLength.isDefined && targetsTable.isEmpty && optimizeWindow == "true"){ // optimized fixed windows
while (i < covArrayLength) {
cov += r._2._1(i)

if ((i + posShift) % windowLength.get == 0 && (i + posShift) > 0) {
var length =
if (i < windowLength.get) i
else windowLength.get

val winLen = windowLength.get
val windowStart = (((i + posShift) / winLen) - 1) * winLen
val windowEnd = windowStart + winLen - 1
if (r._2._5.contains(windowStart)) {
covSum += r._2._5(windowStart)
length = winLen
}

result(ind) = CovRecordWindow(contig, windowStart, windowEnd, covSum / length.toFloat, Some(length))
covSum = 0
ind += 1
}

covSum += cov
i += 1

}

ind = addLastOptimizedWindow(contig, contigMinMap(contig)._2, windowLength, posShift, i, covSum, cov, ind, result)

result.take(ind).iterator

} else if (windowLength.isEmpty && targetsTable.isDefined && optimizeWindow == "true"){

val session = SparkSession.builder().getOrCreate()

val targets = session.sql(s"SELECT * FROM ${targetsTab}")

val targetsCount = targets.count
val targetsRowCollection = targets.rdd.map(x => x).collect()
for (j <- 0 until targetsCount.toInt) {
val row = targetsRowCollection(j)

val targetContig = row(0).asInstanceOf[String]
val targetStart = row(1).asInstanceOf[Int]
val targetEnd = row(2).asInstanceOf[Int]

val targetId = row(3).asInstanceOf[String]

val previousTargetContig =
if (j > 0) targetsRowCollection(j - 1)(0).asInstanceOf[String]
else targetContig

val previousTargetEnd =
if (j > 0 && previousTargetContig == targetContig) targetsRowCollection(j - 1)(2).asInstanceOf[Int]
else 0

if (previousTargetContig != targetContig) {
covSum = 0
cov = 0

}

val windowLen = targetEnd - targetStart + 1
if (targetContig == contig) {

if (targetStart > previousTargetEnd) { //shift over gap
for (k <- previousTargetEnd + 1 until targetStart) {
if (k - posShift + 1 < covArrayLength && k - posShift + 1 >= 0) {
cov += r._2._1(k - posShift + 1)
}
covSum = cov
}
} else {
for (k <- previousTargetEnd until targetStart - 1 by -1) { // shift back -> targets are crossing
if (k - posShift + 1 < covArrayLength && k - posShift + 1 >= 0) {
cov -= r._2._1(k - posShift + 1)
}
}
covSum = cov
}

for (i <- targetStart to targetEnd) {
val iter = i - posShift + 1
if (iter < covArrayLength && iter >= 0) {
cov += r._2._1(iter)

if (i == targetEnd && i > 0) {
var length =
if (iter < windowLen) iter
else windowLen

if (length < windowLen) {
if (r._2._5.contains(targetStart)) {
covSum += r._2._5(targetStart)
length = windowLen
}
}

result(ind) = CovRecordWindow(targetContig, targetStart, targetEnd, covSum / length.toFloat, Some(length))

ind += 1
covSum = 0
}
covSum += cov
}
else if (iter == covArrayLength) {
ind = addLastOptimizedWindow(contig, contigMinMap(contig)._2, Some(windowLen), posShift, iter, covSum, cov, ind, result)
covSum = 0
}
}
}
}

result.take(ind).iterator
} else {

val session = SparkSession.builder().getOrCreate()

val targets = session.sql(s"SELECT * FROM ${targetsTab}")

val targetsCount = targets.count
val targetsRowCollection = targets.rdd.map(x => x).collect()
for (j <- 0 until targetsCount.toInt) {
val row = targetsRowCollection(j)

val targetContig = row(0).asInstanceOf[String]
val targetStart = row(1).asInstanceOf[Int]
val targetEnd = row(2).asInstanceOf[Int]

val targetId = row(3).asInstanceOf[String]

val previousTargetContig =
if (j > 0) targetsRowCollection(j - 1)(0).asInstanceOf[String]
else targetContig

val previousTargetEnd =
if (j > 0 && previousTargetContig == targetContig) targetsRowCollection(j - 1)(2).asInstanceOf[Int]
else 0

if (previousTargetContig != targetContig) {
covSum = 0
cov = 0

}

val windowLen = targetEnd - targetStart + 1
if (targetContig == contig) {

if (targetStart > previousTargetEnd) { //shift over gap
for (k <- previousTargetEnd + 1 until targetStart) {
if (k - posShift + 1 < covArrayLength && k - posShift + 1 >= 0) {
cov += r._2._1(k - posShift + 1)
}
covSum = cov
}
} else {
for (k <- previousTargetEnd until targetStart - 1 by -1) { // shift back -> targets are crossing
if (k - posShift + 1 < covArrayLength && k - posShift + 1 >= 0) {
cov -= r._2._1(k - posShift + 1)
}
}
covSum = cov
}

for (i <- targetStart to targetEnd) {
val iter = i - posShift + 1
if (iter < covArrayLength && iter >= 0) {
cov += r._2._1(iter)

if (i == targetEnd && i > 0) {
val length =
if (iter < windowLen) iter
else windowLen

result(ind) = CovRecordWindow(targetContig, targetStart, targetEnd, covSum / length.toFloat, Some(length))

ind += 1
covSum = 0
}
covSum += cov
}
else if (iter == covArrayLength) {
ind = addLastWindow(contig, Some(windowLen), posShift, iter, covSum, cov, ind, result)
covSum = 0
}
}
}
}

result.take(ind).iterator

}

})
Expand All @@ -274,12 +490,14 @@ object CoverageMethodsMos {
val upd = b.value.upd
val shrink = b.value.shrink
val(contig,(eventsArray,minPos,maxPos,contigLength,maxCigarLength)) = c // to REFACTOR
var beginChange = 0

val updArray = upd.get( (contig,minPos) ) match { // check if there is a value for contigName and minPos in upd, returning array of coverage and cumSum to update current contigRange
case Some((arr,covSum)) => { // array of covs and cumSum
arr match {
case Some(overlapArray) => {
var i = 0
beginChange = eventsArray(0)
eventsArray(i) = (eventsArray(i) + covSum).toShort // add cumSum to zeroth element

while (i < overlapArray.length) {
Expand All @@ -304,8 +522,23 @@ object CoverageMethodsMos {
}
case None => updArray
}
(contig, (shrinkArray, minPos, maxPos, contigLength) )
(contig, (shrinkArray, minPos, maxPos, contigLength, beginChange) )
}
}
}

/**
  * Replaces each contig record's per-window correction payload with the
  * broadcast update map entry for (contig, minPos), or an empty map when
  * no update exists for that key.
  *
  * NOTE(review): the name `upateContigSumRange` looks like a typo for
  * "updateContigSumRange", but it is part of the public interface and is
  * kept unchanged here.
  */
def upateContigSumRange(b:Broadcast[UpdateSumStruct],reducedEvents: RDD[(String,(Array[Short],Int,Int,Int,Int))]) = {
  reducedEvents.map { case (contig, (eventsArray, minPos, maxPos, contigLength, firstEvent)) => // to REFACTOR
    // Look up the correction map for this partition's (contig, minPos) key;
    // fall back to an empty map so downstream lookups are always safe.
    val sumRange = b.value.upd.getOrElse((contig, minPos), new mutable.HashMap[Int, Int]())
    (contig, (eventsArray, minPos, maxPos, contigLength, sumRange))
  }
}
}
}
Loading