Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pyarrow = "*"
galah = ">=0.4.0"
sqlparse = "*" # Required indirectly (e.g. taxtastic)
zenodo_backpack = ">=0.4.0"
zstandard = "*"
# Optional (commented out)
# python-annoy = "*"
# nmslib = "*"
Expand Down
6 changes: 5 additions & 1 deletion singlem/biolib_lite/prodigal_biolib.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,17 @@ def _producer(self, genome_file):
else:
proc_str = 'single' # estimate parameters from data

# If this is a gzipped genome, re-write the uncompressed genome
# If this is a gzipped or zst compressed genome, re-write the uncompressed genome
# file to disk
prodigal_input = genome_file
if genome_file.endswith('.gz'):
prodigal_input = os.path.join(
tmp_dir, os.path.basename(genome_file[0:-3]) + '.fna')
write_fasta(seqs, prodigal_input)
elif genome_file.endswith('.zst'):
prodigal_input = os.path.join(
tmp_dir, os.path.basename(genome_file[0:-4]) + '.fna')
write_fasta(seqs, prodigal_input)

# there may be ^M character in the input file,
# the following code is similar to dos2unix command to remove
Expand Down
8 changes: 7 additions & 1 deletion singlem/biolib_lite/seq_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import sys
import traceback
import zstandard

from .exceptions import InputFileError

Expand Down Expand Up @@ -55,6 +56,8 @@ def read_fasta(fasta_file, keep_annotation=False):

if fasta_file.endswith('.gz'):
file_f, file_mode = gzip.open, 'rt'
elif fasta_file.endswith('.zst'):
file_f, file_mode = zstandard.open, 'rt'
else:
file_f, file_mode = open, 'r'

Expand Down Expand Up @@ -126,6 +129,9 @@ def read_fasta_seq(fasta_file, keep_annotation=False):
if fasta_file.endswith('.gz'):
open_file = gzip.open
mode = 'rb'
elif fasta_file.endswith('.zst'):
open_file = zstandard.open
mode = 'rt'

seq_id = None
annotation = None
Expand Down Expand Up @@ -201,7 +207,7 @@ def read_seq(seq_file, keep_annotation=False):
and the annotation if keep_annotation is True.
"""

if seq_file.endswith(('.fq.gz', '.fastq.gz', '.fq', '.fq.gz')):
if seq_file.endswith(('.fq.gz', '.fastq.gz', '.fq', '.fq.zst', '.fastq.zst')):
raise Exception("Cannot read FASTQ files.")
# for rtn in read_fastq_seq(seq_file):
# yield rtn
Expand Down
7 changes: 6 additions & 1 deletion singlem/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import gzip
import tempfile
import json
import zstandard

from bird_tool_utils import *
from bird_tool_utils.people import *
Expand Down Expand Up @@ -951,7 +952,11 @@ def add_prokaryotic_fraction_parser(name, description, deprecated=False):
with open(args.input_gzip_archive_otu_table_list) as f:
for arc in f.readlines():
try:
otus.add_archive_otu_table(gzip.open(arc.strip()))
arc_path = arc.strip()
if arc_path.endswith('.zst'):
otus.add_archive_otu_table(zstandard.open(arc_path))
else:
otus.add_archive_otu_table(gzip.open(arc_path))
except json.decoder.JSONDecodeError:
logging.warning("Failed to parse JSON from archive OTU table {}, skipping".format(arc))
otus.set_target_taxonomy_by_string(args.taxonomy)
Expand Down
11 changes: 9 additions & 2 deletions singlem/orf_length_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ def check_sequence_file_contains_an_orf(path, min_orf_length):
streaming. Only checks the first 1000 lines of the sequence file.'''

# Cannot use extern here because the SIGPIPE signals generated
result = subprocess.check_output(['bash','-c',"cat '%s' |zcat --stdout -f |head -n1000 |orfm -m %i |head -n2" %(
path, min_orf_length
# Determine decompression command based on file extension
if path.endswith('.zst'):
decompress_cmd = "zstdcat '%s'" % path
else:
# zcat with -f handles both plain and gzipped files
decompress_cmd = "zcat --stdout -f '%s'" % path

result = subprocess.check_output(['bash','-c',"%s |head -n1000 |orfm -m %i |head -n2" %(
decompress_cmd, min_orf_length
)])
if len(result) == 0:
return False
Expand Down
12 changes: 12 additions & 0 deletions singlem/otu_table_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import OrderedDict
import gzip
import json
import zstandard

from .archive_otu_table import ArchiveOtuTable
from .otu_table import OtuTable
Expand Down Expand Up @@ -193,6 +194,7 @@ def __init__(self):
self._otu_table_file_paths = []
self._archive_table_file_paths = []
self._gzip_archive_table_file_paths = []
self._zst_archive_table_file_paths = []
self._archive_table_objects = []
self.min_archive_otu_table_version = None

Expand Down Expand Up @@ -222,6 +224,9 @@ def add_archive_otu_table_file(self, file_path):
def add_gzip_archive_otu_table_file(self, file_path):
    '''Record the path of a gzip-compressed archive OTU table so its OTUs
    can be streamed later during iteration.'''
    self._gzip_archive_table_file_paths += [file_path]

def add_zst_archive_otu_table_file(self, file_path):
    '''Record the path of a zstd-compressed archive OTU table so its OTUs
    can be streamed later during iteration.'''
    self._zst_archive_table_file_paths += [file_path]

def add_archive_otu_table_object(self, archive_table):
'''Not technically streaming, but easier to put this here for pipe
instead of implementing each_sample_otus() for non-streaming OTU
Expand Down Expand Up @@ -253,6 +258,13 @@ def __iter__(self):
yield otu
except json.decoder.JSONDecodeError:
logging.error(f"JSON parsing error in {file_path}, skipping this one")
for file_path in self._zst_archive_table_file_paths:
with zstandard.open(file_path) as f:
try:
for otu in ArchiveOtuTable.read(f, min_version=self.min_archive_otu_table_version):
yield otu
except json.decoder.JSONDecodeError:
logging.error(f"JSON parsing error in {file_path}, skipping this one")
for archive_table in self._archive_table_objects:
for otu in archive_table:
yield otu
Expand Down
10 changes: 8 additions & 2 deletions singlem/summariser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd
import polars as pl
import gzip
import zstandard

from .otu_table import OtuTable
from .rarefier import Rarefier
Expand Down Expand Up @@ -389,8 +390,13 @@ def read_archive_table(df, f, prev_ar):
logging.debug(f"Found {len(lines)} lines in achive otu table list.")
for a in lines:
logging.debug("Reading gzip archive table {} ..".format(a))
with gzip.open(a.strip()) as g:
overall_df, ar = read_archive_table(overall_df, g, ar)
a_path = a.strip()
if a_path.endswith('.zst'):
with zstandard.open(a_path) as g:
overall_df, ar = read_archive_table(overall_df, g, ar)
else:
with gzip.open(a_path) as g:
overall_df, ar = read_archive_table(overall_df, g, ar)
df = overall_df

# Remove suffixes
Expand Down
6 changes: 3 additions & 3 deletions singlem/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ class FastaNameToSampleName:
@staticmethod
def fasta_to_name(query_sequences_file):
sample_name = os.path.basename(query_sequences_file)
# Put .gz first so it is stripped off first.
for extension in ('.gz','.fna','.fq','.fastq','.fasta','.fa'):
# Put .gz and .zst first so they are stripped off first.
for extension in ('.gz','.zst','.fna','.fq','.fastq','.fasta','.fa'):
if sample_name.endswith(extension):
sample_name = sample_name[0:(len(sample_name)-len(extension))]
if extension != '.gz':
if extension not in ('.gz', '.zst'):
break
return sample_name
Binary file added test/data/small.otu_table.json.zst
Binary file not shown.
9 changes: 9 additions & 0 deletions test/test_orf_length_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,14 @@ def test_gzip_good(self):
f.name+'.gz', 72
))

def test_zst_good(self):
    '''A zstd-compressed FASTA holding a long ORF should be detected.'''
    with tempfile.NamedTemporaryFile() as f:
        # 100 bp of 'A' comfortably exceeds the 72 bp minimum ORF length below.
        record = '>seq\n' + 'A' * 100 + '\n'
        f.write(record.encode())
        f.flush()
        # Compress alongside the original (-k keeps the source file).
        extern.run("zstd -k {}".format(f.name))
        self.assertTrue(OrfLengthChecker.check_sequence_file_contains_an_orf(
            f.name + '.zst', 72))
))

if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions test/test_summariser.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def test_archive_to_otu_table_conversion(self):
self.assertEqual('gene\tsample\tsequence\tnum_hits\tcoverage\ttaxonomy\n\
S1.5.ribosomal_protein_L11_rplK\tsmall\tCCTGCAGGTAAAGCGAATCCAGCACCACCAGTTGGTCCAGCATTAGGTCAAGCAGGTGTG\t4\t9.76\tRoot; d__Bacteria; p__Firmicutes; c__Bacilli; o__Bacillales\n', obs)

def test_zst_archive_to_otu_table_conversion(self):
    '''summarise should stream a zstd-compressed archive OTU table supplied
    via --input-gzip-archive-otu-table-list.'''
    cmd = f"{path_to_script} summarise --input-gzip-archive-otu-table-list <(ls {path_to_data}/small.otu_table.json.zst) --output-otu-table /dev/stdout"
    observed = extern.run(cmd)
    expected = 'gene\tsample\tsequence\tnum_hits\tcoverage\ttaxonomy\n\
S1.5.ribosomal_protein_L11_rplK\tsmall\tCCTGCAGGTAAAGCGAATCCAGCACCACCAGTTGGTCCAGCATTAGGTCAAGCAGGTGTG\t4\t9.76\tRoot; d__Bacteria; p__Firmicutes; c__Bacilli; o__Bacillales\n'
    self.assertEqual(expected, observed)

def test_gzip_archive_list_to_otu_table_conversion(self):

archive = '{"fields": ["gene", "sample", "sequence", "num_hits", "coverage", "taxonomy", "read_names", "nucleotides_aligned", "taxonomy_by_known?"], "singlem_package_sha256s": ["2b2afe0114de20451fccfe74360756376dc83d001d890e84e322ab0833eca6ba", "7f406a73d8bb176994055cb966ff350e208986d12c8215722686c17c26e548c7", "735b44ae547c133163cb7d40f417292c35423864d00c95e7f1b32091b27d46c5", "8fc6dcce2766cc01defb3b5c689a1ed8ce9d59b725c67e58c2044dafaae908b3", "172df49937742b8411d41d217500d862567374401eaf393b25107b22ac630202", "4cb1bf226bf28d8198ed5c29e8a76df411d96a6c3ce1256af16887b9a184b0a6", "d473d3ae677e6e46202461ccdedb2aef23c0a10a3412422586b37e397ca37294", "431a2860bb890cd1c7193c565cbf0cc227850cba36fb17fe94df686e74ee9b11", "faa663527bb9aea63cef03859311f2e7f55fe98590a5ec85c5ba85815a6fd13e", "a0daf111380e6e499ad9c10c3ac413aa9016c7503dd459825100168524bff0d1", "aba631d4735aeb9d2dfbbbfee1c0739bf9e99ad6532a3be04ff627f3e6efdae2", "bba10c1feb0c26bdf46aa3d1dcb992744a699cde5cf02bb2728f8397378b342f", "4d91dd794b25fd256508f0814f6a2d31e20dc85e0aa9ea405031398565276768", "9b23c524a6210af0706eea7252c2d378888029f141b9305c3e88cbac3fd83f88", "50a209417b455a48bc67702d6a3809a172c57f00785d8c705a3322e6e2a71f72"], "version": 1, "alignment_hmm_sha256s": ["dd9b7e283598360b89ec91ff3f5c509361a6108a2eadc44bfb29646b1510f6b7", "b1bb943c3449a78f937db960bfdf6b2bed641388d33fce3cb2d5f69e79946ea6", "de92c90f2c83e380ae3953972fb63fcb8ce868dab87a305f9f1811b84ffb3d39", "453ed4a62608a4aec36117a2dd1a276709ff6b130ecb8d7b1612926bfab25527", "20cc450cf4157ecf1772e0325d4d8ed400b597d888a5cb5044ca69098f935656", "4b0bf5b3d7fd2ca16e54eed59d3a07eab388f70f7078ac096bf415f1c04731d9", "7cbba7ba0ed58d21c7519ba3fcef0abe43378e5c38c985b0d5e0e5219f141d92", "4a3bbe5ac594ef3c7c820e74544828e19eca68bf860d64f928729eb4530fce4e", "06a4bed0a765971b891ca4a4bf5680aeef4a4a249ce0c028798c0e912f0ccfb4", "2678fe218ca860a2d88bdbf76935d8c78a00ab6603a041a432505d754ef08250", "b54ff98aa03ab31af39c737a569b23ee4ed9296c8ea088562bfb3db87c38fe4a", 
"4ae31f14067bf183f38dca20f2aefb580e5ff25848881dd988908b70b67761bb", "d7bb3d544133f38110a329712b3ace7e7d7c989dafa3815d2d5a292b4c575f50", "7639bb919ef54f7baff3ed3a8c924efca97ed375cf4120a6e05d98fd6ef52cbb", "6923b889888ea34fabf463b2c8ad5fe23c94828f1a2631a07601f246f5e87150"], "otus": [["4.11.ribosomal_protein_L10", "minimal", "TTACGTTCACAATTACGTGAAGCTGGTGTTGAGTATAAAGTATACAAAAACACTATGGTA", 2, 4.878048780487805, "Root; d__Bacteria; p__Firmicutes; c__Bacilli; o__Bacillales; f__Staphylococcaceae; g__Staphylococcus", ["HWI-ST1243:156:D1K83ACXX:7:1106:18671:79482", "HWI-ST1243:156:D1K83ACXX:7:1105:19152:28331"], [60, 60], false], ["4.12.ribosomal_protein_L11_rplK", "minimal", "CCTGCAGGTAAAGCGAATCCAGCACCACCAGTTGGTCCAGCATTAGGTCAAGCAGGTGTG", 4, 9.75609756097561, "Root; d__Bacteria; p__Firmicutes; c__Bacilli; o__Bacillales", ["HWI-ST1243:156:D1K83ACXX:7:1109:18214:9910", "HWI-ST1243:156:D1K83ACXX:7:1103:21187:63124", "HWI-ST1243:156:D1K83ACXX:7:1108:10813:6928", "HWI-ST1243:156:D1K83ACXX:7:1105:12385:81842"], [60, 60, 60, 60], false]]}'
Expand Down