Source code for scphylo.ul._hclustering

import warnings

import numba
import numpy as np
import pandas as pd
import scipy as sp
from scipy.cluster.hierarchy import fcluster, linkage
from sklearn.metrics import pairwise_distances

import scphylo as scp
from scphylo.external._betabinom import pmf_BetaBinomial


@numba.jit(nopython=True)
def _l1_ignore_na(a, b):
    a[a == 3] = np.nan
    b[b == 3] = np.nan
    return np.nanmean(np.abs(a - b))


def dist_l1_ignore_na(I_mtr, n_jobs=1):
    dist = pairwise_distances(
        I_mtr, metric=_l1_ignore_na, force_all_finite="allow-nan", n_jobs=n_jobs
    )
    np.fill_diagonal(dist, 0)
    return dist


# https://gist.github.com/FedericoV/0e7d6d8c8794a99a7a42
@numba.jit(nopython=True)
def _cosine_ignore_na(u, v):
    m = u.shape[0]
    udotv = 0
    u_norm = 0
    v_norm = 0
    for i in range(m):
        if (np.isnan(u[i])) or (np.isnan(v[i])):
            continue
        udotv += u[i] * v[i]
        u_norm += u[i] * u[i]
        v_norm += v[i] * v[i]
    u_norm = np.sqrt(u_norm)
    v_norm = np.sqrt(v_norm)
    if (u_norm == 0) or (v_norm == 0):
        ratio = 1.0
    else:
        ratio = 1 - udotv / (u_norm * v_norm)
    if ratio < 0:
        return 0
    return ratio


def dist_cosine_ignore_na(I_mtr, n_jobs=1):
    dist = pairwise_distances(
        I_mtr, metric=_cosine_ignore_na, force_all_finite="allow-nan", n_jobs=n_jobs
    )
    np.fill_diagonal(dist, 0)
    return dist


def _dist_dendro(T, V, I_mtr):
    warnings.filterwarnings("ignore")
    PROB_SEQ_ERROR = 0.001

    def logSum_1(x, y):
        big = np.copy(x)
        big[x < y] = y[x < y]
        small = np.copy(x)
        small[x >= y] = y[x >= y]
        tmp = big + np.log(1 + np.exp(small - big))
        # tmp[np.bitwise_and(x==-np.inf, y==-np.inf)] = -np.inf
        # tmp[np.bitwise_and(x==np.inf, y==np.inf)] = np.inf
        return tmp

    D = np.divide(V, T)

    Mu = np.nanmean(D, axis=0)
    Var = np.nanvar(D, axis=0, ddof=1)
    a = ((1 - Mu) * Mu / Var - 1) * Mu
    b = ((1 - Mu) * Mu / Var - 1) * (1 - Mu)
    bad_muts = (
        (a <= 0) | (b <= 0) | np.isnan(a) | np.isnan(b) | np.isinf(a) | np.isinf(b)
    )
    V = V[:, ~bad_muts]
    T = T[:, ~bad_muts]
    # D = D[:, ~bad_muts]
    I_mtr = I_mtr[:, ~bad_muts]
    a = a[~bad_muts]
    b = b[~bad_muts]

    lPz0 = np.zeros(T.shape, dtype=np.float64)
    lPz1 = np.zeros(T.shape, dtype=np.float64)
    for i in range(T.shape[0]):
        for j in range(T.shape[1]):
            if T[i, j] != 0:
                lPz0[i, j] = np.log(
                    sp.stats.binom.pmf(V[i, j], T[i, j], PROB_SEQ_ERROR)
                )
                lPz1[i, j] = np.log(pmf_BetaBinomial(V[i, j], T[i, j], a[j], b[j]))

    Pg = np.sum(I_mtr == 1, axis=0) / I_mtr.shape[0]
    lPg = np.log(Pg)
    l1Pg = np.log(1 - Pg)
    lupiall = logSum_1(lPz0 + l1Pg, lPz1 + lPg)

    dist = np.zeros((T.shape[0], T.shape[0]), dtype=np.float64)
    for i in range(T.shape[0]):
        ldowni = logSum_1(lPz0[i, :] + lPz0 + l1Pg, lPz1[i, :] + lPz1 + lPg)
        lupi = logSum_1(lupiall[i, :] + lupiall, ldowni)
        dist[i, :] = np.sum(lupi - ldowni, axis=1)

    dist = dist - np.min(dist) + 1
    return dist, bad_muts


def dist_dendro(adata):
    T = adata.layers["total"]
    V = adata.layers["mutant"]
    G = adata.layers["genotype"]
    G[(G == 1) | (G == 3)] = 1
    G[G == 2] = 0
    dist, bad_muts = _dist_dendro(T, V, G)
    scp.pp.remove_mut_by_list(adata, bad_muts)
    scp.logg.info(f"{sum(bad_muts)} mutations filtered")
    return dist


[docs]def hclustering(df, metric="l1", method="ward", return_dist=False): """Hierarchical clustering. Parameters ---------- df : :class:`pandas.DataFrame` The genotype matrix. metric: :obj:`str`, optional The metric option. Can be: - `l1` - `cosine` method : :obj:`str`, optional The method for the hierarchical clustering, by default "ward" Returns ------- :obj:`dict` A dictionary in which keys are the number of clusters and values are the cluster labels for each item. """ if metric == "l1": dist = dist_l1_ignore_na(df.values) elif metric == "cosine": dist = dist_cosine_ignore_na(df.values) else: scp.logg.error("Wroing `metric` choice!") clust = linkage(dist[np.triu_indices(dist.shape[0], 1)], method=method) clusters = {} for i in range(2, dist.shape[0]): fc = fcluster(clust, i, criterion="maxclust") clusters[i] = pd.Series(fc, index=df.index) if return_dist: return dist return clusters