insitro / redun

Yet another redundant workflow engine
https://insitro.github.io/redun/
Apache License 2.0

Scatter-merge operation in redun? #70

Closed · PhilPalmer closed 1 year ago

PhilPalmer commented 1 year ago

Hi,

Thanks for developing a great workflow manager in Python!

I'm currently trying to implement a pipeline that performs a scatter and merge operation; however, I've been running into issues and couldn't find any examples or documentation that address my use case.

The pipeline runs a tool called NetMHCpan, which predicts binding for a given set of peptides (file) and MHC alleles (string). I would like to run a task that parallelises over both alleles and peptides by splitting the peptides into batches, running these batches for each allele, and then running another task to concatenate the per-allele results.

Does redun support a way of doing this? I have tried using nested list comprehensions, nested for loops, and itertools.product, but I always get the following error. I suspect I might need to use a method from redun.functools such as starmap, but I'm not quite sure how to implement this.

Traceback (most recent call last):
  File "example_workflow.py", line 116, in <module>
    run_netmhcpan(
  File "example_workflow.py", line 79, in run_netmhcpan
    peptide_allele_paths = [
  File "example_workflow.py", line 79, in <listcomp>
    peptide_allele_paths = [
  File "/home/phil/anaconda3/envs/tvax/lib/python3.10/site-packages/redun/expression.py", line 87, in __iter__
    raise TypeError("Expressions of unknown length cannot be iterated.")
TypeError: Expressions of unknown length cannot be iterated.

Any help that you can provide would be greatly appreciated. Thanks in advance!

Here is a minimal reproducible example of my code:

"""
Redun workflow to run NetMHCpan and NetMHCIIpan to predict peptide-HLA binding affinities.
"""
import os
from pathlib import Path
from typing import List

import pandas as pd

from redun import task, Scheduler, script, File
from redun.config import Config

redun_namespace = "netmhcpan"

@task(version="1")
def chunk_peptides(
    peptides: pd.DataFrame,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
) -> List[File]:
    """
    Split the input peptides into chunks and save each chunk to a file.
    """
    peptide_chunks = [
        peptides[i : i + peptide_chunk_size]
        for i in range(0, len(peptides), peptide_chunk_size)
    ]
    peptide_paths = []
    for chunk_idx, peptide_chunk in enumerate(peptide_chunks):
        peptide_path_chunk = f"{peptides_dir}/peptides_{chunk_idx}.txt"
        peptide_chunk.to_csv(peptide_path_chunk, index=False, header=False)
        peptide_paths.append(File(str(peptide_path_chunk)))
    return peptide_paths

@task(version="1")
def predict_affinity_netmhcpan(
    allele: str,
    peptides_path: File,
    mhc_type: str = "mhc1",
    results_dir: str = None,
    netmhcpan_cmd: str = None,
):
    peptide_chunk_idx = peptides_path.path.split("_")[-1].split(".")[0]
    allele_path = f"{results_dir}/{allele.replace('*', '_').replace(':', '')}_preds_p{peptide_chunk_idx}.xls"
    peptides_path = File(os.path.abspath(peptides_path.path))
    peptides_flag = (
        f"-p {peptides_path.path}"
        if mhc_type == "mhc1"
        else f"-inptype 1 -f {peptides_path.path}"
    )
    return script(
        f"""
        {netmhcpan_cmd} {peptides_flag} -a {allele} -BA -xls -xlsfile {allele_path}
        """,
        inputs=[peptides_path.stage(peptides_path.path)],
        outputs=[allele, File(allele_path).stage(allele_path)],
    )

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    """
    Run NetMHCpan or NetMHCIIpan for a given set of peptides and HLA alleles, splitting the input peptides
    into chunks and running netmhcpan for each chunk before concatenating the results into the per allele files.
    """
    # Split peptides into chunks
    peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)
    # For each allele, run netmhcpan for each peptide chunk
    peptide_allele_paths = [
        predict_affinity_netmhcpan(
            allele,
            peptide_path,
            mhc_type,
            results_dir,
            netmhcpan_cmd,
        )
        for allele in hla_alleles["allele"].values
        for peptide_path in peptide_paths
    ]
    # TODO: Concatenate the results for each allele
    return peptide_allele_paths

if __name__ == "__main__":
    # Load example data
    peptides = pd.DataFrame({"peptide": ["RVSPSKEVV", "VSPSKEVVR", "SPSKEVVRF"]})
    hla_alleles = pd.DataFrame({"allele": ["HLA-B44:04", "HLA-B44:05"]})
    mhc_type = "mhc1"
    peptides_dir = Path(".")
    peptide_chunk_size = 2
    results_dir = Path(".")
    netmhcpan_cmd = "netMHCpan"

    # Run the workflow
    scheduler = Scheduler(
        config=Config(
            {
                "backend": {
                    "db_uri": "sqlite:///redun.db",
                }
            }
        )
    )
    scheduler.load()
    result = scheduler.run(
        run_netmhcpan(
            peptides,
            hla_alleles,
            mhc_type,
            peptides_dir,
            peptide_chunk_size,
            results_dir,
            netmhcpan_cmd,
        )
    )
    print(result)
mattrasmus commented 1 year ago

Hi @PhilPalmer thanks for posting your question. It looks like you understand the issue, but I'll explain the error as well just to confirm it. Looking at the error, it's happening in this task:

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    """
    Run NetMHCpan or NetMHCIIpan for a given set of peptides and HLA alleles, splitting the input peptides
    into chunks and running netmhcpan for each chunk before concatenating the results into the per allele files.
    """
    # Split peptides into chunks
    peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)

    # NOTE: Since `chunk_peptides()` is a task, `peptide_paths` will be a lazy expression of a list.
    # Although in redun we can do a lot with lazy expressions, using them in a for-loop is one thing
    # we can't do, because for-loops run eagerly, before we know the full length of the list
    # (it will be computed some time later).

    # For each allele, run netmhcpan for each peptide chunk
    peptide_allele_paths = [
        predict_affinity_netmhcpan(
            allele,
            peptide_path,
            mhc_type,
            results_dir,
            netmhcpan_cmd,
        )
        for allele in hla_alleles["allele"].values

        # NOTE: This is the for-loop that's failing.
        for peptide_path in peptide_paths
    ]

    # NOTE: Concatenate the results for each allele using redun.functools.flatten(list_of_lists)
    return flatten(peptide_allele_paths)

So there are a couple of solutions. If you know what the length of a lazy expression of a list will be, you can inform the redun scheduler by using slicing:

peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)
# peptide_paths is an expression of a list of unknown length.

peptide_paths = peptide_paths[:length]
# This is now a regular list of expressions (that represent each item).

# This for-loop (or list comprehension) will now work
for file in peptide_paths:
    # file is an expression, but that should be ok for you.
    # ...

Given your chunking logic, it looks like you would need something like length = math.ceil(len(peptides) / peptide_chunk_size).
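
For concreteness, here is a rough sketch of how that could slot into your run_netmhcpan (same names and signatures as in your example; abbreviated and untested):

import math

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    # Split peptides into chunks (a lazy expression of a list).
    peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)

    # The number of chunks can be computed eagerly from the concrete inputs.
    length = math.ceil(len(peptides) / peptide_chunk_size)

    # Slicing tells the scheduler the length, so this is now a regular list
    # of per-item expressions that a for-loop can iterate over.
    peptide_paths = peptide_paths[:length]

    peptide_allele_paths = [
        predict_affinity_netmhcpan(allele, peptide_path, mhc_type, results_dir, netmhcpan_cmd)
        for allele in hla_alleles["allele"].values
        for peptide_path in peptide_paths
    ]
    return peptide_allele_paths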

Another solution is to force the expression to evaluate to a concrete value by passing it into another task. Parameters of a task always start as concrete (i.e. non-lazy) values.

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    """
    Run NetMHCpan or NetMHCIIpan for a given set of peptides and HLA alleles, splitting the input peptides
    into chunks and running netmhcpan for each chunk before concatenating the results into the per allele files.
    """
    # Split peptides into chunks
    peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)
    return run_netmhcpan_chunks(
        peptide_paths, 
        hla_alleles, 
        mhc_type,
        results_dir,
        netmhcpan_cmd
    )

@task
def run_netmhcpan_chunks(
    peptide_paths, 
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    # NOTE: peptide_paths is concrete now and can be used in a list comprehension.

    # For each allele, run netmhcpan for each peptide chunk
    peptide_allele_paths = [
        predict_affinity_netmhcpan(
            allele,
            peptide_path,
            mhc_type,
            results_dir,
            netmhcpan_cmd,
        )
        for allele in hla_alleles["allele"].values
        for peptide_path in peptide_paths
    ]
    return flatten(peptide_allele_paths)

While this works, you might not want to split tasks so often. You are right that map_ and starmap from redun.functools help with mapping over a lazy expression of a list. Here is how that would look:

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    """
    Run NetMHCpan or NetMHCIIpan for a given set of peptides and HLA alleles, splitting the input peptides
    into chunks and running netmhcpan for each chunk before concatenating the results into the per allele files.
    """
    # Split peptides into chunks
    peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)

    peptide_allele_paths = []
    for allele in hla_alleles["allele"].values:
        # Partially apply all the arguments that don't change.
        predict = predict_affinity_netmhcpan.partial(
            allele=allele, mhc_type=mhc_type, results_dir=results_dir, netmhcpan_cmd=netmhcpan_cmd
        )
        # predict now just needs one more argument, peptide_path.

        # To make the following easy to do, I would rewrite predict_affinity_netmhcpan so that peptide_path is the first parameter.
        # Now we can map_ predict over the lazy list peptide_paths.
        peptide_allele_paths.append(map_(predict, peptide_paths))

    return peptide_allele_paths
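
Note that each map_ call returns its own lazy list, so peptide_allele_paths here ends up as a list of per-allele lazy lists. As in the first example, these can be merged into one flat lazy list with flatten:

from redun.functools import flatten, map_

# At the end of run_netmhcpan, instead of returning the nested lists:
return flatten(peptide_allele_paths)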

Let me know if this helps.

PhilPalmer commented 1 year ago

Thank you for such detailed comments and such a rapid reply, this helps a lot!

One small issue is that, when using the first approach to inform the scheduler of the length of the lazy expression list, I got the same error, i.e.:

peptide_paths = chunk_peptides(peptides, peptides_dir, peptide_chunk_size)
# peptide_paths is an expression of a list of unknown length.

length = math.ceil(len(peptides) / peptide_chunk_size)
peptides_paths = peptide_paths[:length]
# Here I tried to inform the scheduler what the length of the list is
# so that its now a regular list of expressions (that represent each item).

# However, this for-loop still gives the same:
# TypeError: Expressions of unknown length cannot be iterated.
for file in peptide_paths:
    # ...

The third solution using partial and map_ did indeed work. The reason I ask about the first implementation is that, now that I have performed the scatter operation, I want to merge the results. I.e. I have the following lazy expression list:

[
    ['HLA-B44:04', File(path=./HLA-B4404_preds_p0.xls, hash=6fd3169a)],
    ['HLA-B44:04', File(path=./HLA-B4404_preds_p1.xls, hash=aa43d706)],
    ['HLA-B44:05', File(path=./HLA-B4405_preds_p0.xls, hash=a9cb9087)],
    ['HLA-B44:05', File(path=./HLA-B4405_preds_p1.xls, hash=1db7114a)]
]

That I want to group by allele to produce the following output, so that I can concatenate the results for each allele into a single file in a new task:

[
    ['HLA-B44:04', [File(path=./HLA-B4404_preds_p0.xls, hash=6fd3169a), File(path=./HLA-B4404_preds_p1.xls, hash=aa43d706)]], 
    ['HLA-B44:05', [File(path=./HLA-B4405_preds_p0.xls, hash=a9cb9087), File(path=./HLA-B4405_preds_p1.xls, hash=1db7114a)]]
]

I have tried writing functions to do this, e.g.:

def group_by_allele(peptide_allele_paths: List[File]):
    """
    Group the peptide allele paths by allele.
    """
    from itertools import groupby

    def get_allele(item):
        return item[0]

    def get_file(item):
        return item[1]

    grouped = groupby(peptide_allele_paths, key=get_allele)
    return map_(lambda g: [g[0], list(map_(get_file, g[1]))], grouped)

This works on non-lazy lists using the built-in map; however, I'm struggling to get it to work on lazy lists because other libraries such as itertools perform iteration.

Do you have any ideas on the best way to resolve this? For example, do you know why I wasn't able to inform the scheduler of the length of the lazy list, as this would be very useful for solving this new problem?

Thanks again for all of your help!

mattrasmus commented 1 year ago

For the first approach, I think you just have a typo. You have an extra s on the left-hand side, so the old lazy list is still being used in the for-loop:

peptideS_paths = peptide_paths[:length]

As for concatenating per allele, I would just make that a follow up task:

from collections import defaultdict
from typing import List, Tuple

# VVV make sure this is a task, not just a plain function.
@task
def group_by_allele(peptide_allele_paths: List[Tuple[str, File]]) -> List[Tuple[str, List[File]]]:
    """
    Group the peptide allele paths by allele.
    """
    allele2files = defaultdict(list)
    for allele, file in peptide_allele_paths:
        allele2files[allele].append(file)
    return list(allele2files.items())

def run_netmhcpan(
    peptides: pd.DataFrame,
    hla_alleles: pd.DataFrame,
    mhc_type: str,
    peptides_dir: Path,
    peptide_chunk_size: int = 500,
    results_dir: Path = Path("."),
    netmhcpan_cmd: str = "netMHCpan",
):
    # ...
    return group_by_allele(peptide_allele_paths)
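
If it helps, here is a rough sketch of what the concatenation itself could look like (concat_allele_results and concat_all are just hypothetical names, and this appends files naively, so you may want to skip repeated header lines in the tool's output):

@task
def concat_allele_results(allele: str, files: List[File], results_dir: str = ".") -> File:
    """Concatenate the per-chunk result files for one allele into a single file."""
    out_path = f"{results_dir}/{allele.replace('*', '_').replace(':', '')}_preds.xls"
    out_file = File(out_path)
    with out_file.open("w") as out:
        for file in files:
            with file.open() as infile:
                # Naive concatenation; drop per-file header lines here if needed.
                out.write(infile.read())
    return out_file

@task
def concat_all(grouped: List[Tuple[str, List[File]]]) -> List[File]:
    # `grouped` arrives as a concrete list (task parameters are always concrete),
    # so a plain list comprehension works here.
    return [concat_allele_results(allele, files) for allele, files in grouped]

The last line of run_netmhcpan would then become return concat_all(group_by_allele(peptide_allele_paths)).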
PhilPalmer commented 1 year ago

> For the first approach, I think you just have a typo. You have an extra s on the left-hand side, so the old lazy list is still being used in the for-loop

Ah, apologies, thanks for spotting this!

> As for concatenating per allele, I would just make that a follow up task

Awesome, this worked great, thanks again for all of your help!