3131import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
3232import org .apache .hadoop .util .Tool ;
3333import be .ugent .intec .halvade .utils .Logger ;
34- import be .ugent .intec .halvade .utils .MyConf ;
34+ import be .ugent .intec .halvade .utils .HalvadeConf ;
3535import be .ugent .intec .halvade .utils .Timer ;
3636import fi .tkk .ics .hadoop .bam .VCFInputFormat ;
3737import java .net .URI ;
@@ -54,23 +54,14 @@ public int run(String[] strings) throws Exception {
5454 halvadeOpts = new HalvadeOptions ();
5555 int optR = halvadeOpts .GetOptions (strings , halvadeConf );
5656 if (optR != 0 ) return optR ;
57- // initialise MapReduce - copy ref to each node!
57+ // initialise MapReduce - copy ref to each node??
5858
5959
60- // only put files or continue?
61- if (halvadeOpts .justPut )
62- return 0 ;
63-
64-
65- Job halvadeJob = new Job (halvadeConf , "Halvade" );
66- // add to dist cache with job
67- halvadeJob .addCacheArchive (new URI (halvadeOpts .halvadeDir + "bin.tar.gz" ));
68-
60+ Job halvadeJob = Job .getInstance (halvadeConf , "Halvade" );
61+ halvadeJob .addCacheArchive (new URI (halvadeOpts .halvadeBinaries ));
6962
70- halvadeJob .setJarByClass (be .ugent .intec .halvade .hadoop .mapreduce .BWAMemMapper .class );
71- // specify input and output dirs
72- // check if input is a file or directory
7363
64+ halvadeJob .setJarByClass (be .ugent .intec .halvade .hadoop .mapreduce .HalvadeMapper .class );
7465 FileSystem fs = FileSystem .get (new URI (halvadeOpts .in ), halvadeConf );
7566 try {
7667 if (fs .getFileStatus (new Path (halvadeOpts .in )).isDirectory ()) {
@@ -90,32 +81,32 @@ public int run(String[] strings) throws Exception {
9081 }
9182 FileOutputFormat .setOutputPath (halvadeJob , new Path (halvadeOpts .out ));
9283
93- // specify a mapper
94- if (halvadeOpts .aln ) halvadeJob .setMapperClass (be .ugent .intec .halvade .hadoop .mapreduce .BWAAlnMapper .class );
95- else halvadeJob .setMapperClass (be .ugent .intec .halvade .hadoop .mapreduce .BWAMemMapper .class );
84+ if (halvadeOpts .rnaPipeline )
85+ halvadeJob .setMapperClass (be .ugent .intec .halvade .hadoop .mapreduce .StarAlignMapper .class );
86+ else {
87+ if (halvadeOpts .aln ) halvadeJob .setMapperClass (be .ugent .intec .halvade .hadoop .mapreduce .BWAAlnMapper .class );
88+ else halvadeJob .setMapperClass (be .ugent .intec .halvade .hadoop .mapreduce .BWAMemMapper .class ); }
9689 halvadeJob .setMapOutputKeyClass (ChromosomeRegion .class );
9790 halvadeJob .setMapOutputValueClass (SAMRecordWritable .class );
98- halvadeJob .setInputFormatClass (TextInputFormat .class );
99- // halvadeJob.setInputFormatClass(FastqInputFormat.class);
100-
101- // per chromosome && region
91+ halvadeJob .setInputFormatClass (TextInputFormat .class );
10292 halvadeJob .setPartitionerClass (ChrRgPartitioner .class );
10393 halvadeJob .setSortComparatorClass (ChrRgPositionComparator .class );
10494 halvadeJob .setGroupingComparatorClass (ChrRgRegionComparator .class );
10595
106- // # reducers
10796 if (halvadeOpts .justAlign )
10897 halvadeJob .setNumReduceTasks (0 );
10998 else
110- halvadeJob .setNumReduceTasks (halvadeOpts .reducers );
111- // specify a reducer
112- halvadeJob .setReducerClass (be .ugent .intec .halvade .hadoop .mapreduce .GATKReducer .class );
99+ halvadeJob .setNumReduceTasks (halvadeOpts .reducers );
100+ if (halvadeOpts .rnaPipeline )
101+ halvadeJob .setReducerClass (be .ugent .intec .halvade .hadoop .mapreduce .RnaGATKReducer .class );
102+ else
103+ halvadeJob .setReducerClass (be .ugent .intec .halvade .hadoop .mapreduce .DnaGATKReducer .class );
113104 halvadeJob .setOutputKeyClass (Text .class );
114105 halvadeJob .setOutputValueClass (VariantContextWritable .class );
115- // job.setOutputFormatClass(VCFOutputFormat.class);
116106
117107 if (halvadeOpts .dryRun )
118108 return 0 ;
109+
119110 Timer timer = new Timer ();
120111 timer .start ();
121112 ret = halvadeJob .waitForCompletion (true ) ? 0 : 1 ;
@@ -127,9 +118,9 @@ public int run(String[] strings) throws Exception {
127118 Logger .DEBUG ("combining output" );
128119 Configuration combineConf = getConf ();
129120 if (!halvadeOpts .out .endsWith ("/" )) halvadeOpts .out += "/" ;
130- MyConf .setInputDir (combineConf , halvadeOpts .out );
131- MyConf .setOutDir (combineConf , halvadeOpts .out + "combinedVCF/" );
132- MyConf .setReportAllVariant (combineConf , halvadeOpts .reportAll );
121+ HalvadeConf .setInputDir (combineConf , halvadeOpts .out );
122+ HalvadeConf .setOutDir (combineConf , halvadeOpts .out + "combinedVCF/" );
123+ HalvadeConf .setReportAllVariant (combineConf , halvadeOpts .reportAll );
133124 Job combineJob = new Job (combineConf , "HalvadeCombineVCF" );
134125 combineJob .setJarByClass (be .ugent .intec .halvade .hadoop .mapreduce .VCFCombineMapper .class );
135126
0 commit comments