vipyrsec / dragonfly-mainframe

The mainframe for Dragonfly
https://docs.vipyrsec.com/dragonfly-mainframe/
MIT License

Cache jobs and results #213

Closed Robin5605 closed 6 months ago

Robin5605 commented 6 months ago

Closes #89

Robin5605 commented 6 months ago

Performance Graphs

[Figure 1]

[Figure 2]

[Figure 3]

[Figure 4]

Testing Methodology

Testing was performed by running this script:

import asyncio
import traceback

import httpx
import psycopg2

async def do_job(client: httpx.AsyncClient) -> None:
    # Request a job from the mainframe.
    res = await client.post("/jobs")
    try:
        json = res.json()
        if len(json) == 0:
            print("got 0 jobs, skipping")
            return
        job = json[0]
        name, version = job["name"], job["version"]
        print(f"Got {name} v{version}")
    except Exception:
        traceback.print_exc()
        print(res.text)
        print("-" * 20)
        return

    # Report a result so the scan is marked as finished.
    payload = {"name": name, "version": version, "reason": "Package too large"}
    res = await client.put("/package", json=payload)
    print(f"Finished {name} v{version}")
    res.raise_for_status()

async def get_package_info(client: httpx.AsyncClient) -> None:
    res = await client.get("/package?name=discord.py")
    res.raise_for_status()

async def main() -> None:
    # Re-queue the 200 most recently queued scans so there is work to hand out.
    connection = psycopg2.connect(user="postgres", password="postgres", host="localhost", port="5432")
    cursor = connection.cursor()
    cursor.execute(
        "UPDATE scans SET status = 'QUEUED' "
        "WHERE scan_id IN (SELECT scan_id FROM scans ORDER BY queued_at DESC LIMIT 200)"
    )
    connection.commit()
    cursor.close()
    connection.close()

    client = httpx.AsyncClient(base_url="http://localhost:8000")

    # Fire 200 concurrent job requests (asyncio.TaskGroup requires Python 3.11+).
    async with asyncio.TaskGroup() as tg:
        for _ in range(200):
            tg.create_task(do_job(client))

asyncio.run(main())

The durations were parsed from the log files.
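The exact log format isn't shown here, but the parsing step can be sketched roughly like this, assuming hypothetical log lines of the form `Finished <name> v<version> in <ms> ms` (the real mainframe log lines may differ):

```python
import re

# Hypothetical log line format -- an assumption for illustration only.
LINE_RE = re.compile(r"Finished (?P<name>\S+) v(?P<version>\S+) in (?P<ms>[\d.]+) ms")

def parse_durations(log_text: str) -> list[float]:
    """Extract per-job durations (in milliseconds) from a log dump."""
    return [float(m.group("ms")) for m in LINE_RE.finditer(log_text)]

sample = """\
Finished discord.py v2.3.2 in 41.7 ms
Finished requests v2.31.0 in 12.3 ms
"""
print(parse_durations(sample))  # → [41.7, 12.3]
```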

Insights

All four graphs show an approximately linear relationship over at least part of their range. This is most evident in Figure 1. The most probable cause is the following code:

    def get_job(self) -> Optional[Scan]:
        """Get one job. Refills the cache if necessary."""

        with self._refill_lock:
            if self.scan_queue.empty():
                self.refill()

            # If it's still empty after a refill, there aren't any more jobs in the DB.
            if self.scan_queue.empty():
                return None
            else:
                scan = self.scan_queue.get()

        scan.status = Status.PENDING
        scan.pending_at = datetime.now(timezone.utc)
        self.pending.append(scan)

        return scan

The lock causes all requests to the get-job endpoint to be executed sequentially. Say, for instance, we have 5 incoming requests at the same time. The first request immediately obtains the lock and does its processing. The second request must wait this whole time; once the first request is done, the second request obtains the lock and does its own processing. At this point, the third request has been waiting for both the first and the second request. This pattern continues for each request, each one taking slightly longer than the previous one, which gives the effect of a linear operation. There is unfortunately nothing we can do about this.
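This serialization effect is easy to reproduce outside the mainframe. The sketch below (purely an illustration, not mainframe code) has five threads contend for a single lock that each holds for a fixed time; the i-th thread's wait before entering the critical section grows roughly linearly with i:

```python
import threading
import time

lock = threading.Lock()
HOLD = 0.02  # seconds each simulated "request" holds the lock
waits: list[float] = []

def handle_request() -> None:
    start = time.perf_counter()
    with lock:
        # Time spent blocked on the lock before doing any work.
        waits.append(time.perf_counter() - start)
        time.sleep(HOLD)  # stand-in for get_job() doing its work under the lock

threads = [threading.Thread(target=handle_request) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each wait is roughly one HOLD longer than the previous: ~0, ~0.02, ~0.04, ...
print([round(w, 2) for w in sorted(waits)])
```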

Figure 3 is of particular interest because it shows that in extreme cases (such as 100 requests arriving at the same time), the cache does provide a non-negligible performance increase. However, this is not feasible in practice: if there aren't that many requests per time period, we end up starving flushes (though perhaps this could be alleviated with timed flushes).

Because of the negligible performance increase this gives us in most cases, and the increased code complexity it adds, I will be closing this as not necessary.