oss-aspen / 8Knot

Dash app in development to serve open source community visualizations using GitHub data from Augur. Hosted app: https://eightknot.osci.io

New feature: wrangle query sizes #457

Closed: JamesKunstle closed this issue 10 months ago

JamesKunstle commented 12 months ago

Here's how database queries work right now, as pseudocode:

def start_queries(repo_list, query_list):
    for query in query_list:
        run_query.delay(repo_list, query)

@celery_task()
def run_query(repo_list, query):

    # run the query on the Augur DB for every repo in repo_list at once
    data = postgres(query=query, where=f"repo_id in {repo_list}")

    for repo in repo_list:

        # cache the data per-repo in Redis, keyed by a repo_name+query_name hash
        redis.cache(hash(repo.name + query.name), data[data.repo_id == repo.id])

In plain English: len(query_list) tasks are scheduled, each fetching data for all of the considered repos at once. If the combined data for all repos exceeds the memory available to the worker's OpenShift pod, the Celery worker handling the task is OOM-killed.

We see the effects of this architecture in these issues:

#440
#450
#429


We could arbitrarily break queries into smaller subsets of repos, but that wouldn't necessarily fix the problem. If we hard-coded something like "collect the data for 20 repos at a time," the result could still be too large, because some queries return far more data per repo than others and 20 repos can hold a lot of data anyway.

The ideal solution is to know how much data a query will return per repo, greedily break the repo list into chunks whose results total roughly 256MB, and, if a single repo by itself accounts for more than 256MB, paginate that repo's query into smaller chunks.


The pseudocode for a possible "perfect" solution would look like:

def start_queries(repo_list, query_list, size_requirement):
    for query in query_list:
        sublists_that_fit = find_sublists(repo_list, query, size_requirement)
        for sublist in sublists_that_fit:
            run_query.delay(sublist, query, paginate=(len(sublist)==1))

def find_sublists(repo_list, query, size_requirement):
    # ask Postgres for the size of the result per repo in repo_list,
    # greedily package repos together while the sum of their result sizes
    # stays below the configured threshold (size_requirement).
    # IF a single repo's result size > threshold, make it its own sublist.

    return sublists

@celery_task()
def run_query(repo_list, query, paginate=False):

    if paginate:
        # query and store the data in pages
        return

    # run the query on the Augur DB for every repo in repo_list at once
    data = postgres(query=query, where=f"repo_id in {repo_list}")

    for repo in repo_list:

        # cache the data per-repo in Redis, keyed by a repo_name+query_name hash
        redis.cache(hash(repo.name + query.name), data[data.repo_id == repo.id])

I'm unsure whether the perfect solution would be tenable; I'd also love to find a better approach than the current "perfect" one.
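For concreteness, here's a minimal sketch of the greedy packing that find_sublists describes, assuming a hypothetical estimate_result_sizes(repo_list, query) helper that returns a {repo: estimated_bytes} mapping (e.g. derived from Postgres row counts); none of these names are the project's actual API:

def find_sublists(repo_list, query, size_requirement):
    # hypothetical helper: {repo: estimated_bytes} for this query
    sizes = estimate_result_sizes(repo_list, query)

    sublists = []
    current, current_size = [], 0
    for repo in sorted(repo_list, key=lambda r: sizes[r], reverse=True):
        if sizes[repo] >= size_requirement:
            # oversized repo gets its own sublist; run_query will paginate it
            sublists.append([repo])
        elif current_size + sizes[repo] > size_requirement:
            # current bin is full: close it and start a new one
            sublists.append(current)
            current, current_size = [repo], sizes[repo]
        else:
            current.append(repo)
            current_size += sizes[repo]
    if current:
        sublists.append(current)
    return sublists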

JamesKunstle commented 12 months ago

@codekow Would love your input on this, could also chat about it in our upcoming meeting.

codekow commented 11 months ago

@JamesKunstle How many repos do you normally expect in a repo_list?

Even if the queries are small for a single repo, I would consider creating a single Celery task for each repo. Those jobs would be distributed to workers. Unless there is a known overhead issue with Celery jobs, I would start with that.

Then each repo query function would check the total number of records available and have a predefined limit that triggers pagination (e.g., 100k). The parent repo query could then create Celery sub-tasks for each set of paginated records.
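A minimal sketch of that shape, assuming hypothetical count_records, query_page, and cache_result helpers (not the project's actual API) and the same @celery_task convention as the pseudocode above:

RECORD_LIMIT = 100_000  # predefined pagination threshold

@celery_task()
def query_repo(repo_id, query):
    total = count_records(repo_id, query)  # hypothetical COUNT(*) helper
    if total <= RECORD_LIMIT:
        cache_result(repo_id, query, query_page(repo_id, query, offset=0, limit=total))
        return
    # fan out one sub-task per page of records
    for offset in range(0, total, RECORD_LIMIT):
        query_repo_page.delay(repo_id, query, offset, RECORD_LIMIT)

@celery_task()
def query_repo_page(repo_id, query, offset, limit):
    cache_result(repo_id, query, query_page(repo_id, query, offset=offset, limit=limit))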

Does that make sense?

JamesKunstle commented 11 months ago

@codekow Yeah, that makes sense. We could still pack multiple repos into workers scheduled with 128MB or 64MB of RAM, but maybe that's unnecessary optimization in a relatively resource-abundant setting.

JamesKunstle commented 11 months ago

This will fix #429, so I'm closing that issue as a duplicate.

JamesKunstle commented 11 months ago

Here's an example of how to use streaming to query a result from the database and chunk it so that the querying worker's memory usage stays relatively low. I think this is slower than querying everything at once, but it's far more resilient to OOM.

chunksize is a tunable parameter that we can adjust based on how much memory we want to schedule a worker with.

pd.read_sql with chunksize returns an iterator of chunk_dataframes, so the final chunk could contain fewer than chunksize rows.

import pandas as pd
import sqlalchemy as salc

connection_string = "..."

engine = salc.create_engine(connection_string)

# stream_results asks the DB driver for a server-side cursor so the whole
# result set is never held in the worker's memory at once
conn = engine.connect().execution_options(stream_results=True)

# read_sql yields DataFrames of at most `chunksize` rows at a time
for i, chunk_dataframe in enumerate(pd.read_sql("...", conn, chunksize=10000)):
    cache.write_rows(table=table_name, source=chunk_dataframe)

codekow commented 11 months ago

If you can get the total size of the query (number of rows), then you can split the whole query into chunks and schedule those chunked queries on the worker pods. Horizontal autoscaling should help do the rest for big queries.

Then you scale to N :rocket:
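A minimal sketch of that idea, reusing the hypothetical cache.write_rows API from the snippet above; the COUNT subquery, the ORDER BY column, and the chunk size are all illustrative assumptions rather than the project's actual implementation:

import pandas as pd
import sqlalchemy as salc

engine = salc.create_engine("...")   # Augur DB connection string
CHUNK_ROWS = 100_000                 # illustrative chunk size

def schedule_chunked_query(query):
    # total size of the result, in rows
    total = int(pd.read_sql(f"SELECT COUNT(*) AS n FROM ({query}) AS q", engine)["n"][0])

    # one Celery task per LIMIT/OFFSET chunk; horizontal autoscaling of the
    # worker pods spreads these chunked queries out for big results
    for offset in range(0, total, CHUNK_ROWS):
        run_chunk.delay(query, offset, CHUNK_ROWS)

@celery_task()
def run_chunk(query, offset, limit):
    # ORDER BY keeps pagination stable across chunks (column is illustrative)
    chunk = pd.read_sql(f"{query} ORDER BY repo_id LIMIT {limit} OFFSET {offset}", engine)
    cache.write_rows(table=table_name, source=chunk)  # same hypothetical cache API as above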

JamesKunstle commented 10 months ago

Solution: query pagination from Augur, storing data in a Postgres cache.
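For illustration, a minimal sketch of what that flow could look like, combining the streaming pattern above with a scratch table in a separate Postgres cache database (connection strings and the table name are placeholders, not the project's actual implementation):

import pandas as pd
import sqlalchemy as salc

augur = salc.create_engine("postgresql://.../augur")        # source: Augur DB
cache_db = salc.create_engine("postgresql://.../eightknot") # destination: Postgres cache

with augur.connect().execution_options(stream_results=True) as conn:
    # read the query result in pages and append each page to the cache table,
    # so worker memory stays bounded by the chunk size
    for chunk_dataframe in pd.read_sql("...", conn, chunksize=10_000):
        chunk_dataframe.to_sql("query_cache", cache_db, if_exists="append", index=False)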