mmore500 / hstrat

hstrat enables phylogenetic inference on distributed digital evolution populations
Other
3 stars 2 forks source link

Feature: Alternate Approaches/Optimizations for Tree Reconstruction Algorithms #152

Open vivaansinghvi07 opened 1 month ago

vivaansinghvi07 commented 1 month ago

This issue documents the progress in #148:

Code used to run the new methods:

from __future__ import annotations
from collections.abc import Callable
from random import randint
from time import perf_counter

import sys
from typing import Any
import numpy as np

from hstrat import hstrat
from hstrat.genome_instrumentation import HereditaryStratigraphicColumn
from hstrat.phylogenetic_inference.tree._impl._build_trie_from_artifacts import (
    MatrixColumn,
    build_trie_from_artifacts,
    build_trie_from_artifacts_matrix,
    build_trie_from_artifacts_progressive,
    build_trie_from_artifacts_cpp_sync
)

class Genome:
    __slots__ = ["annotation", "data"]

    def __init__(
        self,
        data: list[int] | None = None,
        annotation: HereditaryStratigraphicColumn | None = None,
    ) -> None:
        self.data = data or [randint(1, 100)]
        self.annotation = annotation or HereditaryStratigraphicColumn(
            stratum_retention_policy=hstrat.recency_proportional_resolution_algo.Policy(
                8
            ),
            stratum_differentia_bit_width=16,
        )

    def get_descendant(self) -> Genome:
        return Genome(
            self.data + [randint(1, 100)], self.annotation.CloneDescendant()
        )

    @property
    def score(self) -> int:  # weighted average, more recent preferred
        return sum(i * x for i, x in enumerate(self.data[:10], start=1)) // 45

def simulate_evolution(
    parents: list[Genome], *, generations: int, carrying_capacity: int
) -> list[Genome]:
    for _ in range(generations):
        children = sum(
            [[p.get_descendant() for p in parents] for _ in range(3)], []
        )
        children.sort(key=lambda x: x.score)
        parents = children[-carrying_capacity:]
    return parents

if __name__ == "__main__":

    print("Starting evolutionary simulation...")
    generations = 20
    if "--generations" in sys.argv:
        generations = int(sys.argv[sys.argv.index("--generations") + 1])
    carrying_capacity = 100
    if "--carrying-capacity" in sys.argv:
        carrying_capacity = int(sys.argv[sys.argv.index("--carrying-capacity") + 1])

    start_pop = [Genome()]
    evolved = simulate_evolution(
        start_pop, generations=100, carrying_capacity=1000
    )
    extant_population = [x.annotation for x in evolved]

    assemblage = hstrat.pop_to_assemblage(extant_population)
    ranks = assemblage._assemblage_df.index.to_numpy().astype(np.uint64)
    differentia = assemblage._assemblage_df.to_numpy().astype(np.uint64)

    pop = [tuple(zip(*art.IterRankDifferentiaZip())) for art in extant_population]
    taxon_labels = list(map(str, range(len(pop))))

    print("Benchmarking...")

    methods = {
        "c++": (build_trie_from_artifacts_cpp_sync, (pop, taxon_labels), {}),
        "cpp": (build_trie_from_artifacts_cpp_sync, (pop, taxon_labels), {}),
        "matrix": (build_trie_from_artifacts_matrix, (ranks, differentia, extant_population[0]._stratum_differentia_bit_width, [*range(len(pop))]), {}),
        "normal": (build_trie_from_artifacts, (extant_population, taxon_labels, False, lambda x: x), {}),
        "progressive_synchronous": (build_trie_from_artifacts_progressive, (extant_population, taxon_labels), {"multiprocess": False}),
        "progressive_asynchronous": (build_trie_from_artifacts_progressive, (extant_population, taxon_labels), {"multiprocess": True}),
    }
    if "--methods" in sys.argv:
        funcs = sys.argv[sys.argv.index("--methods") + 1:]
    else:
        funcs = methods.keys() - {'cpp'}

    if "--matrix-dry" in sys.argv:
        f, args, _ = methods['matrix']
        f(*args)

    for name in funcs:
        for title, (f, args, kwargs) in methods.items():
            if name in title:
                start = perf_counter()
                for _ in range(10):
                    f(*args, **kwargs)
                print(f"{title.replace('_', ' ').title()}: {perf_counter() - start:.2f}s")

In order to access the prototype methods for testing, you can import them directly from the _impl file such as in this benchmarking file.

vivaansinghvi07 commented 1 week ago

v1 goals/clean-up before merging:

Easy to fix:

Harder to fix:

v2 future goals after merging: