66
77package be .ugent .intec .halvade .hadoop .mapreduce ;
88
9+ import be .ugent .intec .halvade .hadoop .datatypes .ChromosomeRegion ;
910import be .ugent .intec .halvade .utils .HalvadeConf ;
1011import htsjdk .samtools .SAMFileHeader ;
1112import htsjdk .samtools .SAMReadGroupRecord ;
2526 *
2627 * @author ddecap
2728 */
28- public class BamMergeReducer extends Reducer <LongWritable , SAMRecordWritable , LongWritable , SAMRecordWritable > {
29+ public class BamMergeReducer extends Reducer <ChromosomeRegion , SAMRecordWritable , LongWritable , SAMRecordWritable > {
2930
3031 protected SAMFileHeader header ;
3132 protected SAMSequenceDictionary dict ;
32- KeyIgnoringBAMOutputFormat outpFormat ;
33+ protected KeyIgnoringBAMOutputFormat outpFormat ;
3334 protected String RGID = "GROUP1" ;
3435 protected String RGLB = "LIB1" ;
3536 protected String RGPL = "ILLUMINA" ;
3637 protected String RGPU = "UNIT1" ;
3738 protected String RGSM = "SAMPLE1" ;
3839 protected SAMReadGroupRecord bamrg ;
3940 protected boolean inputIsBam = false ;
40- RecordWriter <LongWritable ,SAMRecordWritable > recordWriter ;
41+ protected RecordWriter <LongWritable ,SAMRecordWritable > recordWriter ;
42+ protected LongWritable outKey ;
4143 boolean reportBest = false ;
4244
4345 @ Override
@@ -47,17 +49,18 @@ public void run(Context context) throws IOException, InterruptedException {
4749 }
4850
4951 @ Override
50- protected void reduce (LongWritable key , Iterable <SAMRecordWritable > values , Context context ) throws IOException , InterruptedException {
52+ protected void reduce (ChromosomeRegion key , Iterable <SAMRecordWritable > values , Context context ) throws IOException , InterruptedException {
5153 Iterator <SAMRecordWritable > it = values .iterator ();
5254 while (it .hasNext ())
53- recordWriter .write (key , it .next ());
55+ recordWriter .write (outKey , it .next ());
5456 }
5557
5658 @ Override
5759 protected void setup (Context context ) throws IOException , InterruptedException {
5860 outpFormat = new KeyIgnoringBAMOutputFormat ();
5961 String output = HalvadeConf .getOutDir (context .getConfiguration ());
6062 inputIsBam = HalvadeConf .inputIsBam (context .getConfiguration ());
63+ dict = HalvadeConf .getSequenceDictionary (context .getConfiguration ());
6164 if (inputIsBam ) {
6265 header = SAMHeaderReader .readSAMHeaderFrom (new Path (HalvadeConf .getHeaderFile (context .getConfiguration ())), context .getConfiguration ());
6366 } else {
@@ -74,6 +77,8 @@ protected void setup(Context context) throws IOException, InterruptedException {
7477
7578 outpFormat .setSAMHeader (header );
7679 recordWriter = outpFormat .getRecordWriter (context , new Path (output + "mergedBam.bam" ));
80+ outKey = new LongWritable ();
81+ outKey .set (0 );
7782 }
7883
7984 protected void getReadGroupData (Configuration conf ) {
0 commit comments