Source code for tag.reader

#!/usr/bin/env python
#
# -----------------------------------------------------------------------------
# Copyright (C) 2015 Daniel Standage <daniel.standage@gmail.com>
#
# This file is part of tag (http://github.com/standage/tag) and is licensed
# under the BSD 3-clause license: see LICENSE.
# -----------------------------------------------------------------------------

from collections import defaultdict
import sys
import tag
from tag import Range
from tag import Comment
from tag import Directive
from tag import Feature
from tag import Sequence


[docs]class DuplicatedRegionError(ValueError): pass
[docs]class AnnotationSortingError(ValueError): pass
[docs]class AnnotationOutOfBoundsError(ValueError): pass
[docs]class FeatureTypeDisagreementError(ValueError): pass
def clean_lines(instream): for line in instream: line = line.strip() if line != '': yield line
[docs]def parse_fasta(data): # pragma: no cover """ Load sequences in Fasta format. This generator function yields a Sequence object for each sequence record in a GFF3 file. Implementation stolen shamelessly from http://stackoverflow.com/a/7655072/459780. """ name, seq = None, [] for line in data: line = line.rstrip() if line.startswith('>'): if name: yield Sequence(name, ''.join(seq)) name, seq = line, [] else: seq.append(line) if name: yield Sequence(name, ''.join(seq))
class RegionSet(object): def __init__(self): self.declared = dict() self.inferred = dict() def add_region(self, region): if region.seqid in self.declared: raise DuplicatedRegionError(region.seqid) self.declared[region.seqid] = region def add_feature(self, feature): if feature.seqid not in self.inferred: self.inferred[feature.seqid] = feature.range else: newrange = self.inferred[feature.seqid].merge(feature.range) self.inferred[feature.seqid] = newrange if feature.seqid in self.declared: seqregion = self.declared[feature.seqid] if not feature._range.within(seqregion.range): msg = 'feature {} out-of-bounds'.format(feature.slug) raise AnnotationOutOfBoundsError(msg)
[docs]class GFF3Reader(): """ Loads sequence features and other GFF3 entries into memory. Features and other objects are obtained by iterating over the GFF3Reader object. See the :code:`tag.select` module for available filtering functions. >>> infile = tag.tests.data_file('pbar-withseq.gff3') >>> reader = GFF3Reader(infilename=infile) >>> for entry in reader: ... print(type(entry)) <class 'tag.directive.Directive'> <class 'tag.directive.Directive'> <class 'tag.directive.Directive'> <class 'tag.comment.Comment'> <class 'tag.feature.Feature'> <class 'tag.feature.Feature'> <class 'tag.feature.Feature'> <class 'tag.sequence.Sequence'> <class 'tag.sequence.Sequence'> >>> reader = GFF3Reader(infilename=infile) >>> for feature in tag.select.features(reader, type='gene'): ... print(feature.slug) gene@NW_011929623.1[4557, 5749] gene@NW_011929624.1[3725, 4229] By default, the GFF3Reader assumes the input is not in sorted order and loads the entire annotation into memory to ensure sorting and resolution of ID/Parent relationships. If the input is sorted according to genomic position and independent features are separated by :code:`###` directives, memory consumption will be drastically reduced by setting the :code:`assumesorted` attribute to True. The :code:`strict` attribute enforces some additional sanity checks, which in some exceptional cases may need to be relaxed. """ def __init__(self, instream=None, infilename=None, assumesorted=False, strict=True, checkorder=True): assert (not instream) != (not infilename), ( 'provide either an instream or an infile name, not both' ) self.instream = instream if infilename: self.infilename = infilename self.instream = tag.open(infilename, 'r') self.assumesorted = assumesorted self.strict = strict self.checkorder = checkorder self.regions = RegionSet() self._counter = 0 self._prevrecord = None def __iter__(self): """Generator function returns GFF3 entries.""" self._reset() for line in clean_lines(self.instream): if line == '###': for obj in self._handle_intermediate(): yield obj elif line == '##FASTA': for sequence in parse_fasta(self.instream): self.records.append(sequence) break elif line.startswith('#'): self._handle_special(line) else: self._handle_feature(line) for obj in self._resolve_features(): if self._counter == 0: isv = isinstance(obj, Directive) and obj.type == 'gff-version' if not isv: self._prevrecord = Directive('##gff-version 3') self._counter += 1 yield self._prevrecord self._prevrecord = obj self._counter += 1 yield obj def _handle_intermediate(self): if self.assumesorted or not self.checkorder: for obj in self._resolve_features(): if self.checkorder: if self._prevrecord and self._prevrecord > obj: msg = 'sorting error: {} > {}'.format( self._prevrecord.slug, obj.slug, ) raise AnnotationSortingError(msg) self._prevrecord = obj self._counter += 1 yield obj def _handle_special(self, line): if line.startswith('##'): record = Directive(line) if record.type == 'sequence-region': self.regions.add_region(record) else: record = Comment(line) self.records.append(record) def _handle_feature(self, line): feature = Feature.from_gff3(line) self.regions.add_feature(feature) featureid = feature.get_attribute('ID') parentid = feature.get_attribute('Parent') if parentid is None: # Only add one entry from each multi-feature if featureid is None or featureid not in self.featsbyid: self.records.append(feature) else: parentids = parentid if isinstance(parentid, list) else [parentid] for pid in parentids: self.featsbyparent[pid].append(feature) if featureid is not None: if featureid in self.featsbyid: # Validate multi-features other = self.featsbyid[featureid] if feature.type != other.type: msg = 'feature type disagreement' msg += ' for ID="{}": {} vs {}'.format( featureid, feature.type, other.type ) raise FeatureTypeDisagreementError(msg) other.add_sibling(feature) else: self.featsbyid[featureid] = feature def _resolve_features(self): """Resolve Parent/ID relationships and yield all top-level features.""" for parentid in self.featsbyparent: parent = self.featsbyid[parentid] for child in self.featsbyparent[parentid]: parent.add_child(child, rangecheck=self.strict) # Replace top-level multi-feature reps with a pseudo-feature for n, record in enumerate(self.records): if not isinstance(record, Feature): continue if not record.is_multi: continue assert record.multi_rep == record newrep = sorted(record.siblings + [record])[0] if newrep != record: for sib in sorted(record.siblings + [record]): sib.multi_rep = newrep if sib != newrep: newrep.add_sibling(sib) record.siblings = None parent = newrep.pseudoify() self.records[n] = parent if not self.assumesorted: for seqid in self.regions.inferred: if seqid not in self.regions.declared: seqrange = self.regions.inferred[seqid] srstring = '##sequence-region {:s} {:d} {:d}'.format( seqid, seqrange.start + 1, seqrange.end ) seqregion = Directive(srstring) self.records.append(seqregion) for record in sorted(self.records): yield record self._reset() def _reset(self): """Clear internal data structure.""" self.records = list() self.featsbyid = dict() self.featsbyparent = defaultdict(list) self.countsbytype = dict()