astheeggeggs / lshmm

code to run Li and Stephens
MIT License
3 stars 3 forks source link

Implement a naive diploid Viterbi algorithm to run on "merged" ancestral haplotypes #85

Open szhan opened 2 weeks ago

szhan commented 2 weeks ago

The naive diploid Viterbi algorithm doesn't scale well with the number of reference haplotypes, slowing testing (#65). @astheeggeggs suggested an alternative implementation that "merges" partial ancestral haplotypes such that there are no NONCOPY values in the reference panel fed to the Viterbi algorithm. However, this would only work if all the marginal trees are binary. In practice, this could vary due to polytomies in the marginal trees, but this should suffice for testing. This implementation also entails keeping track of where switching must occur, in order to inform switching in the HMM when moving out of a partial ancestral haplotype.

szhan commented 2 weeks ago

Before doing this, #84 needs to be solved.

szhan commented 2 weeks ago

I think we want to do something like below. The outputs are (1) a matrix containing the sample haplotypes and "merged" ancestral haplotypes and (2) a matrix mapping the index of the "merged" haplotypes to the original haplotypes. The "merged" haplotypes with no NONCOPY values in the original map to only one index across all sites.

import numpy as np

# Input matrix containing all the haplotypes.
A = np.array([
    [ 0,  1,  0,  0,  1],
    [ 1,  0,  1,  0,  1],
    [ 1,  0,  0, -2, -2], # Partial ancestor
    [-2, -2, -2,  1,  0], # Partial ancestor
], dtype=np.int32)

# Expected output matrix.
C = np.array([
    [ 0,  1,  0,  0,  1],
    [ 1,  0,  1,  0,  1],
    [ 1,  0,  0,  1,  0],
], dtype=np.int32)

num_sites = A.shape[1]
num_samples = 2 # Known value
max_num_haps = 2 * num_samples - 1

# Check that the first num_sample haplotypes have no NONCOPY values.
assert np.all(np.sum(A[:num_samples, :] == -2, axis=1) == 0), \
    "The first subset of haplotypes contain NONCOPY values."

# Output matrix with "merged" haplotypes
B = np.zeros((max_num_haps, num_sites), dtype=np.int32) - 2
# Output matrix with indices.
I = np.zeros((max_num_haps, num_sites), dtype=np.int32) - 1
for i in range(num_sites):
    B[:, i] = A[A[:, i] != -2, i]
    I[:, i] = np.where(A[:, i] != -2)[0]
assert np.array_equal(B, C), "Unexpected output genotype matrix."

print(B)
print(I)
szhan commented 2 weeks ago

This function should be added to vit_diploidy.py.

def merge_partial_haplotypes(genotype_matrix, num_sample_haps):
    """
    Merge partial haplotypes, if any, in a genotype matrix, and return:
    1. A matrix containing the sample haplotypes and "merged" ancestral haplotypes.
    2. A matrix mapping the "merged" haplotypes to the haplotypes in the original genotype matrix.

    Note that the "merged" haplotypes with no NONCOPY values in the original genotype matrix
    map to only one index across all sites.

    Any unfilled entries in the "merged" matrix and the index matrix hold NONCOPY and -1, respectively.

    :param numpy.ndarray genotype_matrix: Input matrix containing haplotypes.
    :param int num_sample_haps: Number of sample haplotypes.
    :param numpy.ndarray merged_matrix: Output matrix of original and/or merged haplotypes.
    :param numpy.ndarray index_matrix: Output matrix of indices mapping the original matrix to the merged matrix.
    """
    num_sites = genotype_matrix.shape[1]
    max_num_haps = 2 * num_sample_haps - 1

    # Check that the first _num_sample_haps_ haplotypes have no NONCOPY values.
    assert np.all(np.sum(genotype_matrix[:num_sample_haps, :] == -2, axis=1) == 0), \
        "The first subset of haplotypes contain NONCOPY values."

    merged_matrix = np.zeros((max_num_haps, num_sites), dtype=np.int32) + NONCOPY
    index_matrix = np.zeros((max_num_haps, num_sites), dtype=np.int32) - 1
    for i in range(num_sites):
        merged_matrix[:, i] = genotype_matrix[genotype_matrix[:, i] != -2, i]
        index_matrix[:, i] = np.where(A[:, i] != -2)[0]

    return (merged_matrix, index_matrix)

B, I = merge_partial_haplotypes(A, num_sample_haps=2)
assert np.array_equal(B, C), "Unexpected output genotype matrix."
szhan commented 2 weeks ago

The "merged" matrix will be used to determine the emission probabillity, and the index matrix to determine whether switching occurs.

szhan commented 2 weeks ago

Actually, it gets more complicated in the diploid case... The above function only gives us the merged haplotypes, which still need to be converted to implied phased genotypes before running diploid Viterbi.