diff --git a/build.sbt b/build.sbt index e706648e..53abb941 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ import scala.util.Properties name := """bdg-sequila""" -version := "0.5.4-spark-2.4.1" +version := "0.5.5-spark-2.4.1-SNAPSHOT" organization := "org.biodatageeks" @@ -103,7 +103,7 @@ resolvers ++= Seq( //fix for hdtsdjk patch in hadoop-bam and disq assemblyShadeRules in assembly := Seq( ShadeRule.rename("htsjdk.samtools.SAMRecordHelper" -> "htsjdk.samtools.SAMRecordHelperDisq").inLibrary("org.disq-bio" % "disq" % "0.3.0"), - ShadeRule.rename("htsjdk.samtools.SAMRecordHelper" -> "htsjdk.samtools.SAMRecordHelperHadoopBAM").inLibrary("org.seqdoop" % "hadoop-bam" % "7.10.0") + ShadeRule.rename("htsjdk.samtools.SAMRecordHelper" -> "htsjdk.samtools.SAMRecordHelperHadoopBAM").inLibrary("org.seqdoop" % "hadoop-bam" % "7.10.0").inProject ) diff --git a/src/main/scala/org/biodatageeks/datasources/BAM/BDGAlignmentRelation.scala b/src/main/scala/org/biodatageeks/datasources/BAM/BDGAlignmentRelation.scala index d78e6e9e..dbfd16c5 100644 --- a/src/main/scala/org/biodatageeks/datasources/BAM/BDGAlignmentRelation.scala +++ b/src/main/scala/org/biodatageeks/datasources/BAM/BDGAlignmentRelation.scala @@ -125,6 +125,7 @@ trait BDGAlignFileReaderWriter [T <: BDGAlignInputFormat]{ logger.info(s"######## Reading ${resolvedPath} or ${path}") val alignReadMethod = spark.sqlContext.getConf(BDGInternalParams.IOReadAlignmentMethod,"hadoopBAM").toLowerCase logger.info(s"######## Using ${alignReadMethod} for reading alignment files.") + logger.info(s"######## Using inputformat: ${c.toString()}") alignReadMethod match { case "hadoopbam" => { diff --git a/src/main/scala/org/biodatageeks/inputformats/BAMBDGInputFormat.java b/src/main/scala/org/biodatageeks/inputformats/BAMBDGInputFormat.java index 10881481..e83aca26 100644 --- a/src/main/scala/org/biodatageeks/inputformats/BAMBDGInputFormat.java +++ b/src/main/scala/org/biodatageeks/inputformats/BAMBDGInputFormat.java @@ -233,7 
+233,7 @@ private int addProbabilisticSplits( final SeekableStream sin = WrapSeekable.openPath(path.getFileSystem(cfg), path); - final BAMSplitGuesser guesser = new BAMSplitGuesser(sin, cfg); + final org.seqdoop.hadoop_bam.BAMBDGSplitGuesser guesser = new org.seqdoop.hadoop_bam.BAMBDGSplitGuesser(sin, cfg); FileVirtualSplit previousSplit = null; @@ -247,6 +247,7 @@ private int addProbabilisticSplits( long alignedBeg = guesser.guessNextBAMRecordStart(beg, end); + // As the guesser goes to the next BGZF block before looking for BAM // records, the ending BGZF blocks have to always be traversed fully. // Hence force the length to be 0xffff, the maximum possible. diff --git a/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecord.java b/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecord.java new file mode 100644 index 00000000..29341965 --- /dev/null +++ b/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecord.java @@ -0,0 +1,35 @@ +package htsjdk.samtools; + + +public class BAMBDGRecord extends BAMRecord { + + private byte[] mRestOfBinaryData = null; + protected BAMBDGRecord(final SAMFileHeader header, + final int referenceID, + final int coordinate, + final short readNameLength, + final short mappingQuality, + final int indexingBin, + final int cigarLen, + final int flags, + final int readLen, + final int mateReferenceID, + final int mateCoordinate, + final int insertSize, + final byte[] restOfData) { + super(header,referenceID ,coordinate, readNameLength, mappingQuality, indexingBin, cigarLen, flags, readLen, mateReferenceID,mateCoordinate,insertSize,restOfData ); + + } + + @Override + protected void eagerDecode() { + getReadName(); + getCigar(); + getReadBases(); + getBaseQualities(); + //getBinaryAttributes(); + super.eagerDecode(); + mRestOfBinaryData = null; + } + +} diff --git a/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecordReader.java b/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecordReader.java index 00cdf64a..efec31ae 100644 
--- a/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecordReader.java +++ b/src/main/scala/org/biodatageeks/inputformats/BAMBDGRecordReader.java @@ -133,8 +133,10 @@ public static long getKey0(int refIdx, int alignmentStart0) { // initialize() and Hadoop-BAM's own code that relies on // {@link BAMInputFormat} to call initialize() when the reader is // created. Therefore we add this check for the time being. - if(isInitialized) + if(isInitialized) { + if(in != null) in.close(); close(); + } isInitialized = true; reachedEnd = false; diff --git a/src/main/scala/org/biodatageeks/inputformats/BAMBDGSplitGuesser.java b/src/main/scala/org/biodatageeks/inputformats/BAMBDGSplitGuesser.java new file mode 100644 index 00000000..418f9207 --- /dev/null +++ b/src/main/scala/org/biodatageeks/inputformats/BAMBDGSplitGuesser.java @@ -0,0 +1,395 @@ +// Copyright (c) 2011 Aalto University +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +// IN THE SOFTWARE. 
+ +// File created: 2011-01-17 15:17:59 + +package org.seqdoop.hadoop_bam; + +import htsjdk.samtools.*; +import htsjdk.samtools.seekablestream.ByteArraySeekableStream; +import java.io.InputStream; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.GenericOptionsParser; + +import htsjdk.samtools.seekablestream.SeekableStream; +import htsjdk.samtools.util.BlockCompressedInputStream; +import htsjdk.samtools.util.RuntimeEOFException; +import htsjdk.samtools.util.RuntimeIOException; + +import org.seqdoop.hadoop_bam.util.SAMHeaderReader; +import org.seqdoop.hadoop_bam.util.WrapSeekable; + +/** A class for heuristically finding BAM record positions inside an area of + * a BAM file. + */ +public class BAMBDGSplitGuesser extends BaseSplitGuesser { + private SeekableStream inFile; + private BlockCompressedInputStream bgzf; + private final BAMRecordCodec bamCodec; + private final int referenceSequenceCount; + private final SAMFileHeader header; + + // We want to go through this many BGZF blocks fully, checking that they + // contain valid BAM records, when guessing a BAM record position. + private final static byte BLOCKS_NEEDED_FOR_GUESS = 3; + + // Since the max size of a BGZF block is 0xffff (64K), and we might be just + // one byte off from the start of the previous one, we need 0xfffe bytes for + // the start, and then 0xffff times the number of blocks we want to go + // through. + private final static int MAX_BYTES_READ = + BLOCKS_NEEDED_FOR_GUESS * 0xffff + 0xfffe; + + private final static int SHORTEST_POSSIBLE_BAM_RECORD = 4*9 + 1 + 1 + 1; + + /** The stream must point to a valid BAM file, because the header is read + * from it. 
+ */ + public BAMBDGSplitGuesser( + SeekableStream ss, Configuration conf) + throws IOException + { + this(ss, ss, conf); + + // Secondary check that the header points to a BAM file: Picard can get + // things wrong due to its autodetection. + ss.seek(0); + if (ss.read(buf.array(), 0, 4) != 4 || buf.getInt(0) != BGZF_MAGIC) + throw new SAMFormatException("Does not seem like a BAM file"); + } + + public BAMBDGSplitGuesser( + SeekableStream ss, InputStream headerStream, Configuration conf) + throws IOException + { + inFile = ss; + + header = SAMHeaderReader.readSAMHeaderFrom(headerStream, conf); + referenceSequenceCount = header.getSequenceDictionary().size(); + + bamCodec = new BAMRecordCodec(null, new LazyBAMRecordFactory()); + } + + /** Finds a virtual BAM record position in the physical position range + * [beg,end). Returns end if no BAM record was found. + */ + public long guessNextBAMRecordStart(long beg, long end) + throws IOException + { + // Use a reader to skip through the headers at the beginning of a BAM file, since + // the headers may exceed MAX_BYTES_READ in length. Don't close the reader + // otherwise it will close the underlying stream, which we continue to read from + // on subsequent calls to this method. + if (beg == 0) { + this.inFile.seek(beg); + SamReader open = SamReaderFactory.makeDefault().setUseAsyncIo(false) + .open(SamInputResource.of(inFile)); + SAMFileSpan span = open.indexing().getFilePointerSpanningReads(); + if (span instanceof BAMFileSpan) { + return ((BAMFileSpan) span).getFirstOffset(); + } + } + + // Buffer what we need to go through. 
+ + byte[] arr = new byte[MAX_BYTES_READ]; + + this.inFile.seek(beg); + int totalRead = 0; + for (int left = Math.min((int)(end - beg), arr.length); left > 0;) { + final int r = inFile.read(arr, totalRead, left); + if (r < 0) + break; + totalRead += r; + left -= r; + } + arr = Arrays.copyOf(arr, totalRead); + + this.in = new ByteArraySeekableStream(arr); + + this.bgzf = new BlockCompressedInputStream(this.in); + this.bgzf.setCheckCrcs(true); + + this.bamCodec.setInputStream(bgzf); + + final int firstBGZFEnd = Math.min((int)(end - beg), 0xffff); + + // cp: Compressed Position, indexes the entire BGZF input. + for (int cp = 0;; ++cp) { + final PosSize psz = guessNextBGZFPos(cp, firstBGZFEnd); + if (psz == null) + return end; + + final int cp0 = cp = psz.pos; + final long cp0Virt = (long)cp0 << 16; + try { + bgzf.seek(cp0Virt); + + // This has to catch Throwable, because it's possible to get an + // OutOfMemoryError due to an overly large size. + } catch (Throwable e) { + // Guessed BGZF position incorrectly: try the next guess. + continue; + } + + // up: Uncompressed Position, indexes the data inside the BGZF block. + for (int up = 0;; ++up) { + final int up0 = up = guessNextBAMPos(cp0Virt, up, psz.size); + + if (up0 < 0) { + // No BAM records found in the BGZF block: try the next BGZF + // block. + break; + } + + // Verify that we can actually decode BLOCKS_NEEDED_FOR_GUESS worth + // of records starting at (cp0,up0). + bgzf.seek(cp0Virt | up0); + boolean decodedAny = false; + try { + byte b = 0; + int prevCP = cp0; + while (b < BLOCKS_NEEDED_FOR_GUESS) + { + SAMRecord record = bamCodec.decode(); + if (record == null) { + break; + } + record.setHeaderStrict(header); + + + SAMRecordHelper.eagerDecode(record); // force decoding of fields + decodedAny = true; + + final int cp2 = (int)(bgzf.getFilePointer() >>> 16); + if (cp2 != prevCP) { + // The compressed position changed so we must be in a new + // block. 
+ assert cp2 > prevCP; + prevCP = cp2; + ++b; + } + } + + // Running out of records to verify is fine as long as we + // verified at least something. It should only happen if we + // couldn't fill the array. + if (b < BLOCKS_NEEDED_FOR_GUESS) { + assert arr.length < MAX_BYTES_READ; + if (!decodedAny) + continue; + } + } + catch (SAMFormatException e) { continue; } + catch (OutOfMemoryError e) { continue; } + catch (IllegalArgumentException e) { continue; } + catch (IndexOutOfBoundsException e) { continue; } + catch (RuntimeIOException e) { continue; } + catch (BufferUnderflowException e) {continue;} + + // EOF can happen legitimately if the [beg,end) range is too + // small to accommodate BLOCKS_NEEDED_FOR_GUESS and we get cut + // off in the middle of a record. In that case, our stream + // should have hit EOF as well. If we've then verified at least + // something, go ahead with it and hope for the best. + catch (FileTruncatedException e) { + if (!decodedAny && this.in.eof()) + continue; + } + catch (RuntimeEOFException e) { + if (!decodedAny && this.in.eof()) + continue; + } + return beg+cp0 << 16 | up0; + } + } + } + + private int guessNextBAMPos(long cpVirt, int up, int cSize) { + // What we're actually searching for is what's at offset [4], not [0]. So + // skip ahead by 4, thus ensuring that whenever we find a valid [0] it's + // at position up or greater. + up += 4; + + try { + while (up + SHORTEST_POSSIBLE_BAM_RECORD - 4 < cSize) { + bgzf.seek(cpVirt | up); + IOUtils.readFully(bgzf, buf.array(), 0, 8); + + // If the first two checks fail we have what looks like a valid + // reference sequence ID. Assume we're at offset [4] or [24], i.e. + // the ID of either this read or its mate, respectively. So check + // the next integer ([8] or [28]) to make sure it's a 0-based + // leftmost coordinate. 
+ final int id = buf.getInt(0); + final int pos = buf.getInt(4); + if (id < -1 || id > referenceSequenceCount || pos < -1) { + ++up; + continue; + } + + // Okay, we could be at [4] or [24]. Assuming we're at [4], check + // that [24] is valid. Assume [4] because we should hit it first: + // the only time we expect to hit [24] is at the beginning of the + // split, as part of the first read we should skip. + + bgzf.seek(cpVirt | up+20); + IOUtils.readFully(bgzf, buf.array(), 0, 8); + + final int nid = buf.getInt(0); + final int npos = buf.getInt(4); + if (nid < -1 || nid > referenceSequenceCount || npos < -1) { + ++up; + continue; + } + + // So far so good: [4] and [24] seem okay. Now do something a bit + // more involved: make sure that [36 + [12]&0xff - 1] == 0: that + // is, the name of the read should be null terminated. + + // Move up to 0 just to make it less likely that we get confused + // with offsets. Remember where we should continue from if we + // reject this up. + final int nextUP = up + 1; + up -= 4; + + bgzf.seek(cpVirt | up+12); + IOUtils.readFully(bgzf, buf.array(), 0, 4); + + final int nameLength = buf.getInt(0) & 0xff; + if (nameLength < 1) { + // Names are null-terminated so length must be at least one + up = nextUP; + continue; + } + + final int nullTerminator = up + 36 + nameLength-1; + + if (nullTerminator >= cSize) { + // This BAM record can't fit here. But maybe there's another in + // the remaining space, so try again. + up = nextUP; + continue; + } + + bgzf.seek(cpVirt | nullTerminator); + IOUtils.readFully(bgzf, buf.array(), 0, 1); + + if (buf.get(0) != 0) { + up = nextUP; + continue; + } + + // All of [4], [24], and [36 + [12]&0xff] look good. If [0] is also + // sensible, that's good enough for us. "Sensible" to us means the + // following: + // + // [0] >= 4*([16]&0xffff) + [20] + ([20]+1)/2 + 4*8 + ([12]&0xff) + + // Note that [0] is "length of the _remainder_ of the alignment + // record", which is why this uses 4*8 instead of 4*9. 
+          int zeroMin = 4*8 + nameLength; + + bgzf.seek(cpVirt | up+16); + IOUtils.readFully(bgzf, buf.array(), 0, 8); + + zeroMin += (buf.getInt(0) & 0xffff) * 4; + zeroMin += buf.getInt(4) + (buf.getInt(4)+1)/2; + + bgzf.seek(cpVirt | up); + IOUtils.readFully(bgzf, buf.array(), 0, 4); + + if (buf.getInt(0) < zeroMin) { + up = nextUP; + continue; + } + return up; + } + } catch (IOException e) {} + return -1; + } + + public static void main(String[] args) throws IOException { + final GenericOptionsParser parser; + try { + parser = new GenericOptionsParser(args); + + // This should be IOException but Hadoop 0.20.2 doesn't throw it... + } catch (Exception e) { + System.err.printf("Error in Hadoop arguments: %s\n", e.getMessage()); + System.exit(1); + + // Hooray for javac + return; + } + + args = parser.getRemainingArgs(); + final Configuration conf = parser.getConfiguration(); + + long beg = 0; + + if (args.length < 2 || args.length > 3) { + System.err.println( + "Usage: BAMBDGSplitGuesser path-or-uri header-path-or-uri [beg]"); + System.exit(2); + } + + try { + if (args.length > 2) beg = Long.decode(args[2]); + } catch (NumberFormatException e) { + System.err.println("Invalid beg offset."); + if (e.getMessage() != null) + System.err.println(e.getMessage()); + System.exit(2); + } + + SeekableStream ss = WrapSeekable.openPath(conf, new Path(args[0])); + SeekableStream hs = WrapSeekable.openPath(conf, new Path(args[1])); + + final long end = beg + MAX_BYTES_READ; + + System.out.printf( + "Will look for a BGZF block within: [%1$#x,%2$#x) = [%1$d,%2$d)\n"+ + "Will then verify BAM data within: [%1$#x,%3$#x) = [%1$d,%3$d)\n", + beg, beg + 0xffff, end); + + // NOTE(review): must exercise the new BDG guesser defined in this file, + // not the upstream BAMSplitGuesser this main was copied from. + final long g = + new BAMBDGSplitGuesser(ss, hs, conf).guessNextBAMRecordStart(beg, end); + + ss.close(); + + if (g == end) { + System.out.println( + "Didn't find any acceptable BAM record in any BGZF block."); + System.exit(1); + } + + System.out.printf( + "Accepted BGZF block at offset %1$#x (%1$d).\n"+ + "Accepted BAM record at
offset %2$#x (%2$d) therein.\n", + g >> 16, g & 0xffff); + } +} diff --git a/src/main/scala/org/biodatageeks/preprocessing/coverage/CoverageMethods.scala b/src/main/scala/org/biodatageeks/preprocessing/coverage/CoverageMethods.scala index 61d6bcc1..aa5b372e 100644 --- a/src/main/scala/org/biodatageeks/preprocessing/coverage/CoverageMethods.scala +++ b/src/main/scala/org/biodatageeks/preprocessing/coverage/CoverageMethods.scala @@ -283,8 +283,14 @@ object CoverageMethodsMos { eventsArray(i) = (eventsArray(i) + covSum).toShort // add cumSum to zeroth element while (i < overlapArray.length) { - eventsArray(i) = (eventsArray(i) + overlapArray(i)).toShort - i += 1 + try { + eventsArray(i) = (eventsArray(i) + overlapArray(i)).toShort + } + catch { + case e: ArrayIndexOutOfBoundsException => logger.error(s" Overlap array length: ${overlapArray.length}, events array length: ${eventsArray.length}") + } + i += 1 + } eventsArray } diff --git a/src/test/resources/nanopore_guppy_slice.bam b/src/test/resources/nanopore_guppy_slice.bam new file mode 100644 index 00000000..13592f9c Binary files /dev/null and b/src/test/resources/nanopore_guppy_slice.bam differ diff --git a/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/LongReadsTestSuite.scala b/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/LongReadsTestSuite.scala new file mode 100644 index 00000000..8973b434 --- /dev/null +++ b/src/test/scala/pl/edu/pw/ii/biodatageeks/tests/LongReadsTestSuite.scala @@ -0,0 +1,41 @@ +package pl.edu.pw.ii.biodatageeks.tests + +import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext} +import org.apache.spark.sql.{SequilaSession, SparkSession} +import org.bdgenomics.utils.instrumentation.Metrics +import org.biodatageeks.utils.{BDGInternalParams, SequilaRegister} +import org.scalatest.{BeforeAndAfter, FunSuite} + +class LongReadsTestSuite extends FunSuite with DataFrameSuiteBase with BeforeAndAfter with SharedSparkContext { + + val bamPath =
getClass.getResource("/nanopore_guppy_slice.bam").getPath + //val bamPath = "/Users/marek/data/guppy.chr21.bam" + val splitSize = "1000000" + val tableNameBAM = "reads" + + before { + + System.setSecurityManager(null) + spark.sql(s"DROP TABLE IF EXISTS ${tableNameBAM}") + spark.sql( + s""" + |CREATE TABLE ${tableNameBAM} + |USING org.biodatageeks.datasources.BAM.BAMDataSource + |OPTIONS(path "${bamPath}") + | + """.stripMargin) + + } + test("BAM - Nanopore with guppy basecaller") { + + spark.sqlContext.setConf(BDGInternalParams.InputSplitSize, splitSize) + + val session: SparkSession = SequilaSession(spark) + SequilaRegister.register(session) + session + .sparkContext + .setLogLevel("INFO") + val bdg = session.sql(s"SELECT * FROM ${tableNameBAM}") + assert(bdg.count() === 150) + } +}