Source code for analysis.compare_ai

import logging
from collections import namedtuple
from collections.abc import Callable
from itertools import combinations
from typing import Any

import numpy as np
import pandas as pd

# AnnData for single-cell analysis
from anndata import AnnData
from numpy.typing import NDArray
from scipy.optimize import OptimizeResult, minimize_scalar
from scipy.stats import betabinom, chi2, false_discovery_control

# Local imports
from .as_analysis import clamp_rho, opt_phased_new, opt_prob, opt_unphased_dp

logger = logging.getLogger(__name__)


# Use these functions to figure out how to optimize per group
def get_imbalance_func(
    ref_count: NDArray[np.integer[Any]],
    n_count: NDArray[np.integer[Any]],
    phase_array: NDArray[np.integer[Any]] | None = None,
) -> tuple[Callable[..., Any], tuple[Any, ...]]:
    """
    Determine which imbalance function to use based on data characteristics.

    :param ref_count: Array of reference allele counts
    :param n_count: Array of total counts
    :param phase_array: Optional phasing information array
    :return: Tuple of (likelihood function, function arguments)
    """
    like_func: Callable[..., Any]
    like_func_args: tuple[Any, ...]

    if len(ref_count) == 1:
        # Single SNP: optimize the probability directly
        like_func = opt_prob  # Excludes disp since we always pass disp separately
        like_func_args = (ref_count[0], n_count[0])
    elif phase_array is None:
        # Unphased model
        like_func = opt_unphased_dp
        like_func_args = (
            ref_count[:1],
            n_count[:1],
            ref_count[1:],
            n_count[1:],
        )
    else:
        # Phased model
        like_func = opt_phased_new
        like_func_args = (ref_count, n_count, phase_array)

    return like_func, like_func_args
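
# Dispatch sketch (fabricated counts): a single SNP routes to opt_prob, several
# unphased SNPs to opt_unphased_dp, and SNPs with a phase array to opt_phased_new.
def _example_get_imbalance_func() -> None:  # illustration only, not part of the API
    ref = np.array([8, 5, 11], dtype=np.uint16)
    n = np.array([20, 12, 25], dtype=np.uint16)

    func, _args = get_imbalance_func(ref[:1], n[:1])
    assert func is opt_prob  # single SNP

    func, _args = get_imbalance_func(ref, n)
    assert func is opt_unphased_dp  # multiple SNPs, no phasing

    func, _args = get_imbalance_func(ref, n, phase_array=np.array([0, 1, 0], dtype=np.uint8))
    assert func is opt_phased_new  # multiple SNPs with phasing
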
def opt_combined_imbalance(
    prob: float,
    disp: float,
    like_func1: Callable[..., float],
    like_func1_args: tuple[Any, ...],
    like_func2: Callable[..., float],
    like_func2_args: tuple[Any, ...],
) -> float:
    """
    Optimize combined imbalance likelihood for two groups.

    :param prob: Probability parameter
    :param disp: Dispersion parameter
    :param like_func1: Likelihood function for group 1
    :param like_func1_args: Arguments for group 1 likelihood function
    :param like_func2: Likelihood function for group 2
    :param like_func2_args: Arguments for group 2 likelihood function
    :return: Combined negative log-likelihood
    """
    return like_func1(prob, disp, *like_func1_args) + like_func2(prob, disp, *like_func2_args)
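
# Null-model sketch (made-up counts): both groups share one probability, so the
# combined objective is the sum of the per-group negative log-likelihoods.
# ``toy_nll`` is a hypothetical beta-binomial stand-in for the likelihood
# functions imported from .as_analysis; it only mirrors their call signature.
def _example_opt_combined_imbalance() -> float:  # illustration only
    def toy_nll(prob: float, disp: float, ref: int, n: int) -> float:
        a = prob * (1 - disp) / disp
        b = (1 - prob) * (1 - disp) / disp
        return float(-betabinom.logpmf(ref, n, a, b))

    # Shared prob = 0.5, disp = 0.01; group 1 saw 12/30 ref reads, group 2 saw 20/35
    return opt_combined_imbalance(0.5, 0.01, toy_nll, (12, 30), toy_nll, (20, 35))
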
# Current version that uses shared snps
def get_compared_imbalance(
    adata: AnnData,
    min_count: int = 10,
    pseudocount: int = 1,
    phased: bool = False,
    sample: str | None = None,
    groups: list[str] | None = None,
) -> dict[tuple[str, str], pd.DataFrame]:
    """
    Compare allelic imbalance between groups using shared SNPs.

    :param adata: AnnData object containing SNP count data
    :param min_count: Minimum allele count threshold
    :param pseudocount: Pseudocount to add to avoid zero counts
    :param phased: Whether to use phased analysis
    :param sample: Sample column name for phasing information
    :param groups: List of groups to compare (if None, compare all)
    :return: Dict mapping (group1, group2) tuples to comparison DataFrames
    """
    # Failsafe in case preparse somehow misses these
    if sample is None:
        phased = False

    # Default to comparing all pairwise combinations of available groups
    if groups is None:
        groups = list(adata.var["group"].dropna().unique())
        logger.info("Comparing all combinations of available groups")
    elif len(groups) == 1:
        raise ValueError("Please provide 2 or more groups to compare.")

    # Per-SNP floor: a SNP whose total equals the added pseudocounts has no real reads
    snp_cutoff: int = 2 * pseudocount

    ref_counts: NDArray[np.uint16] = (
        adata.layers["ref"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
    )
    alt_counts: NDArray[np.uint16] = (
        adata.layers["alt"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
    )
    n_counts: NDArray[np.uint16] = ref_counts + alt_counts

    # Calculate dispersion across dataset
    def opt_disp(
        rho: float, ref_data: NDArray[np.uint16], n_data: NDArray[np.uint16]
    ) -> float:
        rho_safe = float(clamp_rho(rho))  # Prevent division by zero (Issue #228)
        return float(
            -np.sum(
                betabinom.logpmf(
                    ref_data,
                    n_data,
                    (0.5 * (1 - rho_safe) / rho_safe),
                    (0.5 * (1 - rho_safe) / rho_safe),
                )
            )
        )

    disp: float = float(
        clamp_rho(
            minimize_scalar(
                opt_disp, args=(ref_counts, n_counts), method="bounded", bounds=(0, 1)
            )["x"]
        )
    )

    if phased:
        gt_array: NDArray[np.uint8] | None = (
            adata.obs[sample].str.split("|", n=1).str[0].to_numpy(dtype=np.uint8)
        )
    else:
        gt_array = None

    # Process counts on a per-group basis to avoid recalculating
    group_dict: dict[str, Any] = {}
    group_data = namedtuple("group_data", ["ref_counts", "n_counts", "region_snp_df"])

    for group_name in groups:
        # Subset by group
        adata_sub = adata[:, adata.var["group"] == group_name]

        # Create count data per group; pseudocounts are applied up front
        ref_counts_group = (
            adata_sub.layers["ref"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
        )
        alt_counts_group = (
            adata_sub.layers["alt"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
        )
        n_counts_group = ref_counts_group + alt_counts_group

        # Get indices of SNPs with real (non-pseudocount) reads
        nonzero_idx = np.where(n_counts_group > snp_cutoff)

        if nonzero_idx[0].size == 0:
            logger.warning("Skipping %s: no SNP counts found", group_name)
            continue

        # Remove snps with 0 counts from regions
        idx_df = pd.DataFrame({"index": nonzero_idx[0]}, dtype=np.uint32).reset_index(
            names="filt_index"
        )
        region_idx_df = adata.uns["feature"].merge(idx_df, on="index")

        # Attach total allele counts (N) per SNP for the region-level cutoff below
        region_n_df = region_idx_df.merge(
            pd.DataFrame(n_counts_group, columns=["N"]).reset_index(names="index"),
            on="index",
        )

        group_dict[group_name] = group_data(ref_counts_group, n_counts_group, region_n_df)

    # Create group combinations and process shared snps
    group_combos: list[tuple[str, str]] = list(combinations(group_dict.keys(), r=2))
    df_dict: dict[tuple[str, str], pd.DataFrame] = {}

    for group1, group2 in group_combos:
        # Get relevant counts and nonzero snps
        ref_counts1, n_counts1, region_snp_df1 = group_dict[group1]
        ref_counts2, n_counts2, region_snp_df2 = group_dict[group2]

        # Get shared snps -> get regions that meet cutoff
        shared_df = region_snp_df1[["region", "index", "N"]].merge(
            region_snp_df2[["index", "N"]], on="index", suffixes=("1", "2")
        )

        # Region cutoff accounts for the pseudocounts added to each SNP's total N
        region_agg_df = shared_df.groupby("region", sort=False).agg(
            snp_idx=("index", tuple),
            num_snps=("index", "size"),
            N1=("N1", "sum"),
            N2=("N2", "sum"),
        )
        region_agg_df["region_cutoff"] = (region_agg_df["num_snps"] * snp_cutoff) + min_count

        # Keep regions where both groups satisfy the count cutoff
        region_snp_dict = region_agg_df.loc[
            (
                (region_agg_df["N1"] >= region_agg_df["region_cutoff"])
                & (region_agg_df["N2"] >= region_agg_df["region_cutoff"])
            ),
            "snp_idx",
        ].to_dict()

        if not region_snp_dict:
            logger.warning(
                "Skipping %s-%s comparison: no shared regions with allele counts >= %d",
                group1,
                group2,
                min_count,
            )
            continue

        df = compare_imbalance_between_groups(
            disp, ref_counts1, n_counts1, ref_counts2, n_counts2, region_snp_dict, gt_array
        )

        # Using a (group1, group2) tuple as key
        df_dict[(group1, group2)] = df

    return df_dict
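
# Usage sketch (hypothetical AnnData): assuming ``adata`` carries "ref"/"alt"
# layers, a "group" column in .var, and a per-SNP region table in
# .uns["feature"], a call might look like
#
#     results = get_compared_imbalance(adata, min_count=10, groups=["treated", "control"])
#     df = results[("treated", "control")]
#
# where "treated" and "control" are placeholder group names.

# Dispersion sketch: opt_disp above fits a symmetric beta-binomial with mean 0.5,
# mapping rho to shape parameters a = b = 0.5 * (1 - rho) / rho, so rho acts as an
# overdispersion knob (rho -> 0 approaches a plain binomial). A toy fit on
# fabricated, binomial-like counts should therefore recover a small rho:
def _example_dispersion_fit() -> float:  # illustration only
    rng = np.random.default_rng(0)
    n = rng.integers(10, 50, size=200)
    ref = rng.binomial(n, 0.5)

    def toy_opt_disp(rho, ref_data, n_data):
        a = 0.5 * (1 - rho) / rho
        return float(-np.sum(betabinom.logpmf(ref_data, n_data, a, a)))

    res = minimize_scalar(
        toy_opt_disp, args=(ref, n), method="bounded", bounds=(1e-6, 1 - 1e-6)
    )
    return float(res["x"])
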
def compare_imbalance_between_groups(
    disp: float,
    ref_counts1: NDArray[np.uint16],
    n_counts1: NDArray[np.uint16],
    ref_counts2: NDArray[np.uint16],
    n_counts2: NDArray[np.uint16],
    region_snp_dict: dict[str, tuple[int, ...]],
    gt_array: NDArray[np.uint8] | None = None,
) -> pd.DataFrame:
    """
    Compare allelic imbalance between two groups for shared regions.

    Helper function called by :func:`get_compared_imbalance`.

    :param disp: Dispersion parameter
    :param ref_counts1: Reference allele counts for group 1
    :param n_counts1: Total counts for group 1
    :param ref_counts2: Reference allele counts for group 2
    :param n_counts2: Total counts for group 2
    :param region_snp_dict: Dict mapping region names to SNP index tuples
    :param gt_array: Optional genotype/phasing array
    :return: DataFrame with comparison statistics and p-values
    """
    # Store imbalance results
    group_results: list[tuple[str, int, float, float, float, float, float, float]] = []

    # Compare allelic imbalance difference per region
    for region, snp_list in region_snp_dict.items():
        # Get per region snps and counts
        region_ref1 = ref_counts1[snp_list,]
        region_n1 = n_counts1[snp_list,]
        region_ref2 = ref_counts2[snp_list,]
        region_n2 = n_counts2[snp_list,]

        # Pick the likelihood model shared by both groups
        like_func: Callable[..., Any]
        like_func_args1: tuple[Any, ...]
        like_func_args2: tuple[Any, ...]

        if len(snp_list) == 1:
            # Single SNP: optimize the probability directly
            like_func = opt_prob  # Excludes disp since we always pass disp separately
            like_func_args1 = (region_ref1[0], region_n1[0])
            like_func_args2 = (region_ref2[0], region_n2[0])
        elif gt_array is None:
            # Unphased model
            like_func = opt_unphased_dp
            like_func_args1 = (
                region_ref1[:1],
                region_n1[:1],
                region_ref1[1:],
                region_n1[1:],
            )
            like_func_args2 = (
                region_ref2[:1],
                region_n2[:1],
                region_ref2[1:],
                region_n2[1:],
            )
        else:
            # Phased model: get phasing info
            region_gt = gt_array[snp_list,]

            # Make sure phase is expressed with respect to the first SNP's ref allele
            if region_gt[0] > 0:
                region_gt = 1 - region_gt

            like_func = opt_phased_new
            like_func_args1 = (region_ref1, region_n1, region_gt)
            like_func_args2 = (region_ref2, region_n2, region_gt)

        # Null hypothesis: imbalance is the same in both groups
        null_res: OptimizeResult = minimize_scalar(
            opt_combined_imbalance,
            args=(disp, like_func, like_func_args1, like_func, like_func_args2),
            method="bounded",
            bounds=(0, 1),
        )
        combined_mu: float = null_res["x"]
        null_ll: float = -1 * null_res["fun"]

        # Alternative hypothesis: imbalance differs between groups
        alt_res1: OptimizeResult = minimize_scalar(
            like_func, args=(disp, *like_func_args1), method="bounded", bounds=(0, 1)
        )
        alt_res2: OptimizeResult = minimize_scalar(
            like_func, args=(disp, *like_func_args2), method="bounded", bounds=(0, 1)
        )

        # Get separate mu per group
        alt_mu1: float = alt_res1["x"]
        alt_mu2: float = alt_res2["x"]

        # Get alternative log-likelihood
        alt_ll1: float = alt_res1["fun"]
        alt_ll2: float = alt_res2["fun"]
        alt_ll: float = -1 * (alt_ll1 + alt_ll2)

        # Likelihood ratio test with 1 degree of freedom
        lrt: float = -2 * (null_ll - alt_ll)
        pval: float = chi2.sf(lrt, 1)

        group_results.append(
            (region, len(snp_list), combined_mu, alt_mu1, alt_mu2, null_ll, alt_ll, pval)
        )

    # Create allelic imbalance df (a Polars implementation might be more performant)
    df: pd.DataFrame = pd.DataFrame(
        group_results,
        columns=["region", "num_snps", "combined_mu", "mu1", "mu2", "null_ll", "alt_ll", "pval"],
    )

    # FDR correction (Benjamini-Hochberg)
    df["fdr_pval"] = false_discovery_control(df["pval"], method="bh")

    return df
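
# Worked LRT example (made-up log-likelihoods): if the shared-mu null fit gives
# null_ll = -105.2 and the two free-mu fits give alt_ll = -101.9, then
#
#     lrt  = -2 * (null_ll - alt_ll) = -2 * (-105.2 + 101.9) = 6.6
#     pval = chi2.sf(6.6, 1)  # ~0.0102
#
# The single degree of freedom reflects the one extra parameter (a second mu)
# in the alternative model.
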
# This is a V0 version that didn't require shared SNPs between groups.
# It could still be useful as an option.
def get_compared_imbalance_diff_snps(
    adata: AnnData,
    min_count: int = 10,
    pseudocount: int = 1,
    phased: bool = False,
    sample: str | None = None,
    groups: list[str] | None = None,
) -> dict[tuple[str, str], pd.DataFrame]:
    """
    Compare allelic imbalance between groups (V0 version without shared SNPs).

    :param adata: AnnData object containing SNP count data
    :param min_count: Minimum allele count threshold
    :param pseudocount: Pseudocount to add to avoid zero counts
    :param phased: Whether to use phased analysis
    :param sample: Sample column name for phasing information
    :param groups: List of groups to compare (if None, compare all)
    :return: Dict mapping (group1, group2) tuples to comparison DataFrames
    """
    # Failsafe in case preparse somehow misses these
    if sample is None:
        phased = False

    # Default to comparing all pairwise combinations of available groups
    if groups is None:
        groups = list(adata.var["group"].dropna().unique())
        logger.info("Comparing all combinations of available groups")
    elif len(groups) == 1:
        raise ValueError("Please provide 2 or more groups to compare.")

    # Minimum total count, accounting for the pseudocounts added to both alleles
    cutoff: int = min_count + (2 * pseudocount)

    ref_counts: NDArray[np.uint16] = (
        adata.layers["ref"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
    )
    alt_counts: NDArray[np.uint16] = (
        adata.layers["alt"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
    )
    n_counts: NDArray[np.uint16] = ref_counts + alt_counts

    # Get indices meeting min_count
    min_idx: tuple[NDArray[np.intp], ...] = np.where(n_counts >= cutoff)

    ref_counts_filt: NDArray[np.uint16]
    n_counts_filt: NDArray[np.uint16]
    ref_counts_filt, n_counts_filt = ref_counts[min_idx], n_counts[min_idx]

    # Calculate dispersion across dataset
    def opt_disp_filt(
        rho: float, ref_data: NDArray[np.uint16], n_data: NDArray[np.uint16]
    ) -> float:
        rho_safe = float(clamp_rho(rho))  # Prevent division by zero (Issue #228)
        return float(
            -np.sum(
                betabinom.logpmf(
                    ref_data,
                    n_data,
                    (0.5 * (1 - rho_safe) / rho_safe),
                    (0.5 * (1 - rho_safe) / rho_safe),
                )
            )
        )

    disp: float = float(
        clamp_rho(
            minimize_scalar(
                opt_disp_filt,
                args=(ref_counts_filt, n_counts_filt),
                method="bounded",
                bounds=(0, 1),
            )["x"]
        )
    )

    # Process counts on a per-group basis to avoid recalculating
    group_dict: dict[str, Any] = {}
    group_data = namedtuple(
        "group_data", ["ref_counts", "n_counts", "phase_data", "region_snp_dict"]
    )

    for group_name in groups:
        # Subset by group
        adata_sub = adata[:, adata.var["group"] == group_name]

        # Create count data per group; pseudocounts are applied up front
        ref_counts_group = (
            adata_sub.layers["ref"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
        )
        alt_counts_group = (
            adata_sub.layers["alt"].sum(axis=1, dtype=np.uint16).T.A1 + pseudocount
        )
        n_counts_group = ref_counts_group + alt_counts_group

        # Get indices meeting min_count
        min_idx_group = np.where(n_counts_group >= cutoff)

        ref_counts_group_filt, n_counts_group_filt = (
            ref_counts_group[min_idx_group],
            n_counts_group[min_idx_group],
        )

        if phased:
            phase_array = (
                adata.obs.iloc[min_idx_group][sample]
                .str.split("|", n=1)
                .str[0]
                .to_numpy(dtype=np.uint8)
            )
        else:
            phase_array = None

        # Create a region_snp_dict for each group
        idx_df = pd.DataFrame({"index": min_idx_group[0]}, dtype=np.uint32).reset_index(
            names="filt_index"
        )
        region_snp_dict = (
            adata.uns["feature"]
            .merge(idx_df, on="index")[["region", "filt_index"]]
            .groupby("region", sort=False)
            .agg(tuple)["filt_index"]
            .to_dict()
        )

        group_dict[group_name] = group_data(
            ref_counts_group_filt, n_counts_group_filt, phase_array, region_snp_dict
        )

    # Create group combinations and process shared regions
    group_combos: list[tuple[str, str]] = list(combinations(group_dict.keys(), r=2))
    df_dict: dict[tuple[str, str], pd.DataFrame] = {}

    for group1, group2 in group_combos:
        # A cache here could prevent repeating calculations across comparisons
        df = compare_imbalance_between_groups_diff_snps(
            disp, *group_dict[group1], *group_dict[group2]
        )

        if df.empty:
            logger.warning("Skipping %s - %s comparison: no shared regions", group1, group2)
        else:
            # Using a (group1, group2) tuple as key
            df_dict[(group1, group2)] = df

    return df_dict
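
# Usage sketch (hypothetical group names): same call signature as
# get_compared_imbalance, but regions are matched by name while each group keeps
# its own filtered SNP set, so the two groups may contribute different SNPs:
#
#     results_v0 = get_compared_imbalance_diff_snps(
#         adata, min_count=10, groups=["treated", "control"]
#     )
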
def compare_imbalance_between_groups_diff_snps(
    disp: float,
    ref_counts1: NDArray[np.uint16],
    n_counts1: NDArray[np.uint16],
    phase_array1: NDArray[np.uint8] | None,
    region_snp_dict1: dict[str, tuple[int, ...]],
    ref_counts2: NDArray[np.uint16],
    n_counts2: NDArray[np.uint16],
    phase_array2: NDArray[np.uint8] | None,
    region_snp_dict2: dict[str, tuple[int, ...]],
) -> pd.DataFrame:
    """
    Compare allelic imbalance between two groups with different SNPs per region.

    Helper function called by :func:`get_compared_imbalance_diff_snps`; the
    positional arguments are the unpacked per-group named tuples.

    :param disp: Dispersion parameter
    :param ref_counts1: Reference allele counts for group 1
    :param n_counts1: Total counts for group 1
    :param phase_array1: Optional phasing array for group 1
    :param region_snp_dict1: Dict mapping region names to SNP index tuples for group 1
    :param ref_counts2: Reference allele counts for group 2
    :param n_counts2: Total counts for group 2
    :param phase_array2: Optional phasing array for group 2
    :param region_snp_dict2: Dict mapping region names to SNP index tuples for group 2
    :return: DataFrame with comparison statistics and p-values
    """
    # Check if phasing info is available for both groups
    phased: bool = (phase_array1 is not None) and (phase_array2 is not None)

    # Get shared regions
    shared_regions: list[str] = [i for i in region_snp_dict1 if i in region_snp_dict2]

    # Store imbalance results
    group_results: list[
        tuple[str, int, int, float, float, float, float, float, float]
    ] = []

    # Compare allelic imbalance difference per region
    for region in shared_regions:
        # Get per region snps and counts
        snp_list1 = region_snp_dict1[region]
        region_ref1 = ref_counts1[snp_list1,]
        region_n1 = n_counts1[snp_list1,]

        snp_list2 = region_snp_dict2[region]
        region_ref2 = ref_counts2[snp_list2,]
        region_n2 = n_counts2[snp_list2,]

        if phased:
            assert phase_array1 is not None and phase_array2 is not None
            region_phasing1 = phase_array1[snp_list1,]
            region_phasing2 = phase_array2[snp_list2,]
        else:
            region_phasing1, region_phasing2 = None, None

        # Pick the likelihood model per group
        like_func1, like_func_inputs1 = get_imbalance_func(
            region_ref1, region_n1, phase_array=region_phasing1
        )
        like_func2, like_func_inputs2 = get_imbalance_func(
            region_ref2, region_n2, phase_array=region_phasing2
        )

        # Null hypothesis: imbalance is the same in both groups
        null_res: OptimizeResult = minimize_scalar(
            opt_combined_imbalance,
            args=(disp, like_func1, like_func_inputs1, like_func2, like_func_inputs2),
            method="bounded",
            bounds=(0, 1),
        )
        combined_mu: float = null_res["x"]
        null_ll: float = -1 * null_res["fun"]

        # Alternative hypothesis: imbalance differs between groups
        alt_res1: OptimizeResult = minimize_scalar(
            like_func1, args=(disp, *like_func_inputs1), method="bounded", bounds=(0, 1)
        )
        alt_res2: OptimizeResult = minimize_scalar(
            like_func2, args=(disp, *like_func_inputs2), method="bounded", bounds=(0, 1)
        )

        # Get separate mu per group
        alt_mu1: float = alt_res1["x"]
        alt_mu2: float = alt_res2["x"]

        # Get alternative log-likelihood
        alt_ll: float = -1 * (alt_res1["fun"] + alt_res2["fun"])

        # Likelihood ratio test with 1 degree of freedom
        lrt: float = -2 * (null_ll - alt_ll)
        pval: float = chi2.sf(lrt, 1)

        group_results.append(
            (
                region,
                len(snp_list1),
                len(snp_list2),
                combined_mu,
                alt_mu1,
                alt_mu2,
                null_ll,
                alt_ll,
                pval,
            )
        )

    # Create allelic imbalance df (a Polars implementation might be more performant)
    df: pd.DataFrame = pd.DataFrame(
        group_results,
        columns=[
            "region",
            "num_snps_group1",
            "num_snps_group2",
            "combined_mu",
            "mu1",
            "mu2",
            "null_ll",
            "alt_ll",
            "pval",
        ],
    )

    # FDR correction (Benjamini-Hochberg)
    df["fdr_pval"] = false_discovery_control(df["pval"], method="bh")

    return df