"""Translate a Tandem Repeat Finder file into a bed file containing | |
annotated repeat intervals. | |
> This file is a text file which contains the same information, in the same | |
order, as the summary table file, plus consensus pattern and repeat sequences | |
http://tandem.bu.edu/trf/trf.definitions.html#table | |
""" | |
from __future__ import division

import argparse
import collections
import functools
import re

import attr

@attr.s(slots=True, frozen=True)
class DATRecord(object):
    """One repeat interval row from a TRF .dat file.

    example:
    32781 32798 9 2.0 9 100 0 18 44 33 22 0 1.53 ACCAGACAG ACCAGACAGACCAGACAG
    """
    start = attr.ib(convert=int)
    stop = attr.ib(convert=int)
    repeat_period_size = attr.ib(convert=int)
    number_of_copies = attr.ib(convert=float)
    consensus_pattern_size = attr.ib(convert=int)
    percent_matches = attr.ib(convert=int)
    percent_indels = attr.ib(convert=int)
    alignment_score = attr.ib(convert=int)
    percent_composition_A = attr.ib(convert=int)
    percent_composition_C = attr.ib(convert=int)
    percent_composition_G = attr.ib(convert=int)
    percent_composition_T = attr.ib(convert=int)
    entropy = attr.ib(convert=float)
    consensus_pattern = attr.ib()
    repeat_seq = attr.ib()

    def zero_based_exclusive(self):
        """Return a copy shifted from TRF's 1-based, inclusive coordinates
        to the 0-based, half-open convention (start - 1, stop unchanged)."""
        return attr.assoc(self, start=(self.start - 1))
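
# Illustrative sketch (not part of the original gist): building a record from
# the sample line in the docstring above, then shifting it to BED coordinates:
#
#     fields = ('32781 32798 9 2.0 9 100 0 18 44 33 22 0 '
#               '1.53 ACCAGACAG ACCAGACAGACCAGACAG').split()
#     rec = DATRecord(*fields)
#     rec.zero_based_exclusive().start  # 32780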

# Sequence: 1 dna:chromosome chromosome:GRCh37:1:1:249250621:1
# Sequence: NM_001316362.1_PRKRA_from_6_ssto_hap7
SEQUENCE_NAME_RE = re.compile(r'Sequence:\s+(?P<name>\S+)')

BedRecord = collections.namedtuple('BedRecord', (
    'contig', 'start', 'stop', 'unit', 'count'))

# Filter predicates keyed by CLI argument name; the attrs converters above
# guarantee the record fields are already numeric.
DAT_FILTERS = dict(
    filter_min_match=lambda val, dat: dat.percent_matches >= val,
    filter_max_indel=lambda val, dat: dat.percent_indels <= val,
    filter_min_count=lambda val, dat: dat.number_of_copies >= val)
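
# Illustrative sketch: parse_args() below curries each filter with its
# threshold, leaving a unary predicate over DATRecord, e.g.:
#
#     min_match = functools.partial(DAT_FILTERS['filter_min_match'], 95)
#     min_match(rec)  # True when rec.percent_matches >= 95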

def parse_dat_records(stream):
    """Yield (sequence_name, DATRecord) pairs from a TRF .dat stream."""
    sequence_name = None
    for line in stream:
        try:
            dat_record = DATRecord(*line.split())
            assert sequence_name is not None, 'sequence name must be set'
            yield (sequence_name, dat_record)
        except TypeError:
            # Not a 15-field record line; see if it names a new sequence.
            try:
                sequence_name = SEQUENCE_NAME_RE.match(line).group('name')
            except AttributeError:
                # Neither a record nor a "Sequence:" header; skip it.
                continue
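
# Illustrative sketch of the expected stream shape (header line taken from the
# comments above, record line from the DATRecord docstring):
#
#     lines = iter([
#         'Sequence: 1 dna:chromosome chromosome:GRCh37:1:1:249250621:1',
#         '32781 32798 9 2.0 9 100 0 18 44 33 22 0 1.53 ACCAGACAG ACCAGACAGACCAGACAG',
#     ])
#     for seq_name, rec in parse_dat_records(lines):
#         print(seq_name, rec.start, rec.stop)  # 1 32781 32798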

def trf_dat_to_bed(in_dat, out_bed, filters):
    """Post-process a TandemRepeatFinder output DAT stream into a BED stream.

    Args:
        in_dat (io.stream): input DAT descriptor.
        out_bed (io.stream): output BED descriptor.
        filters (list<func(DATRecord)>): unary DAT record filter functions.
    """
    # skip blank lines (lines from a stream keep their trailing newline)
    non_empty_lines = (line for line in in_dat if line.strip())
    # parse and filter dat records
    dat_records = parse_dat_records(non_empty_lines)
    dat_records_filtered = (
        (seq_name, dat_rec) for seq_name, dat_rec in dat_records
        if all(_filter(dat_rec) for _filter in filters))
    # prepare bed records
    bed_records = (BedRecord(
        contig=seq_name,
        # TRF is 1-based; BED is 0-based, half-open
        start=dat_rec.start - 1,
        stop=dat_rec.stop,
        unit=dat_rec.consensus_pattern,
        count=dat_rec.number_of_copies
    ) for seq_name, dat_rec in dat_records_filtered)
    # write bed records
    for bed_record in bed_records:
        out_bed.write(
            '{contig}\t{start}\t{stop}\tunit={unit};count={count}\n'.format(
                **bed_record._asdict()))
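
# Illustrative sketch: the sample record from the DATRecord docstring, if it
# passes the configured filters, yields this tab-separated BED line:
#
#     1	32780	32798	unit=ACCAGACAG;count=2.0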

def parse_args():
    parser = argparse.ArgumentParser(
        description=trf_dat_to_bed.__doc__.split('\n')[0],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        'in_dat', type=argparse.FileType('r'), help='Input TRF data file.')
    parser.add_argument(
        'out_bed', type=argparse.FileType('w'), help='Output BED file.')

    def filter_type(_type):
        # "off" disables a filter by mapping its value to None
        return lambda val: None if val == 'off' else _type(val)

    filter_group = parser.add_argument_group(
        title='filters', description='("off" turns off the filter)')
    filter_group.add_argument(
        '--filter-min-match', type=filter_type(int), default=99,
        help='Exclude records below this match percent.')
    filter_group.add_argument(
        '--filter-max-indel', type=filter_type(int), default=0,
        help='Exclude records above this indel percent.')
    filter_group.add_argument(
        '--filter-min-count', type=filter_type(float), default=2.0,
        help='Exclude records below this copy count.')
    # prepare filter functions
    raw_args = parser.parse_args()
    filters = tuple(
        # partial application -> unary predicate
        functools.partial(DAT_FILTERS[arg_name], arg_value)
        for arg_name, arg_value in vars(raw_args).items()
        if arg_name.startswith('filter') and arg_value is not None)
    args = argparse.Namespace(
        filters=filters, **{
            name: value for name, value in vars(raw_args).items()
            if not name.startswith('filter')})
    return args
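
# Illustrative sketch of the resulting namespace:
#
#     args = parse_args()
#     # args.in_dat  -> open input file object
#     # args.out_bed -> open output file object
#     # args.filters -> tuple of unary predicates over DATRecord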

if __name__ == '__main__':
    trf_dat_to_bed(**vars(parse_args()))