biocommons / biocommons.seqrepo

non-redundant, compressed, journalled, file-based storage for biological sequences
Apache License 2.0

Seqrepo not giving back consistent data #129

Open wlymanambry opened 8 months ago

wlymanambry commented 8 months ago

Describe the bug I have found that SeqRepo intermittently returns incorrect sequences. I've loaded millions of small protein sequences and started seeing returned sequences that couldn't be accounted for, so I eventually wrote a loop that pulls sequence data from SeqRepo and compares each returned sequence to the known sequence. Each "All species match" line is one iteration of checking the same 100 loaded protein sequences:

SeqRepo makes it 31 iterations, or 3,100 sequence comparisons, before randomly returning incorrect data: image

It then churns through about the same amount before again returning an incorrect sequence: image

To Reproduce Steps to reproduce the behavior: Load a few million protein sequences and then query several thousand at a time while doing a check on the known sequence identity.

Expected behavior For SeqRepo to always return the same sequence.

Additional context One bizarre aspect of this is that I can't identify where the incorrect sequences are coming from. If I grab one of the incorrectly returned sequences and search all of the sequence data I've loaded, I don't find it. Also, I'm getting data back that isn't even sequence: image

Also, it looks like this is isolated to having many concurrent calls (100 in my case); it doesn't reproduce with serial calls.

I am using this version: seqrepo-rest-service:0.2.2
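For reference, a minimal sketch of the comparison step described above, using a hypothetical find_mismatches helper (the accessions here are made up; in practice `returned` would be populated from the service's responses):

```python
def find_mismatches(expected: dict, returned: dict) -> list:
    """Compare known sequences against what the service returned.

    Returns a list of (accession, expected_seq, actual_seq) tuples,
    one per mismatch; an empty list means "All species match".
    """
    mismatches = []
    for accession, known_seq in expected.items():
        actual = returned.get(accession)
        if actual != known_seq:
            mismatches.append((accession, known_seq, actual))
    return mismatches
```

Running this once per iteration over the same 100 sequences reproduces the check described in the report.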

jsstevenson commented 2 months ago

Hey @wlymanambry -- would you be willing to share some code or a little more detail on how you're seeing this so that I can reproduce it?

> Also, it looks like this is isolated to having many concurrent calls.

Without knowing anything else, this would be my guess as to the issue, fwiw. Has anyone else (@theferrit32 ?) seen something similar?

wlymanambry commented 2 months ago

Sure, I'm using aiohttp:

import asyncio

import aiohttp


class SRQueryAsync:
    """Seqrepo helper class that does async bulk queries.

    Format for the seqrepo_url parameter is:
    http://10.6.147.83:5000/seqrepo/1/sequence/
    """

    def __init__(
        self,
        seqrepo_url: str,
        sequence_ids: list,
        start_pos: int = None,
        end_pos: int = None,
    ):
        self.seqrepo_url = seqrepo_url
        self.sequence_ids = sequence_ids
        self.start_pos = start_pos
        self.end_pos = end_pos

    async def query(self):
        seqrepo_data = []
        max_workers = 100
        tcp_connection = aiohttp.TCPConnector(limit=max_workers)
        async with aiohttp.ClientSession(connector=tcp_connection) as session:
            results = await self.create_url(self.sequence_ids, session)
            seqrepo_data.extend(results)

        # No explicit tcp_connection.close() is needed: the session closes
        # its connector when the `async with` block exits.
        return seqrepo_data

    async def create_url(self, seq_list, session):
        reqs = []

        for seq in seq_list:
            url = (
                self.seqrepo_url
                + seq
                + "?start="
                + str(self.start_pos)
                + "&end="
                + str(self.end_pos)
            )

            req = asyncio.ensure_future(self.get_seq(url, species=seq, session=session))
            reqs.append(req)

        return await asyncio.gather(*reqs, return_exceptions=True)

And then the calling code was:

    sequence_ids: list = []
    for sequence_abbr in protein_alignment_species_info:
        sequence_ids.append(f"{sequence_abbr}{transcript_truncated}")

    seq_data = SRQueryAsync(
        seqrepo_url=self.seqrepo_url,
        sequence_ids=sequence_ids,
        start_pos=seq_start_pos,
        end_pos=seq_end_pos,
    )

    seq_results: List[Tuple[str, str]] = asyncio.run(seq_data.query())

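One way to probe whether the bad responses correlate with concurrency is to cap the number of in-flight requests. A sketch of a hypothetical bounded_gather helper (it would wrap the coroutines that create_url builds, in place of the plain asyncio.gather call) that lets the limit be dialed down from 100:

```python
import asyncio


async def bounded_gather(coros, limit: int = 10):
    """Run coroutines concurrently, but with at most `limit` in flight.

    Results come back in input order, like asyncio.gather.
    """
    sem = asyncio.Semaphore(limit)

    async def _run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(_run(c) for c in coros))
```

If the wrong sequences disappear at limit=1 (effectively serial) and reappear as the limit grows, that points at the handling of concurrent requests rather than the stored data.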
jsstevenson commented 2 months ago

Thanks! I'll put this on my list of stuff to tinker with during data loads.

wlymanambry commented 2 months ago

Thank you for taking a look!

theferrit32 commented 2 months ago

@wlymanambry can you provide the code used in the method SRQueryAsync.get_seq?

wlymanambry commented 2 months ago

> @wlymanambry can you provide the code used in the method SRQueryAsync.get_seq?

async def get_seq(self, url, species, session):
    timeout = 10
    try:
        async with session.get(url, timeout=timeout) as response:
            result = await response.text()
            # Async error handling doesn't fire the same, gets pages of errors from seqrepo on missing sequence, handling manually
            if len(result) > 500:
                return (
                    species.split("_")[0],
                    "INTERNAL SERVER ERROR for url: " + url,
                )
            else:
                return (species.split("_")[0], result)
    except Exception as e:
        return {"error": e}
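As an aside, the HTTP status code is a more reliable error signal than the len(result) > 500 heuristic, assuming the REST service returns a non-2xx status on failure. The pure part of that check can be factored into a hypothetical helper, called as classify_response(response.status, result, species, url) inside get_seq:

```python
def classify_response(status: int, body: str, species: str, url: str) -> tuple:
    """Map an HTTP status and response body to the (abbr, text) tuple
    format used in get_seq, keyed on the status code rather than on
    the length of the body."""
    abbr = species.split("_")[0]
    if status != 200:
        return (abbr, f"HTTP {status} for url: {url}")
    return (abbr, body)
```

This also avoids misclassifying a legitimately long sequence (over 500 residues) as a server error.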