Source code for analysis.as_analysis

"""
Author: Aaron Ho
Python Version: 3.9
"""

# Standard library imports
import inspect
import time
import timeit
from collections.abc import Callable
from pathlib import Path
from typing import Any, Literal, cast

# External package imports
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from scipy.optimize import OptimizeResult, minimize, minimize_scalar
from scipy.special import expit
from scipy.stats import betabinom, chi2, false_discovery_control

# Internal package imports
from wasp2.cli import create_spinner_progress, error, info, success

# =============================================================================
# BETA-BINOMIAL RHO PARAMETER BOUNDS (Issue #228)
# =============================================================================
# The beta-binomial parameterization uses alpha = mu * (1-rho) / rho, which
# causes division by zero when rho=0 and produces zero alpha/beta when rho=1.
# We clamp rho to (epsilon, 1-epsilon) to prevent numerical instability.
# =============================================================================

RHO_EPSILON: float = 1e-10


def clamp_rho(rho: float | NDArray[np.float64]) -> float | NDArray[np.float64]:
    """
    Clamp dispersion parameter rho to the safe range (epsilon, 1 - epsilon).

    The beta-binomial parameterization uses alpha = mu * (1 - rho) / rho, which
    causes division by zero when rho = 0 and produces zero alpha/beta when
    rho = 1. This function prevents these boundary issues.

    Args:
        rho: Dispersion parameter (scalar or array), expected in [0, 1]

    Returns:
        Clamped rho in range (RHO_EPSILON, 1 - RHO_EPSILON)
    """
    return np.clip(rho, RHO_EPSILON, 1.0 - RHO_EPSILON)

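# Illustrative check: clamping pulls the boundary values inside (0, 1) so that
# alpha = mu * (1 - rho) / rho stays finite and positive:
#
#     clamp_rho(0.0)  # -> 1e-10
#     clamp_rho(1.0)  # -> 1 - 1e-10 (0.9999999999)
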
def opt_linear(
    disp_params: NDArray[np.float64],
    ref_counts: NDArray[np.integer[Any]],
    n_array: NDArray[np.integer[Any]],
) -> float:
    """
    Optimize dispersion parameters weighted by N. (Function called by optimizer.)

    :param disp_params: Array of dispersion parameters [disp1, disp2]
    :param ref_counts: Array of reference allele counts
    :param n_array: Array of total counts (N)
    :return: Negative log-likelihood value
    """
    disp1, disp2 = disp_params

    # Clip the linear predictor to [-10, 10] to avoid saturating expit()
    exp_in = disp1 + (n_array * disp2)
    exp_in = np.select([exp_in > 10, exp_in < -10], [10, -10], default=exp_in)
    rho = clamp_rho(expit(exp_in))

    # Under the null hypothesis mu = 0.5, so alpha == beta
    ll = -np.sum(
        betabinom.logpmf(
            ref_counts, n_array, (0.5 * (1 - rho) / rho), (0.5 * (1 - rho) / rho)
        )
    )
    return float(ll)

def opt_prob(
    in_prob: float | NDArray[np.float64],
    in_rho: float | NDArray[np.float64],
    k: int | NDArray[np.integer[Any]],
    n: int | NDArray[np.integer[Any]],
    log: bool = True,
) -> float | NDArray[np.float64]:
    """
    Optimize the probability value that maximizes the imbalance likelihood.
    (Function called by optimizer.)

    **CRITICAL FUNCTION** - Used by as_analysis_sc.py and compare_ai.py

    :param in_prob: Probability parameter (scalar or array)
    :param in_rho: Dispersion parameter (scalar or array)
    :param k: Reference allele count(s)
    :param n: Total count(s)
    :param log: If True, return negative log-likelihood; if False, return pmf
    :return: Negative log-likelihood (if log=True) or probability mass (if log=False)
    """
    prob = in_prob
    rho = clamp_rho(in_rho)  # Prevent division by zero at boundaries

    alpha = prob * (1 - rho) / rho
    beta = (1 - prob) * (1 - rho) / rho

    if log:
        ll = -1 * betabinom.logpmf(k, n, alpha, beta)
    else:
        ll = betabinom.pmf(k, n, alpha, beta)

    return cast(float | NDArray[np.float64], ll)

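# Usage sketch (hypothetical values): negative log-likelihood of observing 7
# reference reads out of 10 at mu = 0.5 with mild overdispersion (rho = 0.01),
# and the corresponding probability mass:
#
#     nll = opt_prob(0.5, 0.01, k=7, n=10)
#     pmf = opt_prob(0.5, 0.01, k=7, n=10, log=False)
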
# Updated phasing optimizer: currently used in single-cell analysis.
# This version modifies the prob array outside of the function.
# GT phase should be with respect to the first SNP on the first chromosome.
def opt_phased_new(
    prob: float,
    disp: float | NDArray[np.float64],
    ref_data: NDArray[np.integer[Any]],
    n_data: NDArray[np.integer[Any]],
    gt_data: NDArray[np.integer[Any]],
) -> float:
    """
    Optimize likelihood for phased data (updated version for single-cell analysis).

    **CRITICAL FUNCTION** - Used by as_analysis_sc.py and compare_ai.py

    :param prob: Probability parameter to optimize
    :param disp: Dispersion parameter (scalar or array)
    :param ref_data: Array of reference allele counts
    :param n_data: Array of total counts
    :param gt_data: Array of genotype phase information
    :return: Negative log-likelihood value
    """
    # Phase and prob are expressed with respect to SNP 1 as the reference
    phased_ll = opt_prob(np.abs(prob - gt_data), disp, ref_data, n_data)
    return float(np.sum(phased_ll))

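# Worked example of the phase flip (hypothetical values): with prob = 0.7, a
# SNP in phase with the first SNP (gt = 0) is evaluated at mu = 0.7, while one
# on the opposite haplotype (gt = 1) is evaluated at |0.7 - 1| = 0.3:
#
#     gt = np.array([0, 1, 0])
#     np.abs(0.7 - gt)  # -> array([0.7, 0.3, 0.7])
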
# Updated unphased-data optimizer using dynamic programming (DP)
def opt_unphased_dp(
    prob: float,
    disp: float | NDArray[np.float64],
    first_ref: NDArray[np.integer[Any]],
    first_n: NDArray[np.integer[Any]],
    phase_ref: NDArray[np.integer[Any]],
    phase_n: NDArray[np.integer[Any]],
) -> float:
    """
    Optimize likelihood while taking phase into account using dynamic programming.

    **CRITICAL FUNCTION** - Used by as_analysis_sc.py and compare_ai.py

    :param prob: Probability parameter to optimize
    :param disp: Dispersion parameter (scalar or array)
    :param first_ref: Reference count for first position (length 1 array)
    :param first_n: Total count for first position (length 1 array)
    :param phase_ref: Array of reference counts for subsequent positions
    :param phase_n: Array of total counts for subsequent positions
    :return: Negative log-likelihood value
    """
    # Get likelihood of the first position
    first_ll = opt_prob(prob, disp, first_ref[0], first_n[0])

    # Get likelihoods with regard to the phasing of the first position
    phase1_like = opt_prob(prob, disp, phase_ref, phase_n, log=False)
    phase2_like = opt_prob(1 - prob, disp, phase_ref, phase_n, log=False)

    prev_like: float = 1.0

    # phase1_like and phase2_like are arrays when phase_ref/phase_n are arrays
    phase1_arr = cast(NDArray[np.float64], phase1_like)
    phase2_arr = cast(NDArray[np.float64], phase2_like)

    for p1, p2 in zip(phase1_arr, phase2_arr):
        p1_combined_like = prev_like * p1
        p2_combined_like = prev_like * p2
        prev_like = float((0.5 * p1_combined_like) + (0.5 * p2_combined_like))

    return float(first_ll - np.log(prev_like))

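# Note on the recursion above: each loop step multiplies the running
# likelihood by the equal-weight mixture of the two possible phasings of
# position i,
#
#     prev_like_i = prev_like_{i-1} * (0.5 * p1_i + 0.5 * p2_i)
#
# so the loop is equivalent to np.prod(0.5 * phase1_arr + 0.5 * phase2_arr);
# the explicit loop keeps the per-position structure visible.
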
def parse_opt(
    df: pd.DataFrame,
    disp: float | NDArray[np.float64] | None = None,
    phased: bool = False,
) -> tuple[float, float]:
    """
    Optimize the necessary data when running the model.

    :param df: Dataframe with allele counts
    :param disp: Pre-computed dispersion parameter, defaults to None
    :param phased: Whether data is phased
    :return: Tuple of (alt_ll, mu) - likelihood of alternate model and imbalance proportion
    """
    snp_count = df.shape[0]

    # Create arrays used for AI analysis
    ref_array = df["ref_count"].to_numpy()
    n_array = df["N"].to_numpy()

    # If no dispersion is given, use the linear model's per-N dispersion column
    if disp is None:
        disp = df["disp"].to_numpy()

    res: OptimizeResult
    if snp_count > 1:
        # If data is phased
        if phased:
            # Use known phasing info
            gt_array = df["GT"].to_numpy()

            # Express phase with respect to the reference allele at the first position
            if gt_array[0] > 0:
                gt_array = 1 - gt_array

            res = minimize_scalar(
                opt_phased_new,
                args=(disp, ref_array, n_array, gt_array),
                method="bounded",
                bounds=(0, 1),
            )
        else:
            # Use the unphased DP algorithm over subsequent positions
            first_ref = ref_array[:1]
            first_n = n_array[:1]
            phase_ref = ref_array[1:]
            phase_n = n_array[1:]

            res = minimize_scalar(
                opt_unphased_dp,
                args=(disp, first_ref, first_n, phase_ref, phase_n),
                method="bounded",
                bounds=(0, 1),
            )
    else:
        # Single-site optimization
        res = minimize_scalar(
            opt_prob, args=(disp, ref_array[0], n_array[0]), method="bounded", bounds=(0, 1)
        )

    # Extract results
    mu: float = res["x"]
    alt_ll: float = -1 * res["fun"]

    return alt_ll, mu

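# Usage sketch (hypothetical toy data): optimize mu for a two-SNP unphased
# region with a fixed dispersion of 0.01:
#
#     toy = pd.DataFrame({"ref_count": [12, 3], "N": [20, 18]})
#     alt_ll, mu = parse_opt(toy, disp=0.01)
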
def single_model(df: pd.DataFrame, region_col: str, phased: bool = False) -> pd.DataFrame:
    """
    Find allelic imbalance using the standard beta-binomial model.

    :param df: Dataframe with allele counts
    :param region_col: Name of column to group by
    :param phased: Whether data is phased
    :return: Dataframe with imbalance likelihood
    """
    info("Running analysis with single dispersion model")

    def opt_disp(rho: float, ref_data: NDArray[Any], n_data: NDArray[Any]) -> float:
        """Negative log-likelihood for dispersion optimization (rho clamped)."""
        rho_safe = float(clamp_rho(rho))
        return float(
            -np.sum(
                betabinom.logpmf(
                    ref_data,
                    n_data,
                    (0.5 * (1 - rho_safe) / rho_safe),
                    (0.5 * (1 - rho_safe) / rho_safe),
                )
            )
        )

    ref_array = df["ref_count"].to_numpy()
    n_array = df["N"].to_numpy()

    disp_start = timeit.default_timer()
    with create_spinner_progress() as progress:
        progress.add_task("Optimizing dispersion parameter", total=None)
        disp: float = float(
            clamp_rho(
                minimize_scalar(
                    opt_disp, args=(ref_array, n_array), method="bounded", bounds=(0, 1)
                )["x"]
            )
        )
    success(f"Optimized dispersion parameter ({timeit.default_timer() - disp_start:.2f}s)")

    group_df = df.groupby(region_col, sort=False)
    include_groups_supported = "include_groups" in inspect.signature(group_df.apply).parameters
    apply_kwargs = {"include_groups": False} if include_groups_supported else {}

    ll_start = timeit.default_timer()
    with create_spinner_progress() as progress:
        progress.add_task("Optimizing imbalance likelihood", total=None)

        # Null model: mu fixed at 0.5 with the shared dispersion
        null_test = group_df.apply(
            lambda x: np.sum(
                betabinom.logpmf(
                    x["ref_count"].to_numpy(),
                    x["N"].to_numpy(),
                    (0.5 * (1 - disp) / disp),
                    (0.5 * (1 - disp) / disp),
                )
            ),
            **apply_kwargs,
        )

        # Alternative model: optimize mu per region
        alt_test = group_df.apply(lambda x: parse_opt(x, disp, phased=phased), **apply_kwargs)
        alt_df = pd.DataFrame(alt_test.tolist(), columns=["alt_ll", "mu"], index=alt_test.index)
    success(f"Optimized imbalance likelihood ({timeit.default_timer() - ll_start:.2f}s)")

    ll_df = pd.concat([null_test, alt_df], axis=1).reset_index()
    ll_df.columns = [region_col, "null_ll", "alt_ll", "mu"]
    ll_df["lrt"] = -2 * (ll_df["null_ll"] - ll_df["alt_ll"])
    ll_df["pval"] = chi2.sf(ll_df["lrt"], 1)

    return ll_df

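# Illustrative note: the likelihood-ratio test compares the null model (mu
# fixed at 0.5) against the fitted alternative. With one free parameter (mu),
# lrt = -2 * (null_ll - alt_ll) is referred to a chi-squared distribution with
# 1 degree of freedom; e.g. an lrt of 3.84 corresponds to p ~ 0.05:
#
#     chi2.sf(3.84, 1)  # -> ~0.05
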
def linear_model(df: pd.DataFrame, region_col: str, phased: bool = False) -> pd.DataFrame:
    """
    Find allelic imbalance using the linear allelic imbalance model, in which
    dispersion scales linearly with the total count N.

    :param df: Dataframe with allele counts
    :param region_col: Name of column to group by
    :param phased: Whether data is phased
    :return: Dataframe with imbalance likelihood
    """
    info("Running analysis with linear dispersion model")

    in_data = df[["ref_count", "N"]].to_numpy().T

    disp_start = time.time()
    with create_spinner_progress() as progress:
        progress.add_task("Optimizing dispersion parameters", total=None)
        res: OptimizeResult = minimize(
            opt_linear, x0=(0, 0), method="Nelder-Mead", args=(in_data[0], in_data[1])
        )
        disp1: float
        disp2: float
        disp1, disp2 = res["x"]
        df["disp"] = clamp_rho(expit(disp1 + (in_data[1] * disp2)))
    success(f"Optimized dispersion parameters ({time.time() - disp_start:.2f}s)")

    # Group by region
    group_df = df.groupby(region_col, sort=False)
    include_groups_supported = "include_groups" in inspect.signature(group_df.apply).parameters
    apply_kwargs = {"include_groups": False} if include_groups_supported else {}

    # Get null test
    ll_start = time.time()
    with create_spinner_progress() as progress:
        progress.add_task("Optimizing imbalance likelihood", total=None)
        null_test = group_df.apply(
            lambda x: np.sum(
                betabinom.logpmf(
                    x["ref_count"].to_numpy(),
                    x["N"].to_numpy(),
                    (0.5 * (1 - x["disp"].to_numpy()) / x["disp"].to_numpy()),
                    (0.5 * (1 - x["disp"].to_numpy()) / x["disp"].to_numpy()),
                )
            ),
            **apply_kwargs,
        )

        # Optimize the alternative model per region
        alt_test = group_df.apply(lambda x: parse_opt(x), **apply_kwargs)
        alt_df = pd.DataFrame(alt_test.tolist(), columns=["alt_ll", "mu"], index=alt_test.index)
    success(f"Optimized imbalance likelihood ({time.time() - ll_start:.2f}s)")

    ll_df = pd.concat([null_test, alt_df], axis=1).reset_index()
    ll_df.columns = [region_col, "null_ll", "alt_ll", "mu"]
    ll_df["lrt"] = -2 * (ll_df["null_ll"] - ll_df["alt_ll"])
    ll_df["pval"] = chi2.sf(ll_df["lrt"], 1)

    return ll_df

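# In the linear model each observation receives its own dispersion as a
# logistic function of its total count: rho_i = expit(disp1 + N_i * disp2).
# An illustrative evaluation with hypothetical fitted values:
#
#     disp1, disp2 = -3.0, 0.01  # hypothetical optimizer output
#     float(clamp_rho(expit(disp1 + 50 * disp2)))  # rho for N = 50, ~0.076
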
def get_imbalance(
    in_data: pd.DataFrame | str | Path,
    min_count: int = 10,
    pseudocount: int = 1,
    method: Literal["single", "linear"] = "single",
    phased: bool = False,
    region_col: str | None = None,
    groupby: str | None = None,
) -> pd.DataFrame:
    """
    Process input data and method for finding allelic imbalance.

    **CRITICAL FUNCTION** - Main analysis entry point used by run_analysis.py

    :param in_data: Dataframe with allele counts or filepath to TSV file
    :param min_count: Minimum allele count for analysis
    :param pseudocount: Pseudocount to add to allele counts
    :param method: Analysis method ("single" or "linear")
    :param phased: Whether to use phased genotype information
    :param region_col: Column name to group variants by (e.g., gene, peak)
    :param groupby: Alternative grouping column (overrides region_col if provided)
    :return: DataFrame with imbalance statistics per region
    """
    model_dict: dict[str, Callable[[pd.DataFrame, str, bool], pd.DataFrame]] = {
        "single": single_model,
        "linear": linear_model,
    }

    # Accept either a pre-parsed dataframe or a filepath
    if isinstance(in_data, pd.DataFrame):
        df = in_data
    else:
        df = pd.read_csv(
            in_data,
            sep="\t",
            dtype={
                "chrom": "category",
                "pos": np.uint32,
                "ref": "category",
                "alt": "category",
                "ref_count": np.uint16,
                "alt_count": np.uint16,
                "other_count": np.uint16,
            },
        )

    # If no region_col is given, measure imbalance per variant
    if region_col is None:
        region_col = "variant"
        groupby = None  # no parent grouping
        df[region_col] = df["chrom"].astype("string") + "_" + df["pos"].astype("string")

    # Apply pseudocounts and filter data by the minimum count
    df[["ref_count", "alt_count"]] += pseudocount
    df["N"] = df["ref_count"] + df["alt_count"]
    df = df.loc[df["N"].ge(min_count + (2 * pseudocount)), :]

    # Get unique values based on group
    if groupby is not None:
        region_col = groupby

    keep_cols = ["chrom", "pos", "ref_count", "alt_count", "N", region_col]

    # Check validity of phasing info
    if phased:
        # Check whether GTs are actually phased - use error() so the message is
        # always shown (even in quiet mode)
        if "GT" not in df.columns:
            error("Genotypes not found: Switching to unphased model")
            phased = False
        elif len(df["GT"].unique()) <= 1:
            error(f"All genotypes {df['GT'].unique()}: Switching to unphased model")
            phased = False
        elif not any(i in ["1|0", "0|1"] for i in df["GT"].unique()):
            error(f"Expected GT as 0|1 and 1|0 but found: {df['GT'].unique()}")
            error("Switching to unphased model")
            phased = False
        else:
            # GT is indeed phased: keep the first haplotype allele as 0/1
            df["GT"] = df["GT"].str.split("|", n=1).str[0].astype(dtype=np.uint8)
            keep_cols.append("GT")

    df = df[keep_cols].drop_duplicates()

    p_df = model_dict[method](df, region_col, phased)  # Perform analysis

    # Remove pseudocounts before reporting raw counts
    df[["ref_count", "alt_count"]] -= pseudocount
    df["N"] -= pseudocount * 2

    snp_counts = pd.DataFrame(df[region_col].value_counts(sort=False)).reset_index()
    snp_counts.columns = [region_col, "snp_count"]

    count_alleles = (
        df[[region_col, "ref_count", "alt_count", "N"]].groupby(region_col, sort=False).sum()
    )

    merge_df = pd.merge(snp_counts, p_df, how="left", on=region_col)
    as_df = pd.merge(count_alleles, merge_df, how="left", on=region_col)
    as_df["fdr_pval"] = false_discovery_control(as_df["pval"], method="bh")

    return as_df

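# Usage sketch (hypothetical file and column names): run the single-dispersion
# model on a WASP2 counts table, grouping variants by a "genes" column:
#
#     results = get_imbalance(
#         Path("counts.tsv"), min_count=10, method="single",
#         phased=True, region_col="genes",
#     )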