robinhood / faust

Python Stream Processing

how to pause agents when another one is running #695

Open informatica92 opened 3 years ago

informatica92 commented 3 years ago

I have a main agent that collects messages from a Kafka topic, and a timer that sends one message per hour on another topic. This message triggers a second agent that scans the RocksDB table and removes some elements from the list according to a condition. With this design, the consumer usually raises an error saying: [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding... The error stopped being raised when I disabled the second agent (but now the elements in the RocksDB table are no longer removed). For this reason I am looking for a way to reintegrate the second agent correctly; for example, I was thinking of pausing the first agent while the second one is running (or any other approach you suggest).

Is there a way to do this? Thanks in advance

bobh66 commented 3 years ago

It sounds as though the code that is scanning the table is not releasing control back to the async loop, which is causing the keepalive to be delayed. Try inserting an asyncio.sleep(0) every once in a while (every 100 rows, say - the exact number is specific to your code) so that the event loop can process the other things it needs to do, like keepalives.

If you really want to pause the consumer while the table scan is running (assuming they are both in the same process), you can add a semaphore/lock that the scanning process acquires when it starts the scan, and the consumer can check the semaphore every time it receives a message, so it won't run while the table scan is in progress.

You still need to have the table scan code release control back to the event loop via the asyncio.sleep so the keepalive and other tasks don't get starved.
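
For example, a minimal sketch of that lock pattern (the app, topic, and table names here are illustrative, not from your code):

import asyncio

import faust

app = faust.App('example-app')
scan_topic = app.topic('scan-trigger')    # illustrative topic names
main_topic = app.topic('main-events')
table = app.Table('my-table', default=dict)

scan_lock = asyncio.Lock()  # held for the duration of the table scan

@app.agent(scan_topic)
async def scanner(stream):
    async for _ in stream:
        async with scan_lock:
            for count, (k, v) in enumerate(table.items(), start=1):
                ...  # scan/delete logic goes here
                if count % 100 == 0:
                    await asyncio.sleep(0)  # yield so keepalives can run

@app.agent(main_topic)
async def consumer(stream):
    async for message in stream:
        async with scan_lock:  # waits here while a scan is in progress
            ...  # process the message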

Hope this helps

informatica92 commented 3 years ago

Hi @bobh66 and thanks for your answer. I'll first try with the asyncio.sleep(0) only, then I'll see whether I also need to add the semaphore/lock system. Could you please help me understand where this line should be added? This is a simplified version of my code, just to make it easier to understand where the sleep goes:

# ... import

# ... init
app = faust.App(...)

# ... topic 
real_time_schedule_topic = app.topic(...)
delete_real_time_schedule_topic = app.topic(...)

# ... table
schedule_table = app.Table('schedule-v3', default=lambda: {}, partitions=1)

# ... agents
@app.agent(delete_real_time_schedule_topic, concurrency=1)
async def delete_real_time_schedule_function(messages: faust.Stream):
    async for _ in messages.events():
        ids_to_remove = []
        for k, v in schedule_table.items():
            if [condition]:
                ids_to_remove.append(k)

        for k in ids_to_remove:
            schedule_table.pop(k)

        del ids_to_remove

@app.agent(real_time_schedule_topic, concurrency=4)
async def simulate_bet_function(messages: faust.Stream):
    async for message in messages.events():
        values: Schedule = message.value
        # ... collect 

@app.timer(3600)
async def produce_delete_messages():
    await delete_real_time_schedule_function.send(value=datetime.now().isoformat())

if __name__ == '__main__':
    app.main()

thanks in advance

bobh66 commented 3 years ago

Depending on the size of schedule_table and ids_to_remove, you may need to keep track of how many items you have processed in each for loop and sleep() after some number of items:

count = 0
SLEEP_LIMIT = 100  # Or some other number that makes sense
for k, v in foo.items():
    count += 1
    if count >= SLEEP_LIMIT:
        count = 0
        await asyncio.sleep(0.1)
    # Do the synchronous work

Basically you need to make sure that your synchronous loops (for or while loops that don't await anything) release control back to the event loop every once in a while so you don't starve the other tasks. The SLEEP_LIMIT - the number of items you can process before the sleep - will depend on a lot of things, but you can use trial and error to come up with a value that works for your application.
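
Applied to your delete agent, it could look roughly like this (a sketch that keeps your [condition] placeholder and assumes asyncio is imported):

SLEEP_LIMIT = 100

@app.agent(delete_real_time_schedule_topic, concurrency=1)
async def delete_real_time_schedule_function(messages: faust.Stream):
    async for _ in messages.events():
        ids_to_remove = []
        for count, (k, v) in enumerate(schedule_table.items(), start=1):
            if [condition]:
                ids_to_remove.append(k)
            if count % SLEEP_LIMIT == 0:
                await asyncio.sleep(0)  # yield to the event loop

        for count, k in enumerate(ids_to_remove, start=1):
            schedule_table.pop(k)
            if count % SLEEP_LIMIT == 0:
                await asyncio.sleep(0)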

Hope this helps

informatica92 commented 3 years ago

I am still facing the same problem.

I am now trying to set the entities I want to delete to None instead of deleting them. I'll let you know if this solves the problem.

informatica92 commented 3 years ago

I probably found a solution. Just to give you the complete overview of the solutions I tried: the issue continued to appear even with a semaphore and multiple asyncio.sleep(0) calls. Eventually I tried accessing RocksDB directly with the pyrocksdb library (instead of going through the Faust table) and the problem seems to be SOLVED. It has been running for 4 days without any restarts/errors.

The code changed a bit:

ids_to_remove = 0
counter = 0
batch = rocksdb.WriteBatch()  # start object for batch delete
for k, v in it:
    v = json.loads(v)
    k = k.decode()
    counter += 1
    if [condition]:
        ids_to_remove += 1
        batch.delete(k.encode())  # add element in the batch

app.logger.warning(f"DELETION: {ids_to_remove} out of {counter} rows will be deleted")
schedule_table.write(batch)  # execute batch

await asyncio.sleep(0)  # sleep
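
For reference, the direct access can be set up roughly like this with the python-rocksdb library (the database path and options are illustrative):

import rocksdb

db = rocksdb.DB('schedule-v3.db', rocksdb.Options(create_if_missing=True))
it = db.iteritems()   # iterator over (key, value) byte pairs
it.seek_to_first()

batch = rocksdb.WriteBatch()
# ... fill the batch with deletes as above ...
db.write(batch)       # apply all the deletes in one shot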