kaskada-ai / kaskada

Modern, open-source event-processing
https://kaskada.io/
Apache License 2.0

Deadlock in `execution` #847

Open: jordanrfrazier opened this issue 10 months ago

jordanrfrazier commented 10 months ago

The following code deadlocks. It uses top-level `await`, so it is meant to run in an environment that supports it, such as a Jupyter notebook:

import kaskada as kd
import asyncio

kd.init_session()

data1 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-19T16:39:57,A,5,10",
        "1996-12-19T16:39:58,B,24,3",
        "1996-12-19T16:39:59,A,17,6",
        "1996-12-19T16:40:00,A,,9",
        "1996-12-19T16:40:01,A,12,",
        "1996-12-19T16:40:02,A,,",
    ]
)

data2 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-20T16:39:57,A,5,10",
        "1996-12-20T16:39:58,B,24,3",
        "1996-12-20T16:39:59,A,17,6",
        "1996-12-20T16:40:00,C,,9",
        "1996-12-20T16:40:01,A,12,",
        "1996-12-20T16:40:02,A,,",
    ]
)

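# ---- (separate notebook cell) ----
# Top-level `await` works in Jupyter/IPython; in a plain script, move this code
# into an `async def` and run it with `asyncio.run(...)`.

# Create a source from data1; more rows are appended later with `add_string`.
source = await kd.sources.CsvString.create(data1, time_column="time", key_column="key")

# `mode="live"` keeps the query running on new data until `stop()` is called.
execution = source.run_iter(mode="live")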

async def add_more_data():
    # After 0.5 s, append data2 to the running live source.
    print("waiting to send more data")
    await asyncio.sleep(0.5)
    await source.add_string(data2)
    print("sent more data")

async def stop_execution():
    # After 1.5 s, stop the live execution. `stop()` is a plain (non-awaited)
    # call, so it runs on the event-loop thread.
    print("waiting to stop execution")
    await asyncio.sleep(1.5)
    print("stopping execution", flush=True)
    await asyncio.sleep(0.001)
    execution.stop()
    print("stopped execution", flush=True)

async def output_batches():
    # Print batches until the execution ends, times out, or raises.
    while True:
        try:
            print("waiting for next batch")
            # `asyncio.timeout` requires Python 3.11+.
            async with asyncio.timeout(2):
                next_batch = await execution.__anext__()
                print(next_batch)
        except StopAsyncIteration:
            print("stop async iteration")
            break
        except StopIteration:
            print("stop iteration")
            break
        except TimeoutError:
            print("timeout")
            break
        except Exception as exp:
            print(f"other exception: {exp}")
            break  # without this, a persistent error would loop forever

# Run the stopper, the producer, and the consumer concurrently.
stop_task = asyncio.create_task(stop_execution())
add_task = asyncio.create_task(add_more_data())
output_task = asyncio.create_task(output_batches())

# Wait for the tasks to finish.
await stop_task
print("stop task complete")
await add_task
print("add task complete")
await output_task
print("output task complete")
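
The repro guards `__anext__` with `asyncio.timeout(2)`, but an asyncio timeout can only fire while the event loop is free to run. One general pitfall worth ruling out (this sketch does not diagnose #847): if a synchronous call such as `execution.stop()` blocks on the event-loop thread, every other task stalls with it, including the consumer's timeout. The stdlib-only sketch that follows uses a hypothetical `blocking_call` (a `time.sleep` stand-in, not kaskada code) and measures the largest gap between ticks of a heartbeat task, first with the call made directly and then offloaded with `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_call(seconds: float) -> None:
    # Hypothetical stand-in for a synchronous call that waits on other work
    # (for example, a stop() that joins a worker). time.sleep blocks the thread.
    time.sleep(seconds)


async def heartbeat(ticks: list[float], start: float) -> None:
    # Records when this task actually runs; a large gap means the loop was blocked.
    for _ in range(5):
        ticks.append(time.monotonic() - start)
        await asyncio.sleep(0.05)


async def run(offload: bool) -> float:
    start = time.monotonic()
    ticks: list[float] = []
    hb = asyncio.create_task(heartbeat(ticks, start))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        # Runs the call in a worker thread; the event loop stays responsive.
        await asyncio.to_thread(blocking_call, 0.3)
    else:
        # Runs the call on the event-loop thread; no other task (or timeout) can run.
        blocking_call(0.3)
    await hb
    # Largest gap between consecutive heartbeat ticks, in seconds.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"blocking gap:  {asyncio.run(run(offload=False)):.2f}s")
    print(f"offloaded gap: {asyncio.run(run(offload=True)):.2f}s")
```

Run directly, the first gap is at least 0.3 s and the second should be close to the 0.05 s heartbeat interval. Whether `execution.stop()` actually blocks in this way is an assumption to check, not something the report establishes.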
jordanrfrazier commented 10 months ago

A few issues: