faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

SetTable crashes when using RocksDB #261

Open elja opened 2 years ago

elja commented 2 years ago

Steps to reproduce

If you use "rocksdb" as the storage backend you get this error; it doesn't happen if you use the memory store.

Basically, I have a list of tasks that I load from the database and store locally (no Kafka involved here), and some external events trigger the execution of those tasks. The only thing I want is to avoid sending a task to Kafka if it was already sent and is currently being processed. I use a SetTable to track the progress of task execution. All topics and this table have the same number of partitions and use the same key, so they should be co-partitioned.

import random
import asyncio
import faust
import logging

app = faust.App(
    id=f"rocksdb_test",
    broker='localhost:9092',
    store='rocksdb://'
)

event_topic = app.topic('test_events', key_type=str, value_type=str, partitions=8)
task_queue_topic = app.topic('test_task_queue_topic', key_type=str, value_serializer='json', partitions=8)
task_completion_topic = app.topic('test_task_completion_topic', key_type=str, value_serializer='json', partitions=8)
tasks_progress = app.SetTable('test_tasks_progress', partitions=8, value_type=int)

# hardcoded tasks
TASK_DEFINITIONS = {
    "foo": [
        {"id": 1},
        {"id": 2},
        {"id": 3}
    ],
    "bar": [
        {"id": 4},
        {"id": 5},
        {"id": 6},
    ],
    "hello": [{"id": 7}],
    "world": [{"id": 8}],
}

# =================================== SCHEDULER ================================

async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)
    tasks_progress[job['key']].add(job['task_id'])

@app.agent(event_topic, sink=[task_queue_sink])
async def events_stream(stream: faust.Stream):
    async for key in stream:
        for task in TASK_DEFINITIONS[key]:
            if task['id'] not in tasks_progress[key]:
                new_job = {
                    'uuid': faust.uuid(),
                    'task_id': task['id'],
                    'key': key
                }

                yield new_job

@app.agent(task_completion_topic)
async def task_completions(stream: faust.Stream):
    async for job in stream:
        logging.info(f"Received Task Completion: id = {job['task_id']} / uuid = {job['uuid']}")
        tasks_progress[job['key']].discard(job['task_id'])

# ==================================================================================
# ================================== WORKER ========================================

async def task_completed_sink(job):
    logging.info(f"Sending task completion! task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_completion_topic.send(key=job["key"], value=job)

@app.agent(task_queue_topic, sink=[task_completed_sink])
async def worker(stream: faust.Stream):
    async for job in stream:
        logging.info(f"Task Received!: task_id = {job['task_id']} / uuid = {job['uuid']}")
        await asyncio.sleep(1)  # DO SOME WORK HERE
        logging.info(f"Task Processed!: task_id = {job['task_id']} / uuid = {job['uuid']}")
        yield job

# ==================================================================================
# ================================= 3rd party events ===============================

@app.timer(interval=1.0)
async def push_events_every_second():
    key = random.choice(list(TASK_DEFINITIONS.keys()))
    logging.info(f"Pushing key = {key}")
    await event_topic.send(key=key, value=key)

# ====================================================================================

if __name__ == '__main__':
    app.main()

Expected behavior

I should be able to store data in a SetTable backed by RocksDB without any errors.

Actual behavior

AssertionError: assert event is not None ../python3.9/site-packages/faust/stores/rocksdb.py#274

Full traceback


[2022-01-19 14:26:10,389] [33567] [ERROR] [^----ChangeloggedSetManager: None]: Crashed reason=AssertionError() 
Traceback (most recent call last):
  File "******/projects/scheduler/.venv/lib/python3.9/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/tables/objects.py", line 146, in _periodic_flush
    self.flush_to_storage()
  File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/tables/objects.py", line 139, in flush_to_storage
    self.storage[key] = self.data[key].as_stored_value()
  File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/stores/base.py", line 208, in __setitem__
    return self._set(self._encode_key(key), self._encode_value(value))
  File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 274, in _set
    assert event is not None
AssertionError
[2022-01-19 14:26:10,398] [33567] [INFO] [^Worker]: Stopping... 
[2022-01-19 14:26:10,399] [33567] [INFO] [^-App]: Stopping... 

j123b567 commented 2 years ago

This can also be reproduced with the example examples/tableofset.py, just by adding store='rocksdb://'.
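
To illustrate the kind of change meant, here is a sketch only (the actual contents of examples/tableofset.py may differ; the app id and table name below are illustrative):

import faust

# Illustrative only -- not the actual contents of examples/tableofset.py.
# The point is that switching any SetTable app from the default memory store
# to RocksDB triggers the same AssertionError during the periodic flush.
app = faust.App(
    'tableofset-repro',              # hypothetical app id
    broker='kafka://localhost:9092',
    store='rocksdb://',              # <- the one-line change that reproduces the crash
)
table = app.SetTable('tableofset-demo')

if __name__ == '__main__':
    app.main()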

j123b567 commented 2 years ago

The code that raises the assert is similar to all the other table operations, except that they contain this check instead:

        if event is None:
            raise TypeError("Cannot modify table key from outside of stream iteration")

After examining the code, this is exactly the case: the table is written to outside of stream iteration. There is a separate periodic task that takes the in-memory representation and tries to flush it to the RocksDB table, but this cannot work. I don't understand how it could ever have worked, because it is broken by design.
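
To make the failure path concrete, here is a paraphrased sketch based on the traceback above (not the exact faust source; db_for_partition is a stand-in name) of why the assert can only pass inside stream iteration:

import faust

def rocksdb_set_sketch(store, key: bytes, value: bytes) -> None:
    """Paraphrase of _set in faust/stores/rocksdb.py (not the exact source)."""
    # The RocksDB store keeps one database per topic partition, so it looks at
    # the event currently being processed to learn which partition to write to.
    event = faust.current_event()
    assert event is not None                   # <- line 274 in the traceback
    partition = event.message.partition
    store.db_for_partition(partition).put(key, value)  # db_for_partition: stand-in name

# The set-table manager, however, writes to the store from a periodic background
# task (_periodic_flush -> flush_to_storage -> SerializedStore.__setitem__ -> _set
# in the traceback), i.e. outside of any stream iteration. There current_event()
# is None, so the assert always fires:
print(faust.current_event())  # -> None when not inside an agent processing a message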

elja commented 2 years ago

@j123b567 I highly recommend using Java Spring + Kafka Streams directly if that's possible for you. After spending many days on Faust I decided to switch, and after a couple of days of learning I did what I wanted. I think that fixing the issues you run into with Faust would require you to contribute to this repo A LOT. Faust is a great tool and it's really sad that nobody maintains it; on the other hand, only people who know Kafka Streams well can do it.

j123b567 commented 2 years ago

@elja my app is now functional and using faust-streaming :D but this proposal will probably be my next step.

wbarnha commented 2 years ago

I took a look at your example and this part caught my attention:

async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)
    tasks_progress[job['key']].add(job['task_id'])

Using this as an agent sink to modify a table is entering murky waters. Taken from https://faust.readthedocs.io/en/latest/userguide/agents.html#concurrency:

Warning:

Concurrent instances of an agent will process the stream out-of-order, so you cannot mutate tables from within the agent function:

An agent having concurrency > 1, can only read from a table, never write.

I haven't tried writing to a Table using your methodology, so I'm afraid I can't be too helpful.
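
For anyone hitting this, one direction to try is moving the table write into the agent body, inside stream iteration. This is an untested sketch reusing the names from the reproduction script above; it addresses the documentation warning about mutating tables from a sink, but may not be enough if the periodic flush path described earlier is itself the problem:

# Untested sketch: mutate the SetTable inside the agent's stream iteration
# instead of in the sink, so an event is current when the write happens.
async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)

@app.agent(event_topic, sink=[task_queue_sink])
async def events_stream(stream: faust.Stream):
    async for key in stream:
        for task in TASK_DEFINITIONS[key]:
            if task['id'] not in tasks_progress[key]:
                # Mark the task as in-progress here, while the event for
                # this key is still being processed by the agent.
                tasks_progress[key].add(task['id'])
                yield {
                    'uuid': faust.uuid(),
                    'task_id': task['id'],
                    'key': key,
                }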

Memory storage works because it's a base.Store and is relatively straightforward. The rocksdb driver is derived from the base.SerializedStore class, which is a separate beast altogether. Supporting your use case with the rocksdb driver will require some changes to faust.
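
A rough sketch of that difference (simplified, not the exact faust source):

from typing import Any, Dict

class MemoryStoreSketch:
    """Like faust.stores.memory.Store: essentially a dict, no serialization layer."""

    def __init__(self) -> None:
        self.data: Dict[Any, Any] = {}

    def __setitem__(self, key: Any, value: Any) -> None:
        # Plain mapping assignment; no event or partition bookkeeping needed.
        self.data[key] = value

class SerializedStoreSketch:
    """Like faust.stores.base.SerializedStore: keys and values are serialized
    to bytes and handed to driver-level primitives such as _set()."""

    def __setitem__(self, key: Any, value: Any) -> None:
        # Mirrors the traceback: base.py __setitem__ -> _encode_key/_encode_value -> _set.
        self._set(self._encode_key(key), self._encode_value(value))

    def _encode_key(self, key: Any) -> bytes:       # stand-in for the real serializer
        return str(key).encode()

    def _encode_value(self, value: Any) -> bytes:   # stand-in for the real serializer
        return str(value).encode()

    def _set(self, key: bytes, value: bytes) -> None:
        # Implemented by the RocksDB driver; this is where the current_event()
        # lookup (and the failing assert) lives.
        raise NotImplementedError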