# Source code for analysis.run_analysis_sc

# Default Python package Imports
import warnings
from pathlib import Path
from typing import NamedTuple

import anndata as ad
import pandas as pd
from anndata import AnnData

# local imports
from .as_analysis_sc import adata_count_qc, get_imbalance_sc


# Class that stores relevant data
[docs] class WaspAnalysisSC:
[docs] def __init__( self, adata_file: str | Path, bc_map: str | Path, min_count: int | None = None, pseudocount: int | None = None, phased: bool | None = None, sample: str | None = None, groups: str | list[str] | None = None, model: str | None = None, out_file: str | Path | None = None, z_cutoff: float | None = None, ) -> None: # User input data self.adata_file = adata_file self.bc_map = bc_map self.min_count = min_count self.pseudocount = pseudocount self.sample = sample self.groups = groups self.model = model self.out_file = out_file self.phased = phased self.z_cutoff = z_cutoff # Should i default to something like 4 or 5? # Default to single dispersion model # TODO ADD GROUP DISP and other model types if (self.model is None) or (self.model not in {"single"}): self.model = "single" # Default min count of 10 if self.min_count is None: self.min_count = 10 if self.pseudocount is None: # self.pseudocount = 0 # either 0 or 1 for default self.pseudocount = 1 # Make sure min and pseudocounts are valid if not all((i >= 0) and isinstance(i, int) for i in (self.min_count, self.pseudocount)): raise ValueError("min_count and pseudocount must be non-negative integers") # Handle group inputs as strings to list if isinstance(self.groups, str): # Check if group file or comma delim string if Path(self.groups).is_file(): with open(self.groups) as group_file: self.groups = [line.strip() for line in group_file] else: self.groups = [s.strip() for s in self.groups.split(",")] # Create default outfile if self.out_file is None: self.out_file = str(Path.cwd() / "ai_results.tsv") # do this after # Process output names for groups self.out_dir = Path(self.out_file).parent self.prefix = Path(self.out_file).stem
[docs] def update_data(self, data: NamedTuple) -> None: # Update attributes with namedtuple after parsing # Only updates matching keys for key in data._fields: if hasattr(self, key): setattr(self, key, getattr(data, key))
# Namedtuple bundling the parsed/validated adata inputs returned by
# process_adata_inputs: the (possibly subset) count matrix plus the resolved
# sample name, group list, and phasing flag.
AdataInputs = NamedTuple(
    "AdataInputs",
    [
        ("adata", AnnData),
        ("sample", str),
        ("groups", list[str]),
        ("phased", bool),
    ],
)
# Process adata inputs
def process_adata_inputs(
    adata: AnnData,
    ai_files: WaspAnalysisSC | None = None,
    bc_map: str | Path | None = None,
    sample: str | None = None,
    groups: list[str] | None = None,
    phased: bool | None = None,
) -> AdataInputs:
    """Resolve sample, phasing, and group inputs against the count AnnData.

    When ``ai_files`` is given, its ``bc_map``/``sample``/``groups``/``phased``
    attributes override the individual keyword arguments. For an explicitly
    provided sample the data is subset to heterozygous genotypes, phasing is
    auto-detected unless ``phased`` is ``False``, and groups are read from a
    barcode-to-group mapping file and/or validated against ``adata.var``.

    Raises:
        KeyError: unknown sample, missing group column, or no provided group found.
        ValueError: ambiguous sample choice or no heterozygous genotypes.
    """
    if ai_files is not None:
        bc_map = ai_files.bc_map
        sample = ai_files.sample
        # ai_files.groups is already converted to list[str] in __init__ if it was a string
        groups = ai_files.groups if isinstance(ai_files.groups, list) else None
        phased = ai_files.phased

    # Check genotype and phasing input
    if "samples" not in adata.uns_keys():
        # No genotype info in the dataset at all: a named sample is an error,
        # and the phased model is impossible.
        if sample is not None:
            raise KeyError(f"Sample '{sample}' provided, but no samples found in count data")
        phased = False
    elif sample is None:
        # Sample can only be inferred when the dataset holds exactly one.
        sample_list = adata.uns["samples"]
        if len(sample_list) != 1:
            raise ValueError(
                "Genotype Ambiguous: Count data contains multiple samples, but none provided. "
                f"Please input a sample from list: {sample_list}"
            )
        else:
            sample = sample_list[0]
    else:
        sample_list = adata.uns["samples"]
        if sample not in sample_list:
            raise KeyError(
                f"Sample: '{sample}' not found in dataset. "
                f"Please input a sample from list: {sample_list}"
            )
        else:
            # We gotta subset to include het genotypes now
            # NOTE(review): this het filtering runs only for an explicitly
            # provided sample, not for the single-sample inferred branch
            # above — confirm whether that asymmetry is intentional.
            if not any(i in ["1|0", "0|1", "1/0", "0/1"] for i in adata.obs[sample].unique()):
                raise ValueError(f"Heterozygous genotypes not found for sample: {sample}.")
            # Using copy instead of view stops implicit mod warning, need to check memory usage
            adata = adata[adata.obs[sample].isin(["1|0", "0|1", "1/0", "0/1"])].copy()
            adata.obs = adata.obs.reset_index(
                drop=True
            )  # Have to reset index every time i subset adata

            # Have to reindex the regions after filtering GT's
            if "feature" in adata.uns_keys():
                adata.uns["feature"] = (
                    adata.uns["feature"]
                    .merge(adata.obs[["index"]].reset_index(names="filt_index"), on="index")[
                        ["region", "filt_index"]
                    ]
                    .rename(columns={"filt_index": "index"})
                )

            # Need to update adata.obs index col as well
            adata.obs["index"] = adata.obs.index

    # Check phasing if True or None: phased model is only valid when every
    # het genotype uses the phased '|' notation.
    if phased is not False:
        if {"0|1", "1|0"} == set(adata.obs[sample].unique()):
            phased = True
        else:
            phased = False
            warning_msg = (
                f"Phased model selected for unphased genotypes ({adata.obs[sample].unique()}). "
                "Switching to unphased model"
            )
            warnings.warn(warning_msg, stacklevel=2)

    # Add groups if barcode mapping provided (TSV: barcode <tab> group,
    # barcodes become the index joined onto adata.var)
    if bc_map is not None:
        map_df = pd.read_csv(
            bc_map, sep="\t", header=None, names=["group"], index_col=0, dtype="category"
        )
        adata.var = adata.var.join(map_df, how="left")

    # No existing groups or mapping provided
    if "group" not in adata.var_keys():
        raise KeyError("groups not found in dataset, please provide a barcode mapping")
    elif groups is not None:
        # Keep only requested groups that actually appear in the data.
        valid_groups = list(adata.var["group"].dropna().unique())
        new_groups = [i for i in groups if i in valid_groups]
        if len(new_groups) == 0:
            raise KeyError(f"Provided groups {groups} not found.")
        elif len(new_groups) < len(groups):
            skipped_groups = [i for i in groups if i not in new_groups]
            print(f"Skipping missing groups: {skipped_groups}")
            groups = new_groups
        else:
            groups = new_groups
    else:
        groups = list(adata.var["group"].dropna().unique())

    # Ensure all required values are set (type narrowing for mypy)
    assert sample is not None, "sample must be set by this point"
    assert groups is not None, "groups must be set by this point"
    assert phased is not None, "phased must be set by this point"

    # Return properly typed namedtuple
    return AdataInputs(adata, sample, groups, phased)
# Parse user inputs and run entire pipeline
def run_ai_analysis_sc(
    count_file: str | Path,
    bc_map: str | Path,
    min_count: int | None = None,
    pseudocount: int | None = None,
    phase: bool | None = None,
    sample: str | None = None,
    groups: str | list[str] | None = None,
    out_file: str | Path | None = None,
    z_cutoff: float | None = None,
) -> None:
    """Run the full single-cell allelic-imbalance pipeline and write results.

    Steps: wrap the inputs in :class:`WaspAnalysisSC` (defaults/validation),
    load the h5ad count file and resolve sample/groups/phasing via
    ``process_adata_inputs``, QC-filter counts with ``adata_count_qc``, compute
    per-group imbalance with ``get_imbalance_sc``, then write one TSV per group
    (sorted by p-value) to ``<out_dir>/<prefix>_<group>.tsv``.

    Args mirror ``WaspAnalysisSC.__init__`` (``phase`` maps to ``phased``);
    the dispersion model is fixed to "single" here. Returns nothing; output
    is written to disk.
    """
    # Create data class that holds input data
    ai_files = WaspAnalysisSC(
        adata_file=count_file,
        bc_map=bc_map,
        min_count=min_count,
        pseudocount=pseudocount,
        phased=phase,
        sample=sample,
        groups=groups,
        model="single",
        out_file=out_file,
        z_cutoff=z_cutoff,
    )

    adata_inputs = process_adata_inputs(ad.read_h5ad(ai_files.adata_file), ai_files=ai_files)
    # print(*vars(ai_files).items(), sep="\n")  # For debugging
    # print(adata_inputs)  # For debugging

    # Update class attributes with the resolved sample/groups/phased values
    ai_files.update_data(adata_inputs)
    # adata = adata_inputs.adata  # Hold parsed adata file obj in memory

    # Prefilter and hold adata data in memory
    adata = adata_count_qc(adata_inputs.adata, z_cutoff=ai_files.z_cutoff, gt_error=None)

    # Type narrowing: after update_data, these values should be properly set
    assert ai_files.min_count is not None, "min_count should be set in __init__"
    assert ai_files.pseudocount is not None, "pseudocount should be set in __init__"
    assert ai_files.phased is not None, "phased should be set by process_adata_inputs"
    assert isinstance(ai_files.groups, list), "groups should be a list after update_data"

    # Create dictionary of resulting dataframes, keyed by group name
    df_dict = get_imbalance_sc(
        adata,
        min_count=ai_files.min_count,
        pseudocount=ai_files.pseudocount,
        phased=ai_files.phased,
        sample=ai_files.sample,
        groups=ai_files.groups,
    )

    # Write outputs: one TSV per group, '/' in group names sanitized to '-'
    out_path = Path(ai_files.out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    for key, value in df_dict.items():
        group_out_file = out_path / f"{ai_files.prefix}_{key.replace('/', '-')}.tsv"
        value.sort_values(by="pval", ascending=True).to_csv(
            group_out_file, sep="\t", header=True, index=False
        )

    print(
        f"Allelic Imbalance measured for {len(df_dict)} groups!\n"
        f"Results written to: {out_path}/{ai_files.prefix}_[GROUP].tsv"
    )