Source code for mapping.filter_remap_reads

"""Filter and merge remapped BAM reads using WASP algorithm.

Provides functions for filtering reads that remap to the same locus
after allele swapping and merging with non-remapped reads.
"""

from __future__ import annotations

import logging
import subprocess
import timeit

# Rust acceleration (required; no fallback)
from wasp2_rust import filter_bam_wasp

logger = logging.getLogger(__name__)


[docs] def filt_remapped_reads( to_remap_bam: str, remapped_bam: str, filt_out_bam: str, keep_read_file: str | None = None, threads: int = 1, same_locus_slop: int = 0, ) -> None: """Filter remapped reads using WASP algorithm. Uses Rust acceleration. Args: to_remap_bam: Original BAM with reads to remap remapped_bam: Remapped BAM with swapped alleles filt_out_bam: Output filtered BAM keep_read_file: Optional file to write kept read names threads: Number of threads for BAM I/O same_locus_slop: Tolerance (bp) for same locus test (for indels) """ filter_bam_wasp( to_remap_bam, remapped_bam, filt_out_bam, keep_read_file=keep_read_file, threads=threads, same_locus_slop=same_locus_slop, )
[docs] def merge_filt_bam(keep_bam: str, remapped_filt_bam: str, out_bam: str, threads: int = 1) -> None: """Merge filtered BAM files using samtools (faster than pysam). Both input BAMs are already coordinate-sorted, so samtools merge produces sorted output without needing an explicit sort step. Args: keep_bam: BAM with reads that didn't need remapping remapped_filt_bam: BAM with filtered remapped reads out_bam: Output merged BAM threads: Number of threads for samtools """ start_time = timeit.default_timer() # Merge using samtools (faster than pysam, inputs are already sorted) subprocess.run( ["samtools", "merge", "-@", str(threads), "-f", "-o", out_bam, keep_bam, remapped_filt_bam], check=True, ) logger.info("Merged BAM in %.2f seconds", timeit.default_timer() - start_time) # Index the merged BAM (no sort needed - inputs were already sorted) start_index = timeit.default_timer() subprocess.run(["samtools", "index", "-@", str(threads), out_bam], check=True) logger.info("Indexed BAM in %.2f seconds", timeit.default_timer() - start_index)