-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathextract_seqs_from_FASTA.py
More file actions
executable file
·161 lines (148 loc) · 5.2 KB
/
extract_seqs_from_FASTA.py
File metadata and controls
executable file
·161 lines (148 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python
"""
Created on Tue Mar 17 09:11:56 2021
@author: Nicolás Nahuel Moreyra (niconm89@gmail.com)
"""
#%% Imports
import argparse
import os
from Bio import SeqIO
import random #comment this line if you are not using the random extraction mode and don't want to install the library
import gzip
#%% Function definitions
def check_paths(inputfile,outputfile):
if not os.path.isfile(inputfile):
print("Error: Input file does not exist! " + inputfile)
exit(1)
if '/' in outputfile:
elements_path = outputfile.split('/')
outputfolder = '/'.join(elements_path[:-1])
if not os.path.isdir(outputfolder):
print("Error: Output folder does not exist! " + outputfolder)
exit(1)
#end
def check_mode(arguments):
if arguments.mode == 'byIDs':
if arguments.IDsfile:
if not os.path.isfile(arguments.IDsfile):
print("Error: IDs file does not exist! " + arguments.IDsfile)
exit(1)
else:
print("Extraction mode was set to byIDs but no IDs file was input. Aborting...")
exit(1)
else:
if not arguments.nseqs:
print("Unless you are using byIDs extraction mode, you must define a the number of sequences you want to recover (-n, --nseqs). Aborting...")
exit(1)
#end
def read_IDs(IDsfile):
'receive a file with one ID per line and return a list'
IDs_list = []
with open(IDsfile,'rt') as inputIDs:
lines = inputIDs.readlines()
for line in lines:
id_seq = line.split(" ")[0]
IDs_list.append(id_seq.rstrip())
return IDs_list
#
def check_gzipped(inputfile):
'Return true if file is gzipped. Otherwise return false'
# The first two bytes of a gzip file are: 1f8b
gzipped = False
GZIP_MAGIC_NUMBER = b'\x1f\x8b'
with open(inputfile,'rb') as INPUT:
twobytes = INPUT.read(2)
if twobytes == GZIP_MAGIC_NUMBER:
gzipped = True
return gzipped
#end
def read_fasta(inputfile):
sequences = {}
if check_gzipped(inputfile):
with gzip.open(inputfile, "rt") as FASTA:
count = 0
for record in SeqIO.parse(FASTA, "fasta"):
count +=1
seqname = str(record.id)
seqprot = str(record.seq)
if seqname in sequences:
print("There have repeates sequence IDs in the inputfile. Aborting...")
exit(0)
sequences[seqname] = seqprot
else:
with open(inputfile, "rt") as FASTA:
for record in SeqIO.parse(FASTA, "fasta"):
seqname = str(record.id)
seqprot = str(record.seq)
if seqname in sequences:
print("There have repeates sequence IDs in the inputfile. Aborting...")
exit(0)
sequences[seqname] = seqprot
return sequences
#end
def extract_nseqs(inputfile, outputfile, nseqs, mode, IDsfile):
'main function that manage the differente types of extraction.'
sequences = read_fasta(inputfile)
subseqs = {}
if mode == 'head' or mode == 'tail':
subseqs = extract_head_tail(sequences, nseqs, mode)
else:
if mode == 'random':
subseqs = extract_random(sequences, nseqs)
else:
IDs_list = read_IDs(IDsfile)
subseqs = extract_byIDs(sequences, IDs_list)
save_subseqs(subseqs, outputfile)
#end
def save_subseqs(subseqs, outputfile):
'get a dictionary with ids (key) and sequences (values) and save them in a outputfile.'
with open(outputfile, 'wt') as OUT:
for k,v in subseqs.items():
OUT.write(">" + str(k) + "\n" + str(v) + "\n")
#end
def extract_head_tail(sequences, nseqs, mode):
subsequences = {}
if mode == 'head':
for ID in list(sequences)[0:nseqs]:
subsequences[ID] = sequences[ID]
else:
pos = len(sequences) - nseqs
for ID in list(sequences)[pos:]:
subsequences[ID] = sequences[ID]
return subsequences
#end
def extract_random(sequences, nseqs):
subsequences = {}
lista_IDs = list(sequences.keys())
for n in range(0, nseqs):
IDseq = random.choice(lista_IDs)
lista_IDs.remove(IDseq)
subsequences[IDseq] = sequences[IDseq]
return subsequences
#end
def extract_byIDs(sequences, IDs_list):
subsequences = {}
for ID in IDs_list:
if ID in sequences:
subsequences[ID] = sequences[ID]
else:
print("Sequence ID " + str(ID) + " not found. ID skipped...")
return subsequences
#end
#%%
def main():
parser = argparse.ArgumentParser(
description='''extract_seqs_from_FASTA.py recover sequences from a FASTA file.''',
epilog="""End of the help""")
parser.add_argument('-f', '--infile', type=str, required=True, help='Full path to the FASTA file to convert. It can be gzipped.')
parser.add_argument('-o', '--outfile', type=str, required=True, help='Full path where the new FASTA file will be placed. Output file will not be compressed.')
parser.add_argument('-m', '--mode', type=str, required=True, choices=['head', 'tail', 'random', 'byIDs'], help='Extraction mode. head: first -n sequences; tail: last -n sequences; random: randomly selection of -n sequences; ID_file: file containing a sequence ID per line.')
parser.add_argument('-n', '--nseqs', type=int, required=False, help='Numbers of sequences to recover from the input.')
parser.add_argument('-d', '--IDsfile', type=str, required=False, help='Full path to the file with IDs to extract. Sequence descriptions must be excluded.')
args=parser.parse_args()
check_paths(args.infile, args.outfile) #check if input file and output folder exist
check_mode(args)
extract_nseqs(args.infile, args.outfile, args.nseqs, args.mode, args.IDsfile)
#%% Main program
if __name__ == '__main__':
main()