mobiusml / aana_sdk

Aana SDK is a powerful framework for building AI-enabled multimodal applications.
https://www.mobiuslabs.com/
Apache License 2.0

Hr/streaming #87

Closed HRashidi closed 2 months ago

HRashidi commented 5 months ago

Adding streaming support for Aana.

The stream can support FPS and channel number. I added fps and channel inside the stream input, but we could do the same with a different input, something like video_params (a rough sketch follows below).
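For illustration, here is a rough sketch of what that alternative video_params-style input could look like, assuming Pydantic models; the class and field names are hypothetical and not the actual SDK types:

from pydantic import BaseModel, Field


class VideoParams(BaseModel):
    """Hypothetical grouping of stream processing options."""

    extract_fps: float = Field(default=3.0, description="Frames per second to extract from the stream")
    channel: int = Field(default=0, description="Index of the stream/program to read")


class StreamInput(BaseModel):
    """Hypothetical stream input that nests the processing options instead of flat fields."""

    url: str = Field(description="URL of the stream (HLS, DASH, MP4, ...)")
    media_id: str = Field(description="Identifier for the media item")
    params: VideoParams = Field(default_factory=VideoParams)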

HRashidi commented 5 months ago

Supported formats: HLS, DASH, MP4.

Supporting SRT requires building PyAV (version 12) from source against the system FFmpeg libraries:

apt-get install libavformat-dev libavdevice-dev
pip install av --no-binary av

We cannot install PyAV through Poetry with the --no-binary flag.
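A possible workaround, sketched from the same commands above (untested here): let Poetry install the project as usual, then rebuild PyAV from source inside the Poetry environment with pip, so Poetry itself never has to pass --no-binary:

apt-get install libavformat-dev libavdevice-dev
poetry install
poetry run pip install --force-reinstall --no-binary av "av==12.*"

The downside is that the rebuilt package is not tracked by poetry.lock, so the last step has to be repeated after every fresh install.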

movchan74 commented 5 months ago

I've tried the example streaming project on a news stream and I think it lags behind, but I can't tell for sure. @HRashidi how can I verify this? Maybe return the lag in the output? I'm not sure. @evanderiel @ashwinnair14 any ideas on how we should deal with lag?

movchan74 commented 5 months ago


I think I found a way to calculate the lag from the existing output:

import requests, json, time

data = {
    "stream": {
        # "url": "https://www.youtube.com/watch?v=9bZkp7q19f0",
        "url": "https://tagesschau.akamaized.net/hls/live/2020117/tagesschau/tagesschau_3/master-720p-3200.m3u8",
        "media_id": "9bZkp7q19f0",
    }
}
response = requests.post(
    "http://localhost:8000/stream/caption_stream",
    data={"body": json.dumps(data)},
    stream=True,
)
init_timestamp = None
init_time = None
for chunk in response.iter_content(chunk_size=None):
    chunk = json.loads(chunk)

    if not init_timestamp:
        init_timestamp = chunk["timestamps"][0]/100000
        init_time = time.time()

    # calculate lag
    current_time = time.time()
    time_diff = current_time - init_time
    timestamp_diff = chunk["timestamps"][0]/100000 - init_timestamp
    lag = time_diff - timestamp_diff
    print(f"Time diff: {time_diff:.2f}s")
    print(f"Timestamp diff: {timestamp_diff:.2f}s")
    print(f"Lag: {lag:.2f}s")

    print(chunk)

And it does lag behind:

Time diff: 0.00s
Timestamp diff: 0.00s
Lag: 0.00s
{'captions': ['a man walking past a store window with a sign on it that says "no smoking" in french and english.jpg, file photo, montreal, canada', 'a black and white photo of a woman walking past a store window with a sign on it that says "mima" in germany, and a woman walking past a store window with a sign on it that says "mima" in germany'], 'timestamps': [2673903000, 2673931800]}
Time diff: 1.86s
Timestamp diff: 0.58s
Lag: 1.28s
{'captions': ['a woman is walking past a store window with a sign that says "open" on it. the woman is wearing a white shirt and black pants. she is holding a purse', 'a man is walking past a store window with a sign that says "villa" on it in french and english. the man is wearing a white shirt and black pants'], 'timestamps': [2673960600, 2673989400]}
Time diff: 3.60s
Timestamp diff: 1.15s
Lag: 2.45s
{'captions': ['a man is walking past a store front with a sign that says stufung händler in germany. the man is wearing a suit and tie', 'a man is standing outside of a store with a sign that says stifung water in germany. the sign is in english and says stifung water'], 'timestamps': [2674018200, 2674047000]}
Time diff: 4.97s
Timestamp diff: 1.73s
Lag: 3.24s
{'captions': ['a man walking past a store with the words "stufung waren" on the front window in germany\'s capital city of berlin, germany', 'a black and white photo of a store front with a sign that says stiftung warennte in germany. the sign is in english and says stiftung warennte'], 'timestamps': [2674075800, 2674104600]}
Time diff: 6.84s
Timestamp diff: 2.30s
Lag: 4.53s

I even tried to change extract_fps to 1 but it still lags.

evanderiel commented 5 months ago


If I understand correctly, the problem is that frames come in faster than the pipeline can process them, correct? Then we need to either make the pipeline on average faster (e.g. larger batches) or process fewer frames. Processing fewer frames can mean janky results but it may not be possible to make the pipeline sufficiently fast in any case.
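As a minimal sketch of the "process fewer frames" option (illustrative only, not code from this PR): the frame generator could be thinned out before captioning, e.g. by keeping only every n-th frame. The helper name below is hypothetical:

from typing import AsyncGenerator, TypeVar

T = TypeVar("T")


async def take_every_nth(items: AsyncGenerator[T, None], n: int) -> AsyncGenerator[T, None]:
    """Yield only every n-th item from an async generator, dropping the rest to reduce load."""
    index = 0
    async for item in items:
        if index % n == 0:
            yield item
        index += 1

Dropping frames trades caption granularity for keeping up with real time, which matches the trade-off described above.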

movchan74 commented 5 months ago


Generally, yes. A faster pipeline means less lag. But that's not the only way to reduce lag. Right now, we are processing one batch of images after another sequentially. The more efficient way would be to process multiple batches at once, like we did with Mobius Pipeline. I actually tried an older commit (bd49a274d5e901de95720c9108a2975511778368) that still runs on Mobius Pipeline, and it's much faster: there is no lag (it's actually negative because of the way I calculate it), even at 3 fps.

movchan74 commented 5 months ago

Update on the lag issues

Here is a function that runs multiple concurrent processing tasks over the items yielded by an async generator:


import asyncio


async def run_async_generator_concurrently(async_generator, process, batch_size=2):
    queue = asyncio.Queue(batch_size * 2)
    result_queue = asyncio.Queue()
    num_consumers = batch_size

    async def producer():
        async for i in async_generator:
            await queue.put(i)
        # Signal all consumers to shut down by putting a sentinel for each consumer in the queue
        for _ in range(num_consumers):
            await queue.put(None)

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:  # Check for the sentinel
                queue.task_done()
                break
            result = await process(item)
            await result_queue.put(result)
            queue.task_done()

    consumers = [consumer() for _ in range(num_consumers)]
    # # Setup the producer and consumers to run concurrently
    # await asyncio.gather(producer(), *consumers)

    producer_task = asyncio.create_task(producer())
    consumer_tasks = [asyncio.create_task(c) for c in consumers]

    # Yield all results as they are processed
    while True:
        if (
            result_queue.empty()
            and all(c.done() for c in consumer_tasks)
            and producer_task.done()
        ):
            break
        result = await result_queue.get()
        yield result
        result_queue.task_done()

    # Wait for all tasks to complete
    await producer_task
    for task in consumer_tasks:
        await task

You can try it on the toy example.

import asyncio

async def async_generator(n):
    for i in range(n):
        yield i

async def process(i):
    await asyncio.sleep(0.1)
    print(f"i * i = {i * i}")
    return i * i

async def main():
    gen = async_generator(10)
    async for item in run_async_generator_concurrently(gen, process, 5):
        print(item)

asyncio.run(main())

With 5 consumers it runs 5x faster. So it seems to work.

But when I add it to the endpoint, it doesn't seem to help much.

class CaptionStreamEndpoint(Endpoint):
    """Transcribe video in chunks endpoint."""

    async def initialize(self):
        """Initialize the endpoint."""
        self.captioning_handle = await AanaDeploymentHandle.create(
            "captioning_deployment"
        )

    async def run(
        self,
        stream: StreamInput,
        batch_size: int,
    ) -> AsyncGenerator[CaptionStreamOutput, None]:
        """Transcribe video in chunks."""
        async def predict_captions(frames_dict):
            captioning_output = await self.captioning_handle.generate_batch(
                images=frames_dict["frames"]
            )
            # print("captioning_output", captioning_output)
            return {
                "captions": captioning_output["captions"],
                "timestamps": frames_dict["timestamps"],
            }

        gen = run_remote(fetch_stream_frames)(stream_input=stream, batch_size=2)
        async for item in run_async_generator_concurrently(
            gen, predict_captions, batch_size
        ):
            yield item

The version with Mobius Pipeline still has a lower lag.

But when I run it for longer, the lag always goes up. I've tried reducing the FPS, using the version with Mobius Pipeline, and adding more consumers. Nothing works; it starts to increase after some time.


I've tried to debug it by looking at the BLIP2 latency in metrics but it's not clear to me what it means.

BTW, Ray added a command to start Prometheus locally, but it's only available in the latest versions, so we need to update Ray. See https://docs.ray.io/en/latest/cluster/metrics.html#quickstart-running-prometheus-locally.
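If I'm reading the linked quickstart correctly, it is roughly the following (the exact subcommand and the minimum Ray version are from memory, so treat this as an assumption):

pip install -U "ray[default]"
ray metrics launch-prometheus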

I'm not sure yet what it all means. There is definitely something wrong, and we need to figure it out before we can say that we have streaming support.

ashwinnair14 commented 5 months ago

Temporarily paused.

movchan74 commented 5 months ago

I've tested the changes and it seems to work better, at least at 1 FPS. At 1 FPS it seems to be pretty stable now.

But it still lags at >=2 FPS, even with the async queue. Actually, the async queue seems to be hurting performance.

I think the main issue now is that the model is too slow. But that's not an issue with this PR, so it's fine.
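For a rough sense of scale (a back-of-the-envelope estimate from the earlier log, so treat the numbers as approximate): at ~3 FPS the pipeline covered about 2.3s of stream in about 6.8s of wall time, i.e. roughly a third of real time, which is consistent with it keeping up at 1 FPS but falling behind at 2-3 FPS.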