Skip to content

Commit 06fc459

Browse files
committed
working version v0.3
1 parent 4128f58 commit 06fc459

35 files changed

+651
-200
lines changed

halvade/src/be/ugent/intec/halvade/HalvadeOptions.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class HalvadeOptions {
8181
protected boolean reuseJVM = false;
8282
protected boolean justAlign = false;
8383
protected String exomeBedFile = null;
84-
protected double coverage = 50;
84+
protected double coverage = -1.0;
8585
protected String halvadeBinaries;
8686
protected String bin;
8787
protected boolean combineVcf = true;
@@ -143,7 +143,8 @@ public int GetOptions(String[] args, Configuration hConf) throws IOException, UR
143143

144144
parseDictFile(hConf);
145145
double inputSize = getInputSize(in, hConf);
146-
coverage = DEFAULT_COVERAGE * (inputSize / DEFAULT_COVERAGE_SIZE);
146+
if(coverage == -1.0)
147+
coverage = Math.max(1.0, DEFAULT_COVERAGE * (inputSize / DEFAULT_COVERAGE_SIZE));
147148
Logger.DEBUG("Estimated coverage: " + roundOneDecimal(coverage));
148149
// set a minimum first where the real amount is based on
149150
reduces = (int) (coverage * REDUCE_TASKS_FACTOR);
@@ -185,7 +186,7 @@ protected double getInputSize(String input, Configuration conf) throws URISyntax
185186

186187
private static final String DICT_SUFFIX = ".dict";
187188
private void parseDictFile(Configuration conf) {
188-
be.ugent.intec.halvade.utils.Logger.DEBUG("parsing dictionary file...");
189+
be.ugent.intec.halvade.utils.Logger.DEBUG("parsing dictionary " + ref + DICT_SUFFIX);
189190
try {
190191
FileSystem fs = FileSystem.get(new URI(ref + DICT_SUFFIX), conf);
191192
FSDataInputStream stream = fs.open(new Path(ref + DICT_SUFFIX));

halvade/src/be/ugent/intec/halvade/HalvadeResourceManager.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
/*
2-
* To change this license header, choose License Headers in Project Properties.
3-
* To change this template file, choose Tools | Templates
4-
* and open the template in the editor.
2+
* Copyright (C) 2014 ddecap
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
516
*/
617

718
package be.ugent.intec.halvade;
@@ -24,13 +35,13 @@ public class HalvadeResourceManager {
2435
protected static final int ALL = Integer.MAX_VALUE;
2536
protected static final int MEM_AM = (int) (1.5*1024);
2637
protected static final int VCORES_AM = 1;
27-
protected static final int MEM_STAR = (int) (12*1024);
38+
protected static final int MEM_STAR = (int) (31*1024);
2839
protected static final int[][] RESOURCE_REQ = {
29-
//mapmem, mapcpu, redmem, redcpu
40+
//mapmem, redmem
3041
{MEM_STAR, ALL}, // RNA with shared memory pass1
3142
{MEM_STAR, 14*1024}, // RNA with shared memory pass2
3243
{MEM_STAR, 14*1024}, // RNA without shared memory
33-
{14*1024, 14*1024}, // DNA
44+
{16*1024, 16*1024}, // DNA
3445
{4*1024, 4*1024} // combine
3546
};
3647

@@ -65,7 +76,7 @@ public static void setJobResources(HalvadeOptions opt, Configuration conf, int t
6576
conf.set("mapreduce.map.memory.mb", "" + mmem);
6677
conf.set("mapreduce.reduce.java.opts", "-Xmx" + (int)(0.8*rmem) + "m");
6778
conf.set("mapreduce.map.java.opts", "-Xmx" + (int)(0.8*mmem) + "m");
68-
conf.set("mapreduce.job.reduce.slowstart.completedmaps", "" + 1.0);
79+
conf.set("mapreduce.job.reduce.slowstart.completedmaps", "0.99");
6980

7081
HalvadeConf.setMapThreads(conf, opt.mthreads);
7182
HalvadeConf.setReducerThreads(conf, opt.rthreads);

halvade/src/be/ugent/intec/halvade/MapReduceRunner.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import fi.tkk.ics.hadoop.bam.SAMRecordWritable;
2121
import fi.tkk.ics.hadoop.bam.VariantContextWritable;
2222
import be.ugent.intec.halvade.hadoop.datatypes.ChromosomeRegion;
23+
import be.ugent.intec.halvade.hadoop.datatypes.GenomeSJ;
2324
import be.ugent.intec.halvade.hadoop.mapreduce.HalvadeTextInputFormat;
2425
import be.ugent.intec.halvade.hadoop.partitioners.*;
25-
import be.ugent.intec.halvade.utils.HalvadeFileUtils;
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.conf.Configured;
2828
import org.apache.hadoop.fs.FileStatus;
@@ -133,9 +133,11 @@ protected int runPass1RNAJob(Configuration pass1Conf, String tmpOutDir) throws I
133133
pass1Job.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
134134

135135
pass1Job.setInputFormatClass(HalvadeTextInputFormat.class);
136-
pass1Job.setMapOutputKeyClass(LongWritable.class);
136+
pass1Job.setMapOutputKeyClass(GenomeSJ.class);
137137
pass1Job.setMapOutputValueClass(Text.class);
138138

139+
pass1Job.setSortComparatorClass(GenomeSJSortComparator.class);
140+
pass1Job.setGroupingComparatorClass(GenomeSJGroupingComparator.class);
139141
pass1Job.setNumReduceTasks(1);
140142
pass1Job.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RebuildStarGenomeReducer.class);
141143
pass1Job.setOutputKeyClass(LongWritable.class);
@@ -194,8 +196,8 @@ protected int runHalvadeJob(Configuration halvadeConf, String tmpOutDir, int job
194196
halvadeJob.setMapOutputValueClass(SAMRecordWritable.class);
195197
halvadeJob.setInputFormatClass(HalvadeTextInputFormat.class);
196198
halvadeJob.setPartitionerClass(ChrRgPartitioner.class);
197-
halvadeJob.setSortComparatorClass(ChrRgPositionComparator.class);
198-
halvadeJob.setGroupingComparatorClass(ChrRgRegionComparator.class);
199+
halvadeJob.setSortComparatorClass(ChrRgSortComparator.class);
200+
halvadeJob.setGroupingComparatorClass(ChrRgGroupingComparator.class);
199201
halvadeJob.setOutputKeyClass(Text.class);
200202
halvadeJob.setOutputValueClass(VariantContextWritable.class);
201203

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
/*
 * Copyright (C) 2014 ddecap
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

package be.ugent.intec.halvade.hadoop.datatypes;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import net.sf.samtools.SAMSequenceDictionary;
import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop map-output key used by the RNA pass-1 job (see MapReduceRunner:
 * {@code setMapOutputKeyClass(GenomeSJ.class)}). A key carries either a
 * splice-junction position ({@code type == 0}) or an overhang length
 * ({@code type == 1}).
 *
 * <p>Ordering (see {@link #compareTo}) is by (chromosome, firstBaseIntron,
 * lastBaseIntron) only; {@code type} and {@code overhang} do not take part.
 * A freshly constructed or overhang-only key keeps
 * {@code chromosome == Integer.MAX_VALUE}, so it sorts after every key whose
 * chromosome index was set from the sequence dictionary.
 *
 * @author ddecap
 */
public class GenomeSJ implements WritableComparable<GenomeSJ> {
    /** Record discriminator: 0 = splice-junction record, 1 = overhang-length record. */
    protected int type;
    /** Sequence index from the SAM dictionary; Integer.MAX_VALUE until set
     *  (getSequenceIndex may also yield -1 for an unknown contig — TODO confirm). */
    protected int chromosome;
    /** First base of the intron (1-based per the input line; -1 until set). */
    protected int firstBaseIntron;
    /** Last base of the intron (-1 until set). */
    protected int lastBaseIntron;
    /** Overhang length; only meaningful when {@code type == 1}, -1 otherwise. */
    protected int overhang;

    /**
     * Creates a key with sentinel values: type 0, chromosome
     * Integer.MAX_VALUE (sorts last), all positions and overhang -1.
     */
    public GenomeSJ() {
        this.type = 0;
        this.chromosome = Integer.MAX_VALUE;
        this.firstBaseIntron = -1;
        this.lastBaseIntron = -1;
        this.overhang = -1;
    }

    /**
     * Turns this key into an overhang record ({@code type = 1}).
     * Note: chromosome/intron fields are deliberately left untouched, so an
     * overhang key built on a fresh instance keeps chromosome ==
     * Integer.MAX_VALUE and sorts after all splice-junction keys.
     *
     * @param overhang the overhang length to carry
     */
    public void setOverhang(int overhang) {
        this.type = 1;
        this.overhang = overhang;
    }

    /**
     * Fills this key from a tab-separated splice-junction line
     * (presumably a STAR SJ.out.tab record — confirm with the mapper):
     * column 0 is the contig name, columns 1 and 2 the first and last
     * base of the intron. Sets {@code type = 0}.
     *
     * @param sjString tab-separated junction line
     * @param dict     dictionary used to map the contig name to its index
     * @throws NumberFormatException if columns 1 or 2 are not integers
     */
    public void parseSJString(String sjString, SAMSequenceDictionary dict) {
        String columns[] = sjString.split("\t");
        this.type = 0;
        this.chromosome = dict.getSequenceIndex(columns[0]);
        this.firstBaseIntron = Integer.parseInt(columns[1]);
        this.lastBaseIntron = Integer.parseInt(columns[2]);
    }

    /** @return record discriminator: 0 = splice junction, 1 = overhang. */
    public int getType() {
        return type;
    }
    /** @return overhang length, or -1 when this is not an overhang record. */
    public int getOverhang() {
        return overhang;
    }
    /** @return sequence-dictionary index, or Integer.MAX_VALUE if unset. */
    public int getChromosome() {
        return chromosome;
    }
    /** @return first base of the intron, or -1 if unset. */
    public int getFirstBaseIntron() {
        return firstBaseIntron;
    }
    /** @return last base of the intron, or -1 if unset. */
    public int getLastBaseIntron() {
        return lastBaseIntron;
    }

    /** Serializes all five int fields in declaration order. */
    @Override
    public void write(DataOutput d) throws IOException {
        d.writeInt(type);
        d.writeInt(chromosome);
        d.writeInt(firstBaseIntron);
        d.writeInt(lastBaseIntron);
        d.writeInt(overhang);
    }

    /** Deserializes the five int fields in the same order as {@link #write}. */
    @Override
    public void readFields(DataInput di) throws IOException {
        type = di.readInt();
        chromosome = di.readInt();
        firstBaseIntron = di.readInt();
        lastBaseIntron = di.readInt();
        overhang = di.readInt();
    }

    /**
     * Orders keys by (chromosome, firstBaseIntron, lastBaseIntron);
     * {@code type} and {@code overhang} are ignored.
     *
     * <p>Uses {@link Integer#compare} instead of subtraction: with
     * chromosome defaulting to Integer.MAX_VALUE (and getSequenceIndex
     * possibly returning -1), {@code chromosome - o.chromosome} can
     * overflow and flip the comparison sign.
     */
    @Override
    public int compareTo(GenomeSJ o) {
        int cmp = Integer.compare(chromosome, o.chromosome);
        if (cmp != 0)
            return cmp;
        cmp = Integer.compare(firstBaseIntron, o.firstBaseIntron);
        if (cmp != 0)
            return cmp;
        return Integer.compare(lastBaseIntron, o.lastBaseIntron);
    }

}

halvade/src/be/ugent/intec/halvade/hadoop/mapreduce/DnaGATKReducer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ protected void processAlignments(Iterable<SAMRecordWritable> values, Context con
3737
ChromosomeRange r = new ChromosomeRange();
3838
SAMRecordIterator SAMit = new SAMRecordIterator(values.iterator(), header, r);
3939

40-
if(useElPrep)
40+
if(useElPrep && isFirstAttempt)
4141
elPrepPreprocess(context, tools, SAMit, preprocess);
42-
else
43-
PicardPreprocess(context, tools, SAMit, preprocess);
42+
else {
43+
if(!isFirstAttempt) Logger.DEBUG("attempt " + taskId + ", preprocessing with Picard for smaller peak memory");
44+
PicardPreprocess(context, tools, SAMit, preprocess);
45+
}
4446
region = makeRegionFile(context, r, tools, region);
45-
if(region == null) return;
47+
if(region == null) return;
4648

4749
indelRealignment(context, region, gatk, preprocess, tmpFile1);
4850
baseQualityScoreRecalibration(context, region, r, tools, gatk, tmpFile1, tmpFile2);

halvade/src/be/ugent/intec/halvade/hadoop/mapreduce/GATKReducer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* @author ddecap
4141
*/
4242
public abstract class GATKReducer extends HalvadeReducer {
43-
43+
protected boolean isFirstAttempt;
4444
protected boolean useBedTools;
4545
protected boolean useUnifiedGenotyper;
4646
protected double sec, scc;
@@ -68,13 +68,14 @@ protected void reduce(ChromosomeRegion key, Iterable<SAMRecordWritable> values,
6868
processAlignments(values, context, tools, gatk);
6969
} catch (URISyntaxException | QualityException | ProcessException ex) {
7070
Logger.EXCEPTION(ex);
71-
throw new InterruptedException();
71+
throw new InterruptedException(ex.getMessage());
7272
}
7373
}
7474

7575
@Override
7676
protected void setup(Context context) throws IOException, InterruptedException {
7777
super.setup(context);
78+
isFirstAttempt = taskId.endsWith("_0");
7879
isRNA = HalvadeConf.getIsRNA(context.getConfiguration());
7980
scc = HalvadeConf.getSCC(context.getConfiguration(), isRNA);
8081
sec = HalvadeConf.getSEC(context.getConfiguration(), isRNA);
@@ -130,17 +131,20 @@ protected void PicardPreprocess(Context context, PreprocessingTools tools, SAMRe
130131

131132
long startTime = System.currentTimeMillis();
132133

134+
int count = 0;
133135
SAMRecord sam;
134136
while(input.hasNext()) {
135137
sam = input.next();
136138
writer.addAlignment(sam);
139+
count++;
137140
}
138141
int reads = input.getCount();
139142
writer.close();
140143

141144
context.getCounter(HalvadeCounters.IN_PREP_READS).increment(reads);
142145
long estimatedTime = System.currentTimeMillis() - startTime;
143146
context.getCounter(HalvadeCounters.TIME_HADOOP_SAMTOBAM).increment(estimatedTime);
147+
Logger.DEBUG("time writing " + count + " records to disk: " + estimatedTime / 1000);
144148

145149
//preprocess steps of iprep
146150
Logger.DEBUG("clean sam");
@@ -212,7 +216,7 @@ protected void baseQualityScoreRecalibration(Context context, String region, Chr
212216
String[] snpslocal = HalvadeFileUtils.downloadSites(context, taskId);
213217
String[] newKnownSites = new String[snpslocal.length];
214218
for(int i = 0 ; i < snpslocal.length; i++) {
215-
if(useBedTools) newKnownSites[i] = tools.filterDBSnps(snpslocal[i], r);
219+
if(useBedTools) newKnownSites[i] = tools.filterDBSnps(ref.replaceAll("fasta", "dict"), snpslocal[i], r, tmpFileBase, threads);
216220
else newKnownSites[i] = snpslocal[i];
217221
if(newKnownSites[i].endsWith(".gz"))
218222
newKnownSites[i] = HalvadeFileUtils.Unzip(newKnownSites[i]);
@@ -282,11 +286,11 @@ protected void splitNTrim(Context context, String region, GATKTools gatk, String
282286
}
283287

284288
// TODO improve annotate/filter
285-
protected void filterVariants(Context context, GATKTools gatk, String input, String output) throws InterruptedException {
289+
protected void filterVariants(Context context, String region, GATKTools gatk, String input, String output) throws InterruptedException {
286290
Logger.DEBUG("run VariantFiltration");
287291
context.setStatus("run VariantFiltration");
288292
context.getCounter(HalvadeCounters.TOOLS_GATK).increment(1);
289-
gatk.runVariantFiltration(input, output, ref, windows, cluster, minFS, maxQD);
293+
gatk.runVariantFiltration(input, output, ref, region, windows, cluster, minFS, maxQD);
290294

291295
HalvadeFileUtils.removeLocalFile(keep, input, context, HalvadeCounters.FOUT_GATK_TMP);
292296
}
@@ -295,7 +299,7 @@ protected void annotateVariants(Context context, String region, GATKTools gatk,
295299
Logger.DEBUG("run VariantAnnotator");
296300
context.setStatus("run VariantAnnotator");
297301
context.getCounter(HalvadeCounters.TOOLS_GATK).increment(1);
298-
gatk.runVariantAnnotator(input, output, ref);
302+
gatk.runVariantAnnotator(input, output, ref, region);
299303

300304
HalvadeFileUtils.removeLocalFile(keep, input, context, HalvadeCounters.FOUT_GATK_TMP);
301305

halvade/src/be/ugent/intec/halvade/hadoop/mapreduce/HalvadeMapper.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@
1111
import be.ugent.intec.halvade.utils.HalvadeConf;
1212
import java.io.File;
1313
import java.io.IOException;
14-
import java.io.RandomAccessFile;
1514
import java.net.URI;
1615
import java.net.URISyntaxException;
17-
import java.nio.channels.FileChannel;
18-
import java.nio.channels.FileLock;
1916
import java.nio.file.FileSystems;
2017
import java.nio.file.Files;
2118
import org.apache.hadoop.fs.Path;
@@ -34,10 +31,10 @@ protected void cleanup(Context context) throws IOException, InterruptedException
3431
super.cleanup(context);
3532
try {
3633
Logger.DEBUG(readcount + " fastq reads processed");
37-
allTasksHaveStarted = HalvadeConf.allTasksCompleted(context.getConfiguration());
3834
Logger.DEBUG("starting cleanup: closing aligner");
3935
instance.closeAligner();
4036
Logger.DEBUG("finished cleanup");
37+
allTasksHaveStarted = HalvadeConf.allTasksCompleted(context.getConfiguration());
4138
} catch (URISyntaxException ex) {
4239
Logger.EXCEPTION(ex);
4340
}

0 commit comments

Comments (0)