Source code for tag.writer

#!/usr/bin/env python
#
# -----------------------------------------------------------------------------
# Copyright (C) 2016 Daniel Standage <daniel.standage@gmail.com>
#
# This file is part of tag (http://github.com/standage/tag) and is licensed
# under the BSD 3-clause license: see LICENSE.
# -----------------------------------------------------------------------------

from __future__ import print_function
from collections import defaultdict
try:
    from StringIO import StringIO
except ImportError:  # pragma: no cover
    from io import StringIO
import sys
import tag
from tag import Directive, Feature, Sequence, GFF3Reader


[docs]class GFF3Writer(): """ Writes sequence features and other GFF3 entries to a file. The :code:`instream` is expected to be an iterable of sequence features and other related objects. Set :code:`outfile` to :code:`-` to write output to stdout. >>> # Sort and tidy GFF3 file in 3 lines! >>> reader = GFF3Reader(infilename=tag.tests.data_file('grape-cpgat.gff3')) >>> writer = GFF3Writer(instream=reader, outfile='/dev/null') >>> writer.retainids = True >>> writer.write() """ def __init__(self, instream, outfile='-'): self._instream = instream self.outfilename = outfile self.outfile = None if isinstance(outfile, str): self.outfile = tag.open(outfile, 'w') else: self.outfile = outfile self.retainids = False self.complex_separators = True self.feature_counts = defaultdict(int) self._seq_written = False self._block_count = 0 def _write_separator(self, blockitvl): if not blockitvl: return if self._block_count < blockitvl: return print('###', file=self.outfile) self._block_count = 0 def __del__(self): if self.outfilename != '-' and not isinstance(self.outfile, StringIO): self.outfile.close()
[docs] def write(self, blockitvl=0): """Pull features from the instream and write them to the output. By default, separator tags are added at the end of complex features. To intersperse separators throughout blocks of simple features, specify a desired block size with `blockitvl`. """ print(repr(Directive('##gff-version 3')), file=self.outfile) for entry in self._instream: if isinstance(entry, Directive): if entry.type == 'gff-version': pass else: print(repr(entry), file=self.outfile) continue if isinstance(entry, Feature): for feature in entry: if self.retainids: continue if feature.num_children > 0 or feature.is_multi: if feature.is_multi and feature != feature.multi_rep: continue self.feature_counts[feature.type] += 1 fid = '{}{}'.format(feature.type, self.feature_counts[feature.type]) feature.add_attribute('ID', fid) else: feature.drop_attribute('ID') if isinstance(entry, Sequence) and not self._seq_written: print('##FASTA', file=self.outfile) self._seq_written = True print(repr(entry), file=self.outfile) if isinstance(entry, Feature): if entry.is_complex: self._block_count = 0 if self.complex_separators: print('###', file=self.outfile) else: self._block_count += 1 self._write_separator(blockitvl)