mbhall88 / head_to_head_pipeline

Snakemake pipelines to run the analysis for the Illumina vs. Nanopore comparison.
GNU General Public License v3.0
5 stars 2 forks source link

Cluster samples based on distances #63

Closed mbhall88 closed 3 years ago

mbhall88 commented 3 years ago

This is done by loading the distance matrix, creating a networkx graph and then adding all edges with a weight less than or equal to a given threshold

Loading the distance matrix to a DataFrame

import pandas as pd
import numpy as np

DELIM = ","
PAIR_IDX = ("sample1", "sample2")

class AsymmetrixMatrixError(Exception):
    pass

def load_matrix(fpath, delim: str = DELIM, name: str = "") -> pd.DataFrame:
    matrix = []
    with open(fpath) as instream:
        header = next(instream).rstrip()
        names = np.array(header.split(delim)[1:])
        idx = np.argsort(names)
        sorted_names = names[idx]
        for row in map(str.rstrip, instream):
            # sort row according to the name sorting
            sorted_row = np.array(row.split(delim)[1:], dtype=np.int)[idx]
            matrix.append(sorted_row)

    sorted_matrix = np.array(matrix)[idx]
    n_samples = len(sorted_names)
    diagonal_is_zero = all(sorted_matrix[i, i] == 0 for i in range(n_samples))
    if not diagonal_is_zero:
        raise AsymmetrixMatrixError("Distance matrix diagonal is not all zero")

    matrix_is_symmetric = np.allclose(sorted_matrix, sorted_matrix.T)
    if not matrix_is_symmetric:
        raise AsymmetrixMatrixError("Distance matrix is not symmetric")

    mx = pd.DataFrame(sorted_matrix, columns=sorted_names, index=sorted_names)
    # remove the lower triangle of the matrix and the middle diagonal
    mx = mx.where(np.triu(np.ones(mx.shape), k=1).astype(np.bool))
    mx = mx.stack().rename(name).astype(int)
    mx = mx.rename_axis(PAIR_IDX)

    return mx

Turning the distance matrix into a Graph

import networkx as nx

THRESHOLD = 12

def dist_matrix_to_graph(mx: pd.Series, threshold: int = THRESHOLD) -> nx.Graph:
    edges = [(s1, s2, dist) for (s1, s2), dist in mx.iteritems() if dist <= threshold]
    G = nx.Graph()
    G.add_weighted_edges_from(edges)
    return G

Fit a regression model to the data to calculate the appropriate threshold

from typing import Callable, List
from scipy import stats

def fit(xs: List[int], ys: List[int]) -> Callable:
    """Determines the line of best fit for the given distances and returns the linear equation 
    as a function that takes a threshold, x, and returns the equivalent threshold for the data 
    passed to this function.
    Note: xs should be the 'truth' distance values
    """
    slope, intercept, rvalue, pvalue, stderr = stats.linregress(xs, ys)
    def threshold_converter(threshold: float) -> float:
        """y = mx + c, where x is the threshold"""
        return slope * threshold + intercept
    return threshold_converter

Usage

compass_mtx = load_matrix("compass.matrix.csv", name="compass")
bcftools_mtx = load_matrix("bcftools.matrix.csv", name="bcftools")

# limit the matrices to pairs where the compass distance is less than a threshold
max_dist = 100
ix = compass_mtx < max_dist
xs = compass_mtx[ix].to_list()

# calculate the threshold with respect to the compass threshold
bcftools_threshold_converter = fit(xs, bcftools_mtx[ix].to_list())
G_compass = dist_matrix_to_graph(compass_mtx)
bcftools_threshold = bcftools_threshold_converter(THRESHOLD)
G_bcftools = dist_matrix_to_graph(bcftools_mtx, threshold=bcftools_threshold)