-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRUN4_PrepBAM.py
More file actions
216 lines (194 loc) · 8.56 KB
/
RUN4_PrepBAM.py
File metadata and controls
216 lines (194 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env python
# Driver setup: parse the pipeline config file named on the command line and
# discover the set of accessions (input BAMs) to be prepared for GATK.
from __future__ import print_function
import sys
import subprocess
import math
import pdb
# Equivalent to Perl's FindBin...sorta
import os
import itertools
bindir = os.path.abspath(os.path.dirname(__file__))
# Python Choose ConfigParser Based On Version
if sys.version_info[0] < 3:
    import ConfigParser
    Config = ConfigParser.ConfigParser()
    # Alias the Python 2 name so the rest of the script can use zip_longest.
    itertools.zip_longest = itertools.izip_longest
else:
    import configparser
    Config = configparser.ConfigParser()
# Multi processing begins.
import multiprocessing as mp
output = mp.Queue()  # NOTE(review): queue is created but never used below
# Reading configuration file.
if len(sys.argv) == 1:
    sys.exit("usage: py3 {0} <Config file>\n".format(__file__))
Config.read(sys.argv[1])
nThreads = Config.get("OPTIONS", "Threads")
print("Recognizing {0} as max threading...".format(nThreads))
ref = Config.get("PATHS", "reference")
# Accession key -> number of runs combined into it (NUMBER_MULTIPLE section).
LineNo = dict(Config.items('NUMBER_MULTIPLE'))
print(LineNo)
print("Finding total number of files: {0}".format(len(LineNo)))
# Garbage Collector as of yet unused.
# Garbage Collector as of yet unused.
def collectTheGarbage(files):
    """Delete every path in *files* (file or directory) and return 1.

    Fix: the original interpolated each filename into an ``rm -rf {0}``
    string executed with ``shell=True``, which breaks on paths containing
    spaces and permits shell injection. Passing an argument list keeps
    each filename as a single argv entry and skips the shell entirely.
    """
    for filename in files:
        command = ["rm", "-rf", filename]
        print("Running command:\n{0}\n".format(" ".join(command)))
        subprocess.call(command)  # shell=False by default
    return 1
# Unit suffixes indexed by power of 1024 (suffixes[3:5] == GB/TB).
suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
def humansize(nbytes):
    """Return a (value_string, unit) pair for *nbytes*, e.g. 1536 -> ('1.5', 'KB').

    Fix: the zero case used to return the plain string '0 B', while every
    other input returns a two-tuple. Callers do ``list(humansize(...))``
    and index [0]/[1], so '0 B' would explode into ['0', ' ', 'B'].
    Zero now yields the same tuple shape: ('0', 'B').
    """
    if nbytes == 0:
        return ('0', suffixes[0])
    i = 0
    # Divide down by 1024 until the value fits the current unit.
    while nbytes >= 1024 and i < len(suffixes)-1:
        nbytes /= 1024.
        i += 1
    # Trim trailing zeros and any dangling decimal point ('1.50' -> '1.5').
    f = ('%.2f' % nbytes).rstrip('0').rstrip('.')
    return (f, suffixes[i])
def worker(i):
    """Run the Picard BAM-prep steps for the accession keyed by *i*.

    Depending on the [PIPELINE] toggles in the module-level Config, runs
    SortSam, MarkDuplicates, and AddOrReplaceReadGroups via shelled-out
    Picard commands, writing results under <output_dir>/<base>/gatk-results.
    Reads the module globals LineNo, Config, suffixes, humansize. Returns None.
    """
    mult = int(LineNo[i])
    # Accessions assembled from multiple runs live in a separate section.
    if mult > 1:
        prefix = Config.get("COMBINE_ACCESSIONS", i)
    else:
        prefix = Config.get("SINGLE_ACCESSIONS", i)
    GarbageCollector = []  # intermediate files eligible for later deletion
    base = prefix
    inputDir = Config.get("DIRECTORIES", "output_dir")
    java = Config.get("PATHS", "java")
    picard = Config.get("PATHS", "picard")
    minheap = Config.get("OPTIONS", "minheap")
    # Checking file size against heap: when the BAM is GB/TB sized
    # (suffixes[3:5]) and exceeds what the configured -Xmx allows, grow
    # the heap to file size + 2 GB.
    filename = os.path.join(inputDir, base, base + ".Alignments.bam")
    filesize = list(humansize(os.path.getsize(filename)))
    filesize[0] = int(math.ceil(float(filesize[0])))
    # maxheap is assumed to look like '8g'; [:-1] strips the unit letter
    # — TODO confirm the config always carries a single-char suffix.
    maxheap = Config.get("OPTIONS", "maxheap")
    if filesize[1] in suffixes[3:5] and filesize[0] < int(maxheap[:-1]):
        maxheap = ''.join([str(filesize[0]+2), 'g'])
    callpicard = "{0} -Xms{1} -Xmx{2} -XX:+UseG1GC -XX:+UseStringDeduplication -jar {3}" \
        .format(java, minheap, maxheap, picard)
    tmp = Config.get("DIRECTORIES", "temp_dir")
    prefix = os.path.join(inputDir, base)
    gatkdir = os.path.join(prefix, "gatk-results")
    if not os.path.exists(gatkdir):
        os.makedirs(gatkdir)
    if Config.getint("PIPELINE", "SortSam"):
        # Sorting bam with PicardTools in Coordinate Order.
        finput = os.path.join(prefix, base + ".Alignments.bam")
        foutput = os.path.join(gatkdir, base + ".Alignments.PicardSorted.bam")
        cmd = "{0} SortSam I={1} O={2} SO=coordinate TMP_DIR={3} CREATE_INDEX=true VALIDATION_STRINGENCY=SILENT" \
            .format(callpicard, finput, foutput, tmp)
        print("Running commmand:\n{0}".format(cmd))
        subprocess.call(cmd, shell=True)
        GarbageCollector.append(finput)
    if Config.getint("PIPELINE", "MarkDups"):
        # Marking duplicates in the bam with PicardTools
        finput = "{0}/{1}.Alignments.PicardSorted.bam".format(gatkdir, base)
        foutput = "{0}/{1}.PicardSorted.DeDupped.bam".format(gatkdir, base)
        metrics = "{0}/{1}.metrics".format(gatkdir, base)
        # Leave 100 descriptors of headroom under the shell's open-file limit.
        file_handles = int(subprocess.check_output(
            "ulimit -n", shell=True)) - 100
        max_file_handles = "MAX_FILE_HANDLES_FOR_READ_ENDS_MAP={0}" \
            .format(file_handles)
        cmd = "{0} MarkDuplicates I={1} O={2} METRICS_FILE={3} TMP_DIR={4} ASSUME_SORTED=true VALIDATION_STRINGENCY=SILENT SORTING_COLLECTION_SIZE_RATIO=.35 {5}" \
            .format(callpicard, finput, foutput, metrics, tmp, max_file_handles)
        print("Running commmand:\n{0}".format(cmd))
        subprocess.call(cmd, shell=True)
        GarbageCollector.append(finput)
    if Config.getint("PIPELINE", "ReadGroups"):
        # Adding or Replacing Read Groups with Picard Tools
        finput = "{0}/{1}.PicardSorted.DeDupped.bam".format(gatkdir, base)
        # Read Group ID
        RGID = "foo"
        # Read Group Label
        RGLB = "bar"
        # Read Group Sequencing Brand
        RGPL = "Illumina"
        # Read Group PU
        RGPU = "blah"
        # Read Group Sample (in this case accession name)
        RGSM = base
        foutput = "{0}/{1}.PicardSorted.DeDupped.RG.bam".format(gatkdir, base)
        cmd = "{0} AddOrReplaceReadGroups I={1} O={2} RGID={3} RGLB={4} RGPL={5} RGPU={6} RGSM={7} TMP_DIR={8} CREATE_INDEX=true VALIDATION_STRINGENCY=SILENT" \
            .format(callpicard, finput, foutput, RGID, RGLB, RGPL, RGPU, RGSM, tmp)
        print("Running commmand:\n{0}".format(cmd))
        subprocess.call(cmd, shell=True)
        GarbageCollector.append(finput)
    # Cleanup deliberately disabled; intermediates are kept on disk.
    # collectTheGarbage(GarbageCollector)
def grouper(iterable, n, fillvalue=None):
    """Yield successive n-tuples drawn from *iterable*, padding the final
    tuple with *fillvalue* when the length is not a multiple of n.

    Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    # One shared iterator repeated n times: zip_longest pulls from it in
    # round-robin order, which slices the stream into fixed-size chunks.
    shared = iter(iterable)
    columns = [shared] * n
    return itertools.zip_longest(*columns, fillvalue=fillvalue)
if __name__ == "__main__":
# Attempting with pool of workers.
# Getting total memory of machine
meminfo = dict((i.split()[0].rstrip(':'), int(i.split()[1]))
for i in open('/proc/meminfo').readlines())
mem_total_kib = meminfo['MemTotal']
mem_total_gib = mem_total_kib*1.0e-6
# Getting number of total processes that can be run at once given memory constraints.
# Trying to ascertain groups with actual sizes of files:
sizeList = []
# pdb.set_trace()
for i in LineNo.keys():
mult = int(LineNo[i])
if mult > 1:
prefix = Config.get("COMBINE_ACCESSIONS", i)
else:
prefix = Config.get("SINGLE_ACCESSIONS", i)
base = prefix
inputDir = Config.get("DIRECTORIES", "output_dir")
filename = os.path.join(inputDir, base, base + ".Alignments.bam")
filesize = list(humansize(os.path.getsize(filename)))
filesize[0] = int(math.ceil(float(filesize[0])))
maxheap = Config.get("OPTIONS", "maxheap")
if filesize[1] in suffixes[3:5] and filesize[0] < int(maxheap[:-1]):
maxheap = filesize[0]+2
else:
maxheap = int(maxheap[:-1])
sizeList.append((i, maxheap))
# Sorting list in ascending order by size of maxheap
sizeList = sorted(sizeList, key = lambda x: x[1])
# print("This is the sizeList: {0} and length: {1}".format(sizeList, len(sizeList)))
# pdb.set_trace()
size = 0
i = 1
j = 0
total = []
while i < len(sizeList):
while i < len(sizeList) and (size < mem_total_gib and size + sizeList[i][1] < mem_total_gib):
# print("This is i: {0} and this is the size: {1}".format(i, size))
size += sizeList[i][1]
i += 1
# if i >= len(sizeList):
# break
total.append([x[0] for x in sizeList[j:i]])
# print("This is i after the nested loop: {0}".format(i))
j = i
size = 0
# print("total apprehended so far: {0}".format(total))
for group in total:
pool = mp.Pool(processes=len(group))
results = [pool.apply_async(func=worker, args=(k, )) for k in group]
for result in results:
result.wait()
print("=" * 100)
print("{0} has finished running.".format(str(group)))
# Finished ascertaining groups with actual sizes.
# memper = int(Config.get("OPTIONS", "maxheap")[:-1])
# at_once = int(mem_total_gib//memper)
# print("This is the total number allowed at once: {0}".format(at_once))
# total = grouper(LineNo.keys(), at_once)
# # pool = mp.Pool(processes=Config.getint("OPTIONS", "processes"))
# pool = mp.Pool(processes=at_once)
# for group in total:
# results = [pool.apply_async(func=worker, args=(i, )) for i in group]
# for result in results:
# z = result.get()
# print("="*100)
# print("{0} has finished running.".format(str(group)))
# results = [pool.apply_async( func=worker,args=(i,) ) for i in LineNo]
# for result in results:
# z = result.get()
print("="*200)
print("{0} has finished running.".format(__file__))