From 868bd6983ff77263afee71005869206d057c9a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Agnieszka=20Szmur=C5=82o?= Date: Thu, 13 Aug 2020 08:43:12 +0200 Subject: [PATCH 1/5] quals trimming (#199) * quals trimming * removed hasClip check * default value of StorageLevel --- .../sequila/pileup/PileupMethods.scala | 3 +- .../sequila/pileup/model/AlignmentsRDD.scala | 2 +- .../pileup/model/ContigAggregate.scala | 9 ++-- .../sequila/pileup/model/Quals.scala | 45 +++++++++++++------ .../pileup/model/ReadQualSummary.scala | 7 +-- 5 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/main/scala/org/biodatageeks/sequila/pileup/PileupMethods.scala b/src/main/scala/org/biodatageeks/sequila/pileup/PileupMethods.scala index 06a23939..b6bc7ff6 100644 --- a/src/main/scala/org/biodatageeks/sequila/pileup/PileupMethods.scala +++ b/src/main/scala/org/biodatageeks/sequila/pileup/PileupMethods.scala @@ -32,7 +32,8 @@ object PileupMethods { val enableInstrumentation = spark.sqlContext.getConf(InternalParams.EnableInstrumentation).toBoolean val alignmentsInstr = if(enableInstrumentation) alignments.instrument() else alignments val storageLevel = - if (spark.sqlContext.getConf(InternalParams.SerializationMode)==StorageLevel.DISK_ONLY.toString()) StorageLevel.DISK_ONLY + if (spark.sqlContext.getConf(InternalParams.SerializationMode, StorageLevel.MEMORY_AND_DISK.toString())==StorageLevel.DISK_ONLY.toString()) + StorageLevel.DISK_ONLY else StorageLevel.MEMORY_AND_DISK //FIXME: Add automatic unpersist diff --git a/src/main/scala/org/biodatageeks/sequila/pileup/model/AlignmentsRDD.scala b/src/main/scala/org/biodatageeks/sequila/pileup/model/AlignmentsRDD.scala index 6febf407..cc35b048 100644 --- a/src/main/scala/org/biodatageeks/sequila/pileup/model/AlignmentsRDD.scala +++ b/src/main/scala/org/biodatageeks/sequila/pileup/model/AlignmentsRDD.scala @@ -91,7 +91,7 @@ case class AlignmentsRDD(rdd: RDD[SAMRecord]) { contigEventAgg.contigLen, util.Arrays.copyOfRange(contigEventAgg.events, 0, maxIndex + 1), //FIXME: https://stackoverflow.com/questions/37969193/why-is-array-slice-so-shockingly-slow contigEventAgg.alts, - contigEventAgg.quals, + contigEventAgg.trimQuals, contigEventAgg.startPosition, contigEventAgg.startPosition + maxIndex, 0, diff --git a/src/main/scala/org/biodatageeks/sequila/pileup/model/ContigAggregate.scala b/src/main/scala/org/biodatageeks/sequila/pileup/model/ContigAggregate.scala index 690177f7..db9c2ba0 100644 --- a/src/main/scala/org/biodatageeks/sequila/pileup/model/ContigAggregate.scala +++ b/src/main/scala/org/biodatageeks/sequila/pileup/model/ContigAggregate.scala @@ -38,6 +38,7 @@ case class ContigAggregate( def getPileupUpdate:PileupUpdate = new PileupUpdate(ArrayBuffer(getTail), ArrayBuffer(getRange)) def getAltPositionsForRange(start: Int, end: Int): SortedSet[Int] = altsKeyCache.range(start,end+1) def addToCache(readQualSummary: ReadQualSummary):Unit = qualityCache.addOrReplace(readQualSummary) + def trimQuals: MultiLociQuals = if(quals != null) quals.trim else null def calculateMaxLength(allPositions: Boolean): Int = { if (! allPositions) @@ -60,8 +61,8 @@ case class ContigAggregate( } } - def updateQuals(pos: Int, alt: Char, quality: Byte, firstUpdate: Boolean = false): Unit = { - quals.updateQuals(pos, alt,quality, firstUpdate) + def updateQuals(pos: Int, alt: Char, quality: Byte, firstUpdate: Boolean = false, updateMax:Boolean = true): Unit = { + quals.updateQuals(pos, alt,quality, firstUpdate, updateMax) } def getTail:Tail ={ @@ -148,7 +149,7 @@ case class ContigAggregate( val reads = correction.qualityCache.getReadsOverlappingPosition(pos) for (read <- reads) { val qual = read.getBaseQualityForPosition(pos.toInt) - adjustedQuals.updateQuals(pos.toInt, QualityConstants.REF_SYMBOL, qual, false) + adjustedQuals.updateQuals(pos.toInt, QualityConstants.REF_SYMBOL, qual, firstUpdate = false, updateMax = false) } } adjustedQuals @@ -173,7 +174,7 @@ case class ContigAggregate( val reads = qualityCache.getReadsOverlappingPositionFullCache(pos) for (read <- reads) { val qual = read.getBaseQualityForPosition(pos.toInt) - qualsInterim.updateQuals(pos.toInt, QualityConstants.REF_SYMBOL, qual, false) + qualsInterim.updateQuals(pos.toInt, QualityConstants.REF_SYMBOL, qual, firstUpdate = false, updateMax = false) } } qualsInterim diff --git a/src/main/scala/org/biodatageeks/sequila/pileup/model/Quals.scala b/src/main/scala/org/biodatageeks/sequila/pileup/model/Quals.scala index 2c296c7c..afaec29b 100644 --- a/src/main/scala/org/biodatageeks/sequila/pileup/model/Quals.scala +++ b/src/main/scala/org/biodatageeks/sequila/pileup/model/Quals.scala @@ -56,17 +56,36 @@ object Quals { map.map({ case (k, v) => k -> v.take(v(QualityConstants.MAX_QUAL_IND) + 1) }) } - def addQualityForAlt(alt: Char, quality: Byte): Unit = { - val altByte = alt.toByte - val qualityIndex = quality - if (!map.contains(altByte)) { - val array = new Array[Short](QualityConstants.QUAL_ARR_SIZE) - array(qualityIndex) = (array(qualityIndex) + 1).toShort - map.update(altByte, array) - } - else - map(altByte)(qualityIndex) = (map(altByte)(qualityIndex) + 1).toShort + def addQualityForAlt(alt: Char, quality: Byte, updateMax:Boolean): Unit = { + val altByte = alt.toByte + val qualityIndex = quality + + if (!map.contains(altByte)) { + val array = new Array[Short](QualityConstants.QUAL_ARR_SIZE) + array(qualityIndex) = 1.toShort // no need for incrementing. first and last time here. + array(QualityConstants.MAX_QUAL_IND) = qualityIndex + map.update(altByte, array) + return + } + + if(updateMax) { + map(altByte)(qualityIndex) = (map(altByte)(qualityIndex) + 1).toShort + if (qualityIndex > map(altByte).last) + map(altByte)(QualityConstants.MAX_QUAL_IND) = qualityIndex + return + } + + if (qualityIndex >= map(altByte).length){ + val array = new Array[Short](QualityConstants.QUAL_ARR_SIZE) + System.arraycopy(map(altByte),0,array, 0, map(altByte).length) + array(qualityIndex) = 1.toShort + map.update(altByte, array) + return } + + map(altByte)(qualityIndex) = (map(altByte)(qualityIndex) + 1).toShort + + } } implicit class MultiLociQualsExtension(val map: Quals.MultiLociQuals) { @@ -75,13 +94,13 @@ object Quals { def trim: MultiLociQuals = map.map({ case (k, v) => k -> v.trim }) @inline - def updateQuals(position: Int, alt: Char, quality: Byte, firstUpdate:Boolean = false): Unit = { + def updateQuals(position: Int, alt: Char, quality: Byte, firstUpdate:Boolean = false, updateMax:Boolean=false): Unit = { if( !firstUpdate || map.contains(position) ) { - map(position).addQualityForAlt(alt, quality) + map(position).addQualityForAlt(alt, quality, updateMax) } else { val singleLocusQualMap = new SingleLocusQuals() - singleLocusQualMap.addQualityForAlt(alt, quality) + singleLocusQualMap.addQualityForAlt(alt, quality, updateMax) map.update(position, singleLocusQualMap) } } diff --git a/src/main/scala/org/biodatageeks/sequila/pileup/model/ReadQualSummary.scala b/src/main/scala/org/biodatageeks/sequila/pileup/model/ReadQualSummary.scala index c2f55e95..63f6ab02 100644 --- a/src/main/scala/org/biodatageeks/sequila/pileup/model/ReadQualSummary.scala +++ b/src/main/scala/org/biodatageeks/sequila/pileup/model/ReadQualSummary.scala @@ -16,12 +16,7 @@ case class ReadQualSummary (start: Int, end: Int, def overlapsPosition(pos: Int): Boolean = !hasDeletionOnPosition(pos) && start <= pos && end >= pos @inline - def relativePosition(absPosition: Int): Int = { - if(!cigarDerivedConf.hasClip) - absPosition - start + inDelEventsOffset(absPosition) - else - absPosition - start + inDelEventsOffset(absPosition) + cigarDerivedConf.leftClipLength - } + def relativePosition(absPosition: Int): Int = absPosition - start + inDelEventsOffset(absPosition) + cigarDerivedConf.leftClipLength @inline private def inDelEventsOffset(pos: Int): Int = { From dcb923ddb760675f21799fd1dfce30931df254af Mon Sep 17 00:00:00 2001 From: agaszmurlo Date: Wed, 12 Aug 2020 17:53:49 +0200 Subject: [PATCH 2/5] dependencies upgrade --- build.sbt | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index 9d632eb8..d95b33df 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val sparkVersion = Properties.envOrElse("SPARK_VERSION", DEFAULT_SPARK_2_VE version := s"0.6.0-spark-${sparkVersion}" organization := "org.biodatageeks" -scalaVersion := "2.11.8" +scalaVersion := "2.12.16" val DEFAULT_HADOOP_VERSION = "2.6.5" @@ -20,21 +20,19 @@ dependencyOverrides += "com.google.guava" % "guava" % "15.0" libraryDependencies += "org.seqdoop" % "hadoop-bam" % "7.10.0" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion -libraryDependencies += "org.apache.spark" % "spark-core_2.11" % sparkVersion -libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % sparkVersion +libraryDependencies += "org.apache.spark" % "spark-core_2.12" % sparkVersion +libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % sparkVersion libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion libraryDependencies += "org.apache.spark" %% "spark-hive-thriftserver" % "2.4.0" -libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.11" % "2.4.0_0.11.0" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop")) -libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark2" % "0.25.0" -libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.25.0" -libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.25.0" -libraryDependencies += "org.scala-lang" % "scala-library" % "2.11.8" +libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.12" % "2.4.5_0.14.0" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop")) +libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark2" % "0.32.0" +libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.32.0" +libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.32.0" +libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.toString() libraryDependencies += "org.rogach" %% "scallop" % "3.1.2" -//libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.11.1" -//libraryDependencies += "org.hammerlab.bdg-utils" %% "cli" % -libraryDependencies += "org.bdgenomics.utils" % "utils-metrics-spark2_2.11" % "0.2.16" +libraryDependencies += "org.bdgenomics.utils" % "utils-metrics-spark2_2.12" % "0.2.16" libraryDependencies += "com.github.samtools" % "htsjdk" % "2.19.0" -libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.13" excludeAll (ExclusionRule("org.apache.hadoop")) +libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.16" excludeAll (ExclusionRule("org.apache.hadoop")) libraryDependencies += "org.broadinstitute" % "gatk-native-bindings" % "1.0.0" excludeAll (ExclusionRule("org.apache.hadoop")) libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.0" libraryDependencies += "org.apache.logging.log4j" % "log4j-api" % "2.11.0" @@ -47,7 +45,7 @@ libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.24.v20180605 libraryDependencies += "org.apache.derby" % "derbyclient" % "10.14.2.0" libraryDependencies += "org.biodatageeks" % "bdg-performance_2.11" % "0.2-SNAPSHOT" excludeAll (ExclusionRule("org.apache.hadoop")) libraryDependencies += "org.disq-bio" % "disq" % "0.3.3" -libraryDependencies += "io.projectglow" %% "glow" % "0.1.2" +libraryDependencies += "io.projectglow" %% "glow" % "0.5.0" libraryDependencies += "com.github.ichoran" %% "thyme" % "0.1.2-SNAPSHOT" avroSpecificSourceDirectories in Compile += (sourceDirectory in Compile).value / "avro/input" From 637aa0307a4bd52064ea0051546dbe3b0ae5cb15 Mon Sep 17 00:00:00 2001 From: agaszmurlo Date: Wed, 12 Aug 2020 23:03:20 +0200 Subject: [PATCH 3/5] fixing dependencies --- build.sbt | 18 +- .../datasources/BAM/AlignmentRelation.scala | 7 - .../sequila/rangejoins/common/Main.scala | 542 +++++++++--------- 3 files changed, 280 insertions(+), 287 deletions(-) diff --git a/build.sbt b/build.sbt index d95b33df..3ce516df 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val sparkVersion = Properties.envOrElse("SPARK_VERSION", DEFAULT_SPARK_2_VE version := s"0.6.0-spark-${sparkVersion}" organization := "org.biodatageeks" -scalaVersion := "2.12.16" +scalaVersion := "2.12.12" val DEFAULT_HADOOP_VERSION = "2.6.5" @@ -24,26 +24,26 @@ libraryDependencies += "org.apache.spark" % "spark-core_2.12" % sparkVersion libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % sparkVersion libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion libraryDependencies += "org.apache.spark" %% "spark-hive-thriftserver" % "2.4.0" -libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.12" % "2.4.5_0.14.0" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop")) -libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark2" % "0.32.0" -libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.32.0" -libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.32.0" +libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.12" % "2.4.3_0.14.0" % "test" excludeAll ExclusionRule(organization = "javax.servlet") excludeAll (ExclusionRule("org.apache.hadoop")) +libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark2" % "0.27.0" +libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.27.0" +libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.27.0" libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.toString() libraryDependencies += "org.rogach" %% "scallop" % "3.1.2" libraryDependencies += "org.bdgenomics.utils" % "utils-metrics-spark2_2.12" % "0.2.16" libraryDependencies += "com.github.samtools" % "htsjdk" % "2.19.0" libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.16" excludeAll (ExclusionRule("org.apache.hadoop")) libraryDependencies += "org.broadinstitute" % "gatk-native-bindings" % "1.0.0" excludeAll (ExclusionRule("org.apache.hadoop")) -libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.11.0" -libraryDependencies += "org.apache.logging.log4j" % "log4j-api" % "2.11.0" +libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.12.1" +libraryDependencies += "org.apache.logging.log4j" % "log4j-api" % "2.12.1" libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.5-1-darwin-SNAPSHOT" libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.5-1-linux-SNAPSHOT" -libraryDependencies += "org.hammerlab.bam" %% "load" % "1.2.0-M1" +//libraryDependencies += "org.hammerlab.bam" %% "load" % "1.2.0-M1" libraryDependencies += "de.ruedigermoeller" % "fst" % "2.57" libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.7" libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.24.v20180605" libraryDependencies += "org.apache.derby" % "derbyclient" % "10.14.2.0" -libraryDependencies += "org.biodatageeks" % "bdg-performance_2.11" % "0.2-SNAPSHOT" excludeAll (ExclusionRule("org.apache.hadoop")) +libraryDependencies += "org.biodatageeks" % "bdg-performance_2.11" % "0.2-SNAPSHOT" excludeAll (ExclusionRule("org.apache.hadoop"), ExclusionRule("org.apache.spark"), ExclusionRule("org.rogach"), ExclusionRule("com.fasterxml.jackson"), ExclusionRule("ch.cern.sparkmeasure")) libraryDependencies += "org.disq-bio" % "disq" % "0.3.3" libraryDependencies += "io.projectglow" %% "glow" % "0.5.0" libraryDependencies += "com.github.ichoran" %% "thyme" % "0.1.2-SNAPSHOT" diff --git a/src/main/scala/org/biodatageeks/sequila/datasources/BAM/AlignmentRelation.scala b/src/main/scala/org/biodatageeks/sequila/datasources/BAM/AlignmentRelation.scala index cb784fdb..7c4cbf2f 100644 --- a/src/main/scala/org/biodatageeks/sequila/datasources/BAM/AlignmentRelation.scala +++ b/src/main/scala/org/biodatageeks/sequila/datasources/BAM/AlignmentRelation.scala @@ -98,13 +98,6 @@ trait BDGAlignFileReaderWriter [T <: BDGAlignInputFormat]{ .newAPIHadoopFile[LongWritable, SAMRecordWritable, T](path) .map(r => r._2.get()) } - case "sparkbam" => { - import spark_bam._, hammerlab.path._ - val bamPath = Path(resolvedPath) - spark - .sparkContext - .loadReads(bamPath) - } case "disq" => { import org.disq_bio.disq.HtsjdkReadsRddStorage diff --git a/src/main/scala/org/biodatageeks/sequila/rangejoins/common/Main.scala b/src/main/scala/org/biodatageeks/sequila/rangejoins/common/Main.scala index d2cdbcb6..5bc9953e 100644 --- a/src/main/scala/org/biodatageeks/sequila/rangejoins/common/Main.scala +++ b/src/main/scala/org/biodatageeks/sequila/rangejoins/common/Main.scala @@ -1,272 +1,272 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +///** +// * Licensed to Big Data Genomics (BDG) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The BDG licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ package org.biodatageeks.sequila.rangejoins.common - -import java.io.PrintWriter -import java.util.Date - -import org.biodatageeks.sequila.rangejoins.NCList.NCListsJoinStrategy -import org.apache.spark.sql.types.{LongType, StructField, StructType} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} -import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD -import org.bdgenomics.formats.avro.AlignmentRecord -import org.bdgenomics.formats.avro.Feature -import org.bdgenomics.adam.rdd.feature.FeatureRDD -import org.apache.spark.sql.types._ - -import scala.util.Random -import java.io._ -import java.text.SimpleDateFormat - -import org.biodatageeks.sequila.rangejoins.genApp.IntervalTreeJoinStrategy - -object Main { - case class RecordData1(start1: Integer, end1: Integer) extends Serializable - case class RecordData2(start2: Integer, end2: Integer) extends Serializable - def main(args: Array[String]) { - if (!("random".equals(args(0))) && !("bio".equals(args(0)))) { - System.err.println("first argument should be one of two: random; bio") - System.exit(1) - } - - if ("random".equals(args(0))) { - if (args.length < 6) { - System.err.println("four argument required, step1 step2 start stop loops") - System.exit(1) - } else { - randomTest(args) - } - } - - if ("bio".equals(args(0))) { - if (args.length < 8) { - System.err.println("four argument required, step1 step2 start stop loops") - System.exit(1) - } else { - bioTest(args) - } - } - } - - def show_timing[T](proc: => T): T = { - val start=System.nanoTime() - val res = proc // call the code - val end = System.nanoTime() - println("Time elapsed: " + (end-start)/1000 + " microsecs") - res - } - - def log(text:String,pw:PrintWriter): Unit = { - println(text) - pw.write(text+"\n") - } - - def bioTest(args: Array[String]): Unit = { - // PrintWriter - - val fileName = new SimpleDateFormat("'bioTest'yyyyMMddHHmm'.txt'").format(new Date()) - - val pw = new PrintWriter(new File(fileName)) - - val stepFeatures = args(1).toInt - val stepAlignments = args(2).toInt - val start = args(3).toInt - val stop = args(4).toInt - val loops = args(5).toInt - val featuresFilePath = args(6) - val featuresFileName = featuresFilePath.substring(featuresFilePath.lastIndexOf('/')+1) - val alignmentsFilePath = args(7) - val alignmentsFileName = alignmentsFilePath.substring(alignmentsFilePath.lastIndexOf('/')+1) - val spark = SparkSession - .builder() - .appName("ExtraStrategiesGenApp") - .config("spark.master", "local") - .config("spark.sql.crossJoin.enabled", "true") - .getOrCreate() - val sc = spark.sparkContext - val sqlContext = spark.sqlContext - Random.setSeed(4242) - - - var features: FeatureRDD = sc.loadFeatures(featuresFilePath) - var alignments: AlignmentRecordRDD = sc.loadAlignments(alignmentsFilePath) - - var featuresRdd: RDD[Feature] = features.rdd - var alignmentsRdd: RDD[AlignmentRecord] = alignments.rdd - //get only interesting columns - - val fRdd = featuresRdd.map(rec => Row(rec.getStart().toInt, rec.getEnd().toInt)); - val aRdd = alignmentsRdd.map(rec => Row(rec.getStart().toInt, rec.getEnd().toInt)); - //val alignmentsSchema = StructType(Seq(StructField("start2", LongType), StructField("end2", LongType))) - //val featuresSchema = StructType(Seq(StructField("start1", LongType), StructField("end1", LongType))) - - // create DataFrames from RDD's - //val alignmentsDF = sqlContext.createDataFrame(aRdd, alignmentsSchema) - //val featuresDF = sqlContext.createDataFrame(fRdd, featuresSchema) - - //alignmentsDF.createOrReplaceTempView("alignments") - //featuresDF.createOrReplaceTempView("features") - - - - log(featuresFileName+ "\t" + alignmentsFileName,pw) - - for (i <-start to stop) { - var sizeFeatures = i*stepFeatures - var sizeAlignments = i*stepAlignments - /*var rdd1 = sc.parallelize((1 to size).map(x => {val r1=Random.nextInt(1000).toLong; val r2=Random.nextInt(1000).toLong; if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) - var rdd2 = sc.parallelize((1 to size).map(x => {val r1=Random.nextInt(1000).toLong; val r2=Random.nextInt(1000).toLong; if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}}))*/ - //var rdd1 = sc.parallelize(fRdd.takeSample(false,size,4242)) - var rdd1 = sc.parallelize(fRdd.take(sizeFeatures)) - var rdd2 = sc.parallelize(aRdd.take(sizeAlignments)) - //var rdd1 = sc.parallelize(a1Rdd.takeSample(false,size,4242)) - //var rdd2 = sc.parallelize(a2Rdd.takeSample(false,size,4242)) - - val schema1 = StructType(Seq(StructField("start1", IntegerType), StructField("end1", IntegerType))) - val schema2 = StructType(Seq(StructField("start2", IntegerType), StructField("end2", IntegerType))) - var ds1 = sqlContext.createDataFrame(rdd1, schema1) - ds1.createOrReplaceTempView("s1") - var ds2 = sqlContext.createDataFrame(rdd2, schema2) - ds2.createOrReplaceTempView("s2") - - val sqlQuery = "select * from s1 JOIN s2 on (end1>=start2 and start1<=end2)" - - log(sizeFeatures+"-"+sizeAlignments +" Size"+ "\t" + "CartessianJoin" + "\t" +"NCListsJoin"+ "\t" +"IntervalTreeJoin",pw) - for (j <- 1 to loops) { - spark.experimental.extraStrategies = Nil - if (i==1 && j==1) { - //repeat test, because of lazy loading - sqlContext.sql(sqlQuery).count - } - var start = System.nanoTime() - sqlContext.sql(sqlQuery).count - var end = System.nanoTime() - - var cartesianTime = (end - start) / 1000 - - spark.experimental.extraStrategies = new NCListsJoinStrategy(spark) :: Nil - if (i==1 && j==1) { - //repeat test, because of lazy loading - sqlContext.sql(sqlQuery).count - } - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - - - - var nclistTime = (end - start) / 1000 - - - spark.experimental.extraStrategies = new IntervalTreeJoinStrategy(spark) :: Nil - if (i==1 && j==1) { - //repeat test, because of lazy loading - sqlContext.sql(sqlQuery).count - } - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - var intervalTime = (end - start) / 1000 - log(j + "/" + loops + "\t" + cartesianTime + "\t" + nclistTime + "\t" + intervalTime, pw) - } - } - - pw.close - } - - def randomTest(args: Array[String]): Unit = { - // PrintWriter - import java.io._ - import java.text.SimpleDateFormat - val fileName = new SimpleDateFormat("'randomTest'yyyyMMddHHmm'.txt'").format(new Date()) - val pw = new PrintWriter(new File(fileName)) - - - val stepFeatures = args(1).toInt - val stepAlignments = args(2).toInt - val start = args(3).toInt - val stop = args(4).toInt - val loops = args(5).toInt - val spark = SparkSession - .builder() - .appName("ExtraStrategiesGenApp") - .config("spark.master", "local") - .config("spark.sql.crossJoin.enabled", "true") - .getOrCreate() - val sc = spark.sparkContext - val sqlContext = spark.sqlContext - Random.setSeed(4242) - - for (i <-start to stop) { - var sizeFeatures = i*stepFeatures - var sizeAlignments = i*stepAlignments - var rdd1 = sc.parallelize((1 to sizeFeatures).map(x => {val r1=Random.nextInt(1000); val r2=Random.nextInt(1000); if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) - var rdd2 = sc.parallelize((1 to sizeAlignments).map(x => {val r1=Random.nextInt(1000); val r2=Random.nextInt(1000); if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) - val schema1 = StructType(Seq(StructField("start1", IntegerType), StructField("end1", IntegerType))) - val schema2 = StructType(Seq(StructField("start2", IntegerType), StructField("end2", IntegerType))) - var ds1 = sqlContext.createDataFrame(rdd1, schema1) - ds1.createOrReplaceTempView("s1") - var ds2 = sqlContext.createDataFrame(rdd2, schema2) - ds2.createOrReplaceTempView("s2") - - val sqlQuery = "select count(*) from s1 JOIN s2 on (end1>=start2 and start1<=end2)" - - log(sizeFeatures+"-"+sizeAlignments +" Size"+ "\t" + "CartessianJoin" + "\t" +"NCListsJoin"+ "\t" +"IntervalTreeJoin",pw) - for (j <- 1 to loops) { - spark.experimental.extraStrategies = Nil - var start = System.nanoTime() - sqlContext.sql(sqlQuery).count - var end = System.nanoTime() - - if (i==1 && j==1) { - //repeat test, because of lazy loading - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - } - var cartesianTime = (end - start) / 1000 - - spark.experimental.extraStrategies = new NCListsJoinStrategy(spark) :: Nil - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - - if (i==1 && j==1) { - //repeat test, because of lazy loading - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - } - - var nclistTime = (end - start) / 1000 - - - spark.experimental.extraStrategies = new IntervalTreeJoinStrategy(spark) :: Nil - start = System.nanoTime() - sqlContext.sql(sqlQuery).count - end = System.nanoTime() - var intervalTime = (end - start) / 1000 - log(j + "/" + loops + "\t" + cartesianTime + "\t" + nclistTime + "\t" + intervalTime, pw) - } - } - - pw.close - } -} \ No newline at end of file +// +//import java.io.PrintWriter +//import java.util.Date +// +//import org.biodatageeks.sequila.rangejoins.NCList.NCListsJoinStrategy +//import org.apache.spark.sql.types.{LongType, StructField, StructType} +//import org.apache.spark.rdd.RDD +//import org.apache.spark.sql.{Row, SparkSession} +//import org.bdgenomics.adam.rdd.ADAMContext._ +//import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD +//import org.bdgenomics.formats.avro.AlignmentRecord +//import org.bdgenomics.formats.avro.Feature +//import org.bdgenomics.adam.rdd.feature.FeatureRDD +//import org.apache.spark.sql.types._ +// +//import scala.util.Random +//import java.io._ +//import java.text.SimpleDateFormat +// +//import org.biodatageeks.sequila.rangejoins.genApp.IntervalTreeJoinStrategy +// +//object Main { +// case class RecordData1(start1: Integer, end1: Integer) extends Serializable +// case class RecordData2(start2: Integer, end2: Integer) extends Serializable +// def main(args: Array[String]) { +// if (!("random".equals(args(0))) && !("bio".equals(args(0)))) { +// System.err.println("first argument should be one of two: random; bio") +// System.exit(1) +// } +// +// if ("random".equals(args(0))) { +// if (args.length < 6) { +// System.err.println("four argument required, step1 step2 start stop loops") +// System.exit(1) +// } else { +// randomTest(args) +// } +// } +// +// if ("bio".equals(args(0))) { +// if (args.length < 8) { +// System.err.println("four argument required, step1 step2 start stop loops") +// System.exit(1) +// } else { +// bioTest(args) +// } +// } +// } +// +// def show_timing[T](proc: => T): T = { +// val start=System.nanoTime() +// val res = proc // call the code +// val end = System.nanoTime() +// println("Time elapsed: " + (end-start)/1000 + " microsecs") +// res +// } +// +// def log(text:String,pw:PrintWriter): Unit = { +// println(text) +// pw.write(text+"\n") +// } +// +// def bioTest(args: Array[String]): Unit = { +// // PrintWriter +// +// val fileName = new SimpleDateFormat("'bioTest'yyyyMMddHHmm'.txt'").format(new Date()) +// +// val pw = new PrintWriter(new File(fileName)) +// +// val stepFeatures = args(1).toInt +// val stepAlignments = args(2).toInt +// val start = args(3).toInt +// val stop = args(4).toInt +// val loops = args(5).toInt +// val featuresFilePath = args(6) +// val featuresFileName = featuresFilePath.substring(featuresFilePath.lastIndexOf('/')+1) +// val alignmentsFilePath = args(7) +// val alignmentsFileName = alignmentsFilePath.substring(alignmentsFilePath.lastIndexOf('/')+1) +// val spark = SparkSession +// .builder() +// .appName("ExtraStrategiesGenApp") +// .config("spark.master", "local") +// .config("spark.sql.crossJoin.enabled", "true") +// .getOrCreate() +// val sc = spark.sparkContext +// val sqlContext = spark.sqlContext +// Random.setSeed(4242) +// +// +// var features: FeatureRDD = sc.loadFeatures(featuresFilePath) +// var alignments: AlignmentRecordRDD = sc.loadAlignments(alignmentsFilePath) +// +// var featuresRdd: RDD[Feature] = features.rdd +// var alignmentsRdd: RDD[AlignmentRecord] = alignments.rdd +// //get only interesting columns +// +// val fRdd = featuresRdd.map(rec => Row(rec.getStart().toInt, rec.getEnd().toInt)); +// val aRdd = alignmentsRdd.map(rec => Row(rec.getStart().toInt, rec.getEnd().toInt)); +// //val alignmentsSchema = StructType(Seq(StructField("start2", LongType), StructField("end2", LongType))) +// //val featuresSchema = StructType(Seq(StructField("start1", LongType), StructField("end1", LongType))) +// +// // create DataFrames from RDD's +// //val alignmentsDF = sqlContext.createDataFrame(aRdd, alignmentsSchema) +// //val featuresDF = sqlContext.createDataFrame(fRdd, featuresSchema) +// +// //alignmentsDF.createOrReplaceTempView("alignments") +// //featuresDF.createOrReplaceTempView("features") +// +// +// +// log(featuresFileName+ "\t" + alignmentsFileName,pw) +// +// for (i <-start to stop) { +// var sizeFeatures = i*stepFeatures +// var sizeAlignments = i*stepAlignments +// /*var rdd1 = sc.parallelize((1 to size).map(x => {val r1=Random.nextInt(1000).toLong; val r2=Random.nextInt(1000).toLong; if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) +// var rdd2 = sc.parallelize((1 to size).map(x => {val r1=Random.nextInt(1000).toLong; val r2=Random.nextInt(1000).toLong; if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}}))*/ +// //var rdd1 = sc.parallelize(fRdd.takeSample(false,size,4242)) +// var rdd1 = sc.parallelize(fRdd.take(sizeFeatures)) +// var rdd2 = sc.parallelize(aRdd.take(sizeAlignments)) +// //var rdd1 = sc.parallelize(a1Rdd.takeSample(false,size,4242)) +// //var rdd2 = sc.parallelize(a2Rdd.takeSample(false,size,4242)) +// +// val schema1 = StructType(Seq(StructField("start1", IntegerType), StructField("end1", IntegerType))) +// val schema2 = StructType(Seq(StructField("start2", IntegerType), StructField("end2", IntegerType))) +// var ds1 = sqlContext.createDataFrame(rdd1, schema1) +// ds1.createOrReplaceTempView("s1") +// var ds2 = sqlContext.createDataFrame(rdd2, schema2) +// ds2.createOrReplaceTempView("s2") +// +// val sqlQuery = "select * from s1 JOIN s2 on (end1>=start2 and start1<=end2)" +// +// log(sizeFeatures+"-"+sizeAlignments +" Size"+ "\t" + "CartessianJoin" + "\t" +"NCListsJoin"+ "\t" +"IntervalTreeJoin",pw) +// for (j <- 1 to loops) { +// spark.experimental.extraStrategies = Nil +// if (i==1 && j==1) { +// //repeat test, because of lazy loading +// sqlContext.sql(sqlQuery).count +// } +// var start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// var end = System.nanoTime() +// +// var cartesianTime = (end - start) / 1000 +// +// spark.experimental.extraStrategies = new NCListsJoinStrategy(spark) :: Nil +// if (i==1 && j==1) { +// //repeat test, because of lazy loading +// sqlContext.sql(sqlQuery).count +// } +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// +// +// +// var nclistTime = (end - start) / 1000 +// +// +// spark.experimental.extraStrategies = new IntervalTreeJoinStrategy(spark) :: Nil +// if (i==1 && j==1) { +// //repeat test, because of lazy loading +// sqlContext.sql(sqlQuery).count +// } +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// var intervalTime = (end - start) / 1000 +// log(j + "/" + loops + "\t" + cartesianTime + "\t" + nclistTime + "\t" + intervalTime, pw) +// } +// } +// +// pw.close +// } +// +// def randomTest(args: Array[String]): Unit = { +// // PrintWriter +// import java.io._ +// import java.text.SimpleDateFormat +// val fileName = new SimpleDateFormat("'randomTest'yyyyMMddHHmm'.txt'").format(new Date()) +// val pw = new PrintWriter(new File(fileName)) +// +// +// val stepFeatures = args(1).toInt +// val stepAlignments = args(2).toInt +// val start = args(3).toInt +// val stop = args(4).toInt +// val loops = args(5).toInt +// val spark = SparkSession +// .builder() +// .appName("ExtraStrategiesGenApp") +// .config("spark.master", "local") +// .config("spark.sql.crossJoin.enabled", "true") +// .getOrCreate() +// val sc = spark.sparkContext +// val sqlContext = spark.sqlContext +// Random.setSeed(4242) +// +// for (i <-start to stop) { +// var sizeFeatures = i*stepFeatures +// var sizeAlignments = i*stepAlignments +// var rdd1 = sc.parallelize((1 to sizeFeatures).map(x => {val r1=Random.nextInt(1000); val r2=Random.nextInt(1000); if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) +// var rdd2 = sc.parallelize((1 to sizeAlignments).map(x => {val r1=Random.nextInt(1000); val r2=Random.nextInt(1000); if (r1<=r2) {Row(r1,r2)} else {Row(r2,r1)}})) +// val schema1 = StructType(Seq(StructField("start1", IntegerType), StructField("end1", IntegerType))) +// val schema2 = StructType(Seq(StructField("start2", IntegerType), StructField("end2", IntegerType))) +// var ds1 = sqlContext.createDataFrame(rdd1, schema1) +// ds1.createOrReplaceTempView("s1") +// var ds2 = sqlContext.createDataFrame(rdd2, schema2) +// ds2.createOrReplaceTempView("s2") +// +// val sqlQuery = "select count(*) from s1 JOIN s2 on (end1>=start2 and start1<=end2)" +// +// log(sizeFeatures+"-"+sizeAlignments +" Size"+ "\t" + "CartessianJoin" + "\t" +"NCListsJoin"+ "\t" +"IntervalTreeJoin",pw) +// for (j <- 1 to loops) { +// spark.experimental.extraStrategies = Nil +// var start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// var end = System.nanoTime() +// +// if (i==1 && j==1) { +// //repeat test, because of lazy loading +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// } +// var cartesianTime = (end - start) / 1000 +// +// spark.experimental.extraStrategies = new NCListsJoinStrategy(spark) :: Nil +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// +// if (i==1 && j==1) { +// //repeat test, because of lazy loading +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// } +// +// var nclistTime = (end - start) / 1000 +// +// +// spark.experimental.extraStrategies = new IntervalTreeJoinStrategy(spark) :: Nil +// start = System.nanoTime() +// sqlContext.sql(sqlQuery).count +// end = System.nanoTime() +// var intervalTime = (end - start) / 1000 +// log(j + "/" + loops + "\t" + cartesianTime + "\t" + nclistTime + "\t" + intervalTime, pw) +// } +// } +// +// pw.close +// } +//} \ No newline at end of file From dee86bf616e86c7d335c73da5ade20866a9a4540 Mon Sep 17 00:00:00 2001 From: agaszmurlo Date: Wed, 12 Aug 2020 23:51:00 +0200 Subject: [PATCH 4/5] wip: scalac options for inlining --- build.sbt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/build.sbt b/build.sbt index 3ce516df..a33e887b 100644 --- a/build.sbt +++ b/build.sbt @@ -69,6 +69,11 @@ javaOptions ++= Seq("-Xms512M", "-Xmx8192M", "-XX:+CMSClassUnloadingEnabled" , " //fix for using with hdp warehouse connector javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint") +scalacOptions ++=Seq ( + "-opt:l:inline", + "-opt-inline-from:org.biodatageeks.pileup.model.**", + "-opt-warnings:any-inline-failed" +) updateOptions := updateOptions.value.withLatestSnapshots(false) outputStrategy := Some(StdoutOutput) From 9a578d7b7558760ce5da46761a62f5e7bf7bfb4e Mon Sep 17 00:00:00 2001 From: agaszmurlo Date: Thu, 13 Aug 2020 12:18:53 +0200 Subject: [PATCH 5/5] optimizer flags added --- build.sbt | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index a33e887b..87355439 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HA dependencyOverrides += "com.google.guava" % "guava" % "15.0" - +libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.12" libraryDependencies += "org.seqdoop" % "hadoop-bam" % "7.10.0" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion libraryDependencies += "org.apache.spark" % "spark-core_2.12" % sparkVersion @@ -28,7 +28,7 @@ libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.12" % "2.4.3_0. libraryDependencies += "org.bdgenomics.adam" %% "adam-core-spark2" % "0.27.0" libraryDependencies += "org.bdgenomics.adam" %% "adam-apis-spark2" % "0.27.0" libraryDependencies += "org.bdgenomics.adam" %% "adam-cli-spark2" % "0.27.0" -libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.toString() +libraryDependencies += "org.scala-lang" % "scala-library" % scalaVersion.value libraryDependencies += "org.rogach" %% "scallop" % "3.1.2" libraryDependencies += "org.bdgenomics.utils" % "utils-metrics-spark2_2.12" % "0.2.16" libraryDependencies += "com.github.samtools" % "htsjdk" % "2.19.0" @@ -38,7 +38,6 @@ libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.12.1" libraryDependencies += "org.apache.logging.log4j" % "log4j-api" % "2.12.1" libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.5-1-darwin-SNAPSHOT" libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.5-1-linux-SNAPSHOT" -//libraryDependencies += "org.hammerlab.bam" %% "load" % "1.2.0-M1" libraryDependencies += "de.ruedigermoeller" % "fst" % "2.57" libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.7" libraryDependencies += "org.eclipse.jetty" % "jetty-servlet" % "9.3.24.v20180605" @@ -70,10 +69,17 @@ javaOptions ++= Seq("-Xms512M", "-Xmx8192M", "-XX:+CMSClassUnloadingEnabled" , " //fix for using with hdp warehouse connector javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint") scalacOptions ++=Seq ( - "-opt:l:inline", - "-opt-inline-from:org.biodatageeks.pileup.model.**", - "-opt-warnings:any-inline-failed" + "-opt:unreachable-code", + "-opt:simplify-jumps", + "-opt:redundant-casts", + "-opt:box-unbox" ) +//"-opt:simplify-jumps", +//"-opt:allow-skip-core-module-init", +//"-opt-warnings:any-inline-failed" +// "-opt:l:method", +//"-opt:l:inline", +//"-opt-inline-from:org.biodatageeks.sequila.pileup.model.**", updateOptions := updateOptions.value.withLatestSnapshots(false) outputStrategy := Some(StdoutOutput)