redis / redis-om-python

Object mapping, and more, for Redis and Python
MIT License
1.13k stars 115 forks source link

[Enhancement] Retrieve multiple records at once with pipeline #523

Open XChikuX opened 1 year ago

XChikuX commented 1 year ago

Currently there is implementation details for saving | deleting multiple records at once using redis pipelines. Example:

@py_test_mark_asyncio
async def test_delete_many(m):
    member1 = m.Member(
        id=0,
        first_name="Andrew",
        last_name="Brookins",
        email="a@example.com",
        join_date=today,
        age=38,
        bio="This is the user bio.",
    )
    member2 = m.Member(
        id=1,
        first_name="Kim",
        last_name="Brookins",
        email="k@example.com",
        join_date=today,
        age=34,
        bio="This is the bio for Kim.",
    )
    members = [member1, member2]
    result = await m.Member.add(members)
    assert result == [member1, member2]
    result = await m.Member.delete_many(members)
    assert result == 2
    with pytest.raises(NotFoundError):
        await m.Member.get(pk=member1.key())

I want to be able to get multiple records at once, so as to minimize network overhead. Please make it possible

eg. query_members=[pk1, pk2]

result = await m.Member.get_many(query_members)

So something like the delete_many function:

    @classmethod
    async def get_many(
        cls,
        models: Sequence["RedisModel"],
        pipeline: Optional[redis.client.Pipeline] = None,
    ) -> int:
        db = cls._get_db(pipeline)

        for chunk in ichunked(models, 100):
            pks = [model.key() for model in chunk]
            await cls._get(db, *pks)

        return len(models)
slorello89 commented 6 months ago

This is a pretty reasonable ask, my relatively naive thought (from my familiarity of other Redis clients) is that it would just pipeline any task not awaited. I have to admit though I'm not overly familiar with asyncio, but looking at the redis-py code underpinning all of this, it does not look like the physical write-command/read-response portion is re-entrant, so I'm not sure this is possible without some pretty significant refactoring.

As a work-around I think you should be able to leverage connection pooling and the raw database connection to increase concurrency.

@bsbodden or @tylerhutcherson - any thoughts by chance?

XChikuX commented 6 months ago

@slorello89 As far as my understanding goes; Asynchronous code does not await on a functional basis. You can functionalise a series of synchronous steps that can be awaited on, but that doesn't mean multiple awaits in a function will pipeline.

It is more of a line-by-line interpretted basis for python. It isn't possible to call an async task without awaiting it. Therefore, it will not auto pipeline. It must be explicitly supported by the database, by sending in all commands so database can execute (async or sync).

From the redis docs it is clear that the biggest bottleneck is socket I/O. Async programming does not help with this.

There's a good probability it makes it worse.

In python, each awaited call executes a network request to redis and awaits the result. During this waiting it switches context to other tasks and comes back to the result of this network request, which is an interrupt indicating that the program can now focus back onto this task.

slorello89 commented 6 months ago

Hi @XChikuX I'm pretty sure I'm right about this (as I said I would have thought there would be auto-pipelining, but that does not appear to be the case) rather, you can use a connection pool and scale out your reads/writes that way. Co-routines and tasks do not need to be awaited immediately (which is pretty normal in async programming), you can gather them together and await them in bulk.

you can use a conneciton pool to mitigate at least some of this in the interim (I agree this isn't ideal):

from aredis_om import (
    JsonModel,
    Migrator,
    Field
)

import time
import asyncio

from redis.asyncio.client import Redis, ConnectionPool
pool = ConnectionPool(host='localhost', port=6379, db=0)

class TestModel(JsonModel):
    name: str = Field(index=True)

    class Meta:
        database = Redis(connection_pool=pool)

async def await_each_save(num: int):
    for i in range(0,1000):
        tm_coroutine = TestModel(name="steve").save()
        await tm_coroutine

async def await_gather_save(num: int):
    coroutines = []
    for i in range(0, num):
        tm_coroutine = TestModel(name="steve").save()
        coroutines.append(tm_coroutine)
        if len(coroutines) == 50:
            await asyncio.gather(*coroutines)
            coroutines.clear()

async def await_each_search(num: int):
    for i in range(0, num):
        await TestModel.find(TestModel.name == 'steve').first()

async def await_gather_search(num: int):
    coroutines = []
    for i in range(0, num):
        coroutines.append(TestModel.find(TestModel.name == 'steve').first())
        if len(coroutines) == 50:
            await asyncio.gather(*coroutines)
            coroutines.clear()

async def start():
    await Migrator().run()

    # await each save
    start_time = time.time()
    await await_each_save(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await gather save
    start_time = time.time()
    await await_gather_save(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await each search
    start_time = time.time()
    await await_each_search(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await gather search
    start_time = time.time()
    await await_gather_search(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

benchmarking the above code results in significantly better performance when gathering the coroutines together and awaiting them en-masse (which makes sense because you're scaling your writes out over a number of connection.

Should definitely look into some way of properly pipelining , but I fear it might be quite difficult considering the underpinning client's architecture.

XChikuX commented 6 months ago

I'll keep this in mind for now. Thank you.