Source code for scphylo.io._genotype

import os

import anndata as ad
import ete3
import networkx as nx
import numpy as np
import pandas as pd

import scphylo as scp


[docs]def read(filepath): """Read genotype matrix and read-count matrix. The genotype matrix must be in the in format of :class:`pandas.DataFrame` The read-count matrix must be in the format of :class:`anndata.AnnData`. Parameters ---------- filepath : :obj:`str` The path to the file. The extension must be one of [`.tsv`, `.SC`, `.CFMatrix`, `.h5ad`, `.h5ad.gz`, `.nwk`] Returns ------- :class:`pandas.DataFrame` or :class:`anndata.AnnData` Depends on the format of the input file the output type is different. """ ext = os.path.splitext(filepath)[-1] if ext in [".SC", ".CFMatrix", ".before_FP_FN_NA", ".tsv"]: sc = pd.read_table(filepath, index_col=0) if len(sc.columns) != len(set(sc.columns)): scp.logg.error("Mutation ids must be unique!") return sc elif ext in [".csv"]: sc = pd.read_csv(filepath, index_col=0) if len(sc.columns) != len(set(sc.columns)): scp.logg.error("Mutation ids must be unique!") return sc elif ext in [".h5ad", ".gz"]: return ad.read(filepath) elif ext in [".nwk"]: return _read_nwk(filepath) else: scp.logg.error("Extension is wrong!") return None
[docs]def write(obj, filepath): """Write genotype matrix or read-count matrix into a file. Parameters ---------- obj : :class:`pandas.DataFrame` or :class:`anndata.AnnData` The input object which is going to be written in a file. filepath : :obj:`str` The file path where the `obj` must be written in. """ if isinstance(obj, pd.DataFrame): obj.index.name = "cellIDxmutID" obj.to_csv(filepath, sep="\t") elif isinstance(obj, ad.AnnData): obj.write(filepath + ".h5ad.gz", compression="gzip") else: scp.logg.error("Object instance is wrong!")
def _read_nwk(filepath): tree = ete3.Tree(filepath, format=1) G = nx.DiGraph() node2id = {} i = 0 for n in tree.traverse("postorder"): if n.name == "" or "Inner" in n.name: G.add_node(i, label="––") else: G.add_node(i, label=str(n.name)) node2id[n] = i i += 1 for p in tree.traverse("postorder"): pn = node2id[p] for c in p.children: cn = node2id[c] G.add_edge(pn, cn) i = 0 for e, u, _ in G.edges.data("label"): G.edges[(e, u)]["label"] = f"m{i}" i += 1 G.graph["normal_cells"] = [] G.graph["splitter_mut"] = "\n" G.graph["splitter_cell"] = "\n" data = scp.ul.to_cfmatrix(G) return data def to_vcf(adata, filepath): def _calculate_pl(ref_count, alt_count): if ref_count == 0 and alt_count == 0: return [0, 0, 0] elif ref_count == 0: return [0, 0, 255] elif alt_count == 0: return [255, 0, 0] else: total_count = ref_count + alt_count p_ref = ref_count / total_count p_alt = alt_count / total_count p_ref_given_data = p_ref p_alt_given_data = p_alt p_ref_ref_given_data = p_ref * p_ref pl_ref = -10 * np.math.log10(p_ref_given_data) pl_alt = -10 * np.math.log10(p_alt_given_data) pl_ref_ref = -10 * np.math.log10(p_ref_ref_given_data) return [int(pl_ref), int(pl_ref_ref), int(pl_alt)] gt_mtx = np.where( adata.X == 0, "0/0", np.where(adata.X == 1, "0/1", np.where(adata.X == 3, "./.", adata.X)), ) ref_mtx = adata.layers["total"] - adata.layers["mutant"] alt_mtx = adata.layers["mutant"] with open(filepath, "w") as fout: fout.write("##fileformat=VCFv4.3\n") fout.write("##contig=<ID=chr1,length=1000000>\n") fout.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n') fout.write( '##FORMAT=<ID=PL,Number=G,Type=Integer,Description="Normalized,' " Phred-scaled likelihoods for genotypes as defined in the VCF" ' specification">\n' ) fout.write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t") fout.write("\t".join(adata.obs_names) + "\n") for j in range(adata.shape[1]): samples = [] for i in range(adata.shape[0]): pl = _calculate_pl(ref_mtx[i, j], alt_mtx[i, j]) samples.append(gt_mtx[i, j] + ":" + ",".join(map(str, pl))) variant = ( "\t".join( [ "chr1", str(j + 1), ".", "A", "C", ".", "PASS", ".", "GT:PL", "\t".join(samples), ] ) + "\n" ) fout.write(variant)