-
-
Save apcamargo/2b7ca3032c1e80333adc1e54f47a0966 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python | |
| """ | |
| This script processes SAM (Sequence Alignment/Map format) inputs from standard | |
| input and extracts alignment information that is then provided in a tab-separated | |
| table. The following fields are produced: query, target, query_length, query_start, | |
| query_end, target_start, target_end, alignment_length, alignment_identity. | |
| This script was designed for use with SAM files produced by minimap2. However, | |
| it will work with any SAM data that: | |
| - Uses 'M' characters in the CIGAR strings (instead of '=' and 'X'). | |
| - Includes an 'NM' tag that represents the total number of mismatches and gaps | |
| in the alignment. | |
| Input: | |
| - SAM format records read from stdin. | |
| Output: | |
| - A tab-separated table with the following fields: | |
| query, target, query_length, query_start, query_end, target_start, | |
| target_end, alignment_length, alignment_identity. | |
| Usage: | |
| # Process a SAM file | |
| cat input.sam | ./sam2tsv.py > output.tsv | |
| # Convert a BAM file to SAM and process it | |
| samtools view input.bam | ./sam2tsv.py > output.tsv | |
| """ | |
| import re | |
| import sys | |
| class Record: | |
| def __init__(self, fields: list[str]) -> None: | |
| self.qname: str = fields[0] | |
| self.tname: str = fields[2] | |
| self.pos: int = int(fields[3]) | |
| self.cigar: str = fields[5] | |
| self.query_seq: str = fields[9] | |
| self.nm = self._get_nm_tag(fields) | |
| self.query_length = self._get_query_length() | |
| def _get_nm_tag(self, fields: list[str]) -> int: | |
| """Extracts the NM tag (edit distance) from SAM optional fields.""" | |
| for field in fields[11:]: | |
| if field.startswith("NM:i:"): | |
| return int(field.split(":")[-1]) | |
| return 0 | |
| def _parse_cigar(self, cigar: str) -> tuple[int, int, int, int, int]: | |
| """ | |
| Parses the CIGAR string and returns: | |
| - total_M: sum of M operations (alignment matches/mismatches) | |
| - total_I: sum of I operations (insertions in query) | |
| - total_D: sum of D operations (deletions from reference) | |
| - total_S: sum of S operations (soft clips) | |
| - total_H: sum of H operations (hard clips) | |
| - clip_start: number of clipped bases (soft or hard) at beginning | |
| """ | |
| ops = re.findall(r"(\d+)([MIDNSHP=X])", cigar) | |
| total_M = total_I = total_D = total_S = total_H = 0 | |
| clip_start = 0 | |
| if ops: | |
| first_num, first_op = ops[0] | |
| if first_op in ("S", "H"): | |
| clip_start = int(first_num) | |
| for num, op in ops: | |
| num = int(num) | |
| if op == "M": | |
| total_M += num | |
| elif op == "I": | |
| total_I += num | |
| elif op == "D": | |
| total_D += num | |
| elif op == "S": | |
| total_S += num | |
| elif op == "H": | |
| total_H += num | |
| return total_M, total_I, total_D, total_S, total_H, clip_start | |
| def _get_query_length(self) -> int: | |
| """Determines the query length from SAM fields.""" | |
| h_clips = re.findall(r"(\d+)H", self.cigar) | |
| hard_clipped = sum(int(h) for h in h_clips) | |
| return len(self.query_seq) + hard_clipped | |
| def process_alignment(self) -> tuple[str, str, int, int, int, int, int, int, float]: | |
| """Processes a SAM alignment record and returns relevant information.""" | |
| if self.nm is None: | |
| return None | |
| total_M, total_I, total_D, total_S, total_H, clip_start = self._parse_cigar(self.cigar) | |
| query_start = clip_start + 1 | |
| query_end = query_start + total_M + total_I - 1 | |
| target_end = self.pos + total_M + total_D - 1 | |
| alignment_length = total_M + total_I + total_D | |
| identity = ( | |
| (alignment_length - self.nm) / alignment_length | |
| if alignment_length > 0 | |
| else 0 | |
| ) | |
| return ( | |
| self.qname, | |
| self.tname, | |
| self.query_length, | |
| query_start, | |
| query_end, | |
| self.pos, | |
| target_end, | |
| alignment_length, | |
| identity, | |
| ) | |
| def main() -> None: | |
| print( | |
| "query\ttarget\tquery_length\tquery_start\tquery_end\ttarget_start\t" | |
| "target_end\talignment_length\talignment_identity" | |
| ) | |
| for line in sys.stdin: | |
| line = line.rstrip() | |
| if not line or line.startswith("@"): | |
| continue | |
| fields = line.split("\t") | |
| ( | |
| qname, | |
| tname, | |
| query_length, | |
| query_start, | |
| query_end, | |
| target_start, | |
| target_end, | |
| alignment_length, | |
| identity, | |
| ) = Record(fields).process_alignment() | |
| print( | |
| f"{qname}\t{tname}\t{query_length}\t{query_start}\t{query_end}\t" | |
| f"{target_start}\t{target_end}\t{alignment_length}\t{identity:.4f}" | |
| ) | |
| if __name__ == "__main__": | |
| main() |
Thank you, @shenwei356! Are those always equivalent? Despite my comment in the docstrings (see below), I don't remember the details for this decision. If the tools that output = and X never produce CIGAR strings with M, I can't see why it would be a problem to count them.
This script was designed for use with SAM files produced by minimap2. However,
it will work with any SAM data that:
- Uses 'M' characters in the CIGAR strings (instead of '=' and 'X').
- Includes an 'NM' tag that represents the total number of mismatches and gaps
in the alignment.
Did you use this script in alignments containing = and X? Did it work correctly?
There is one tool that produces CIGAR strings with =, but I can't remember it.
For X, it's my LexicMap. I just added a new command to convert tabular alignment results to SAM format (see here), and I use sam2tsv.py to convert it back to tabular format, for checking if the SAM is right. Then, I just learned today that Minimap2 doesn't use 'X'... When I used X, samtools did not report errors, so it's legal and compatible.
It's also easy enough to count those three separately (in total_M, total_X, and total_eq) and then have a property that returns the sum of the three. I will implement this.
Thanks!
Suggestion:
L73:
Can be changed to
to be compatible with SAM formats produced by some other tools.