import contextlib
import datetime
import functools
import multiprocessing
import os
import shutil
import tempfile
import time
import joblib
import numpy as np
import pkg_resources
import scphylo as scp
def log_input(df_in):
size = df_in.shape[0] * df_in.shape[1]
scp.logg.info(f"input -- size: {df_in.shape[0]}x{df_in.shape[1]}")
scp.logg.info(
f"input -- 0: {np.sum(df_in.values == 0)}#,"
f" {100*np.sum(df_in.values == 0)/size:.1f}%"
)
scp.logg.info(
f"input -- 1: {np.sum(df_in.values == 1)}#,"
f" {100*np.sum(df_in.values == 1)/size:.1f}%"
)
scp.logg.info(
f"input -- NA: {np.sum(df_in.values == 3)}#,"
f" {100*np.sum(df_in.values == 3)/size:.1f}%"
)
scp.logg.info(f"input -- CF: {is_conflict_free_gusfield(df_in)}")
def log_output(df_out, running_time):
size = df_out.shape[0] * df_out.shape[1]
scp.logg.info(f"output -- size: {df_out.shape[0]}x{df_out.shape[1]}")
scp.logg.info(
f"output -- 0: {np.sum(df_out.values == 0)}#,"
f" {100*np.sum(df_out.values == 0)/size:.1f}%"
)
scp.logg.info(
f"output -- 1: {np.sum(df_out.values == 1)}#,"
f" {100*np.sum(df_out.values == 1)/size:.1f}%"
)
scp.logg.info(
f"output -- NA: {np.sum(df_out.values == 3)}#,"
f" {100*np.sum(df_out.values == 3)/size:.1f}%"
)
icf = is_conflict_free_gusfield(df_out)
scp.logg.info("output -- CF: ", end="")
if icf:
scp.logg.info(icf, color="green")
else:
scp.logg.info(icf, color="red")
scp.logg.info(
f"output -- time: {running_time:.1f}s"
f" ({datetime.timedelta(seconds=running_time)})"
)
def log_flip(df_in, df_out):
flips_0_1, flips_1_0, flips_na_0, flips_na_1 = count_flips(
df_in.values, df_out.values, 3
)
fn_rate, fp_rate, na_rate = infer_rates(df_in.values, df_out.values, 3)
scp.logg.info(f"flips -- #0->1: {flips_0_1}")
scp.logg.info(f"flips -- #1->0: {flips_1_0}")
scp.logg.info(f"flips -- #NA->0: {flips_na_0}")
scp.logg.info(f"flips -- #NA->1: {flips_na_1}")
scp.logg.info(f"rates -- FN: {fn_rate:.3f}")
scp.logg.info(f"rates -- FP: {fp_rate:.8f}")
scp.logg.info(f"rates -- NA: {na_rate:.3f}")
def calc_nll_matrix(df_in, df_out, alpha, beta):
if alpha == 0 or beta == 0:
return None
columns = np.intersect1d(df_in.columns, df_out.columns)
indices = np.intersect1d(df_in.index, df_out.index)
D = df_in.loc[indices, columns].values
E = df_out.loc[indices, columns].values
removedMutations = []
objective = 0
for j in range(len(columns)):
numZeros = 0
numOnes = 0
for i in range(len(indices)):
if D[i, j] == 0:
numZeros += 1
objective += np.log(beta / (1 - alpha)) * E[i, j]
elif D[i, j] == 1:
numOnes += 1
objective += np.log((1 - beta) / alpha) * E[i, j]
objective += numZeros * np.log(1 - alpha)
objective += numOnes * np.log(alpha)
if j in removedMutations:
objective -= numZeros * np.log(1 - alpha) + numOnes * (
np.log(alpha) + np.log((1 - beta) / alpha)
)
return -objective
def stat(df_in, df_out, alpha, beta, running_time):
log_input(df_in)
log_output(df_out, running_time)
log_flip(df_in, df_out)
nll = calc_nll_matrix(df_in, df_out, alpha, beta)
scp.logg.info(f"score -- NLL: {nll}")
def parse_params_file(filename):
def _parse_params_file_helper(param):
try:
value = basename.split(f"{param}_")[1]
if "-" in value:
value = value.split("-")[0]
else:
value = value.split(".")[0]
return float(value) if "." in value else int(value)
except IndexError:
return None
data = {}
_, basename = dir_base(filename)
for param in [
"simNo",
"s",
"m",
"h",
"minVAF",
"ISAV",
"n",
"fp",
"fn",
"na",
"d",
"l",
]:
value = _parse_params_file_helper(param)
if value is not None:
data[param] = value
return data
def parse_log_file(filename):
result = {}
_, basename = dir_base(filename)
result["tool"] = basename.split(".")[-1]
with open(filename) as fin:
for line in fin:
line = line.strip()
if "output -- time: " in line:
result["running_time"] = float(
line.replace("output -- time: ", "").split()[0].replace("s", "")
)
if "output -- CF: " in line:
result["is_cf"] = bool(line.replace("output -- CF: ", "").split()[-1])
if "rates -- FN: " in line:
result["fn_rate"] = float(line.replace("rates -- FN: ", "").split()[-1])
if "rates -- FP: " in line:
result["fp_rate"] = float(line.replace("rates -- FP: ", "").split()[-1])
return result
def parse_score_file(filename):
result = {}
_, basename = dir_base(filename)
result["tool"] = basename.split(".")[-1]
with open(filename) as fin:
for line in fin:
line = line.strip()
result[line.split("=")[0]] = float(line.split("=")[1])
return result
def count_flips(I_mtr, O_mtr, na_value=3):
flips_0_1 = 0
flips_1_0 = 0
flips_na_0 = 0
flips_na_1 = 0
n, m = I_mtr.shape
for i in range(n):
for j in range(m):
if I_mtr[i, j] == 0 and O_mtr[i, j] == 1:
flips_0_1 += 1
elif I_mtr[i, j] == 1 and O_mtr[i, j] == 0:
flips_1_0 += 1
elif I_mtr[i, j] == na_value and O_mtr[i, j] == 0:
flips_na_0 += 1
elif I_mtr[i, j] == na_value and O_mtr[i, j] == 1:
flips_na_1 += 1
return flips_0_1, flips_1_0, flips_na_0, flips_na_1
def infer_rates(I_mtr, O_mtr, na_value=3):
flips_0_1, flips_1_0, flips_na_0, flips_na_1 = count_flips(I_mtr, O_mtr, na_value)
fn_rate = flips_0_1 / ((O_mtr == 1) & (I_mtr != na_value)).sum()
fp_rate = flips_1_0 / ((O_mtr == 0) & (I_mtr != na_value)).sum()
na_rate = (flips_na_1 + flips_na_0) / I_mtr.size
return fn_rate, fp_rate, na_rate
[docs]def is_conflict_free(df_in):
"""Check conflict-free criteria via Gusfield algorithm.
The order of this algorithm is :math:`O(nm^2)`
where n is the number of cells and m is the number of mutations.
Parameters
----------
df_in : :class:`pandas.DataFrame`
Input genotype matrix.
Returns
-------
:obj:`bool`
A Boolean checking if the input conflict-free or not.
See Also
--------
:func:`scphylo.ul.is_conflict_free_gusfield`.
"""
D = df_in.astype(int).values
if not np.array_equal(np.unique(D), [0, 1]):
return False
conflict_free = True
for p in range(D.shape[1]):
for q in range(p + 1, D.shape[1]):
oneone = False
zeroone = False
onezero = False
for r in range(D.shape[0]):
if D[r, p] == 1 and D[r, q] == 1:
oneone = True
if D[r, p] == 0 and D[r, q] == 1:
zeroone = True
if D[r, p] == 1 and D[r, q] == 0:
onezero = True
if oneone and zeroone and onezero:
conflict_free = False
return conflict_free
return conflict_free
[docs]def is_conflict_free_gusfield(df_in):
"""Check conflict-free criteria via Gusfield algorithm.
This is an implementation of algorithm 1.1 in :cite:`Gusfield_1991`.
The order of this algorithm is :math:`O(nm)`
where n is the number of cells and m is the number of mutations.
Parameters
----------
df_in : :class:`pandas.DataFrame`
Input genotype matrix.
Returns
-------
:obj:`bool`
A Boolean checking if the input conflict-free or not.
Examples
--------
>>> sc = scp.datasets.test()
>>> scp.ul.is_conflict_free_gusfield(sc)
False
See Also
--------
:func:`scphylo.ul.is_conflict_free`.
"""
I_mtr = df_in.astype(int).values
if not np.array_equal(np.unique(I_mtr), [0, 1]):
return False
def _sort_bin(a):
b = np.transpose(a)
b_view = np.ascontiguousarray(b).view(
np.dtype((np.void, b.dtype.itemsize * b.shape[1]))
)
idx = np.argsort(b_view.ravel())[::-1]
c = b[idx]
return np.transpose(c), idx
Ip = I_mtr.copy()
O_mtr, _ = _sort_bin(Ip)
Lij = np.zeros(O_mtr.shape, dtype=int)
for i in range(O_mtr.shape[0]):
maxK = 0
for j in range(O_mtr.shape[1]):
if O_mtr[i, j] == 1:
Lij[i, j] = maxK
maxK = j + 1
Lj = np.amax(Lij, axis=0)
for i in range(O_mtr.shape[0]):
for j in range(O_mtr.shape[1]):
if O_mtr[i, j] == 1:
if Lij[i, j] != Lj[j]:
return False
return True
def tmpdir(prefix="scphylo.", suffix=".scphylo", dirname="."):
return tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dirname)
def tmpfile(prefix="scphylo.", suffix=".scphylo", dirname="."):
return tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dirname)[1]
def tmpdirsys(prefix="scphylo.", suffix=".scphylo", dirname="."):
return tempfile.TemporaryDirectory(suffix=suffix, prefix=prefix)
def cleanup(dirname):
shutil.rmtree(dirname)
def remove(filename):
if os.path.exists(filename):
os.remove(filename)
def dir_base(infile):
basename = os.path.splitext(os.path.basename(infile))[0]
dirname = os.path.dirname(infile)
return dirname, basename
def dirbase(infile):
return os.path.splitext(infile)[0]
def mkdir(indir):
if not os.path.exists(indir):
os.makedirs(indir)
return indir
def executable(binary, appname):
executable = shutil.which(binary)
if executable is None:
if not os.path.exists(f"{scp.settings.tools}/{binary}"):
scp.logg.error(
f"Cannot find the binary file of {appname} with `{binary}` name!"
)
else:
executable = f"{scp.settings.tools}/{binary}"
return executable
def timeit(f):
def wrap(*args, **kwargs):
start_time = time.time()
ret = f(*args, **kwargs)
end_time = time.time()
if end_time - start_time < 60:
scp.logg.info(f"Time needed for {f.__name__}: {end_time - start_time:.3f}")
else:
scp.logg.info(
f"Time needed for {f.__name__}:"
f" {time.strftime('%Hh:%Mm:%Ss', time.gmtime(end_time - start_time))}"
)
return ret
return wrap
def get_file(key):
components = key.split("/")
return pkg_resources.resource_filename(components[0], "/".join(components[1:]))
def with_timeout(timeout):
def decorator(decorated):
@functools.wraps(decorated)
def inner(*args, **kwargs):
pool = multiprocessing.pool.ThreadPool(1)
async_result = pool.apply_async(decorated, args, kwargs)
try:
return async_result.get(timeout)
except multiprocessing.TimeoutError:
return None
return inner
return decorator
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
def split_mut(mut):
try:
ref = mut.split(".")[-2]
pos = mut.split(".")[-3]
chrom = mut.split(".")[-4]
gene = mut.split(".chr")[0].split("_")[1]
ens = mut.split(".chr")[0].split("_")[0]
alt = mut.split(".")[-1]
return ens, gene, chrom, pos, ref, alt
except IndexError:
return None, None, None, None, None, None