CouncilDataProject / cdp-backend

Data storage utilities and processing pipelines used by CDP instances.
https://councildataproject.org/cdp-backend
Mozilla Public License 2.0

Reduce complexity of event gather pipeline #230

Closed. evamaxfield closed this issue 1 year ago.

evamaxfield commented 1 year ago

cc @whargrove -- I know that the Montana legislature event gather pipeline has been failing for a bit, most commonly, iirc, from the disk filling up. I have been trying to think of ways to keep those failures from happening.

The original reason we used Prefect for our pipeline management was that we used to be able to process events in parallel (when we were using Google Cloud Speech-to-Text we could process in parallel because they were doing the heavy lifting; now that we are using Whisper and only have a single GPU, it is easiest to do everything synchronously). The problem is that Prefect naturally assumes things can be run in parallel (and, partially, that we have infinite resources), so it runs tasks in a semi-breadth-first manner. This is what is causing a lot of your out-of-memory issues -- Prefect will download all of the videos in your batch, then move on to stripping the audio, then move on to transcribing, etc., before uploading / deleting the files from disk.

This sort of makes prefect useless.
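
To make the failure mode concrete, here is a rough sketch of what our flow effectively looks like with Prefect 1.x mapped tasks (the task and helper names here are hypothetical, not our actual pipeline code). With the stage-by-stage scheduling described above, every download_video child finishes before any strip_audio child starts, so every video in the batch is sitting on disk at the same time:

from prefect import Flow, task

@task
def get_sessions():
    # hypothetical helper returning every session in the batch
    ...

@task
def download_video(session):
    # hypothetical: fetches the session video and returns a local file path
    ...

@task
def strip_audio(video_path):
    # hypothetical: extracts audio from the downloaded video
    ...

@task
def transcribe(audio_path):
    # hypothetical: runs Whisper over the audio
    ...

with Flow("event-gather-pipeline") as flow:
    sessions = get_sessions()
    video_paths = download_video.map(sessions)
    audio_paths = strip_audio.map(video_paths)
    transcripts = transcribe.map(audio_paths)

# With the scheduling behavior described above, all downloads run,
# then all audio strips, then all transcriptions -- disk usage peaks
# at "every video in the batch" before anything is cleaned up.
flow.run()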

I am opening this issue to hopefully have a discussion on reducing the complexity of the entire library / the main event processing pipeline by reducing our usage of prefect, or entirely removing it from the library.


Options for reducing our usage of prefect

Remake the pipeline to work on a single session

Right now, the pipeline takes in any number of events and creates session processing tasks for each session within each event. Each of these can run in parallel. If I were to describe this potential change in pseudocode, it would be going from what we currently have:

with Flow("event-gather-pipeline") as flow:
    events = get_events_func()
    for event in events:
        for session in event.sessions:
            task1
            task2
            task3

to the following:

events = get_events_func()
for event in events:
    for session in event.sessions:
        with Flow("processing-single-session") as flow:
            task1
            task2
            task3

# some kind of nice utility to aggregate errors
log(agg_errors)  # "3 / 8 sessions failed to process" or "all sessions processed successfully", etc.

This would naturally make it so one session is processed at a time (and all files should hopefully be deleted after each session is done) and we could actually re-enable some minor parallelism (things like thumbnail generation during audio processing).

Remove prefect entirely

This would reduce the complexity a lot, but it would also remove all the niceties like the free logging, the easy error management / recovery, and, most especially, the retry and backoff system.

But it would entirely remove Prefect and make our whole pipeline synchronous, which is certainly much less complex.
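
For a sense of what we would have to re-implement ourselves, here is a rough sketch (hypothetical, not existing cdp-backend code) of the kind of retry-with-backoff wrapper that would stand in for Prefect's per-task max_retries / retry_delay settings:

import logging
import time
from functools import wraps

log = logging.getLogger(__name__)

def retry_with_backoff(max_attempts=3, base_delay_seconds=5):
    # hypothetical decorator replacing Prefect's task retry / retry_delay options
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    delay = base_delay_seconds * 2 ** (attempt - 1)
                    log.exception(
                        "%s failed (attempt %d/%d), retrying in %ds",
                        func.__name__, attempt, max_attempts, delay,
                    )
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_attempts=3, base_delay_seconds=5)
def transcribe_session(audio_path):
    # hypothetical task body
    ...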

My opinion

I personally side with option 1 ("remake the pipeline to work on a single session") -- we reduce the complexity, don't have to change too much, gain some benefit, and don't lose too much. Additionally, it keeps the door open for us to upgrade to the latest version of Prefect, which I am sure has nicer features that will continue to help us in the future.

cc @Shak2000

whargrove commented 1 year ago

Thanks for looking into this @evamaxfield! Your recommendation intuitively makes sense to me.

Am I right in understanding that with your proposed change there would be a list of Flows created by prefect that would be run sequentially? Or would cdp-backend create sub-flows?

evamaxfield commented 1 year ago

Am I right in understanding that with your proposed change there would be a list of Flows created by prefect that would be run sequentially? Or would cdp-backend create sub-flows?

Good question. I would create a list of flows that are then iteratively executed in the current bin script.

# inside the pipeline-construction function called by the bin script
events = get_events_func()

session_processing_flows = []
for event in events:
    for session in event.sessions:
        with Flow("processing-single-session") as flow:
            task1
            task2
            task3

        session_processing_flows.append(flow)

return session_processing_flows

Then each of those flows is executed to completion, one at a time, in the bin script -- no partial runs, no parallel runs, etc.
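
Roughly, the bin-script loop could look like this (a hypothetical sketch; flow.run() returns a Prefect State, and State.is_failed() is the check I would use for the error aggregation):

import logging

log = logging.getLogger(__name__)

# session_processing_flows is the list returned by the
# pipeline-construction code sketched above
failed = 0
for flow in session_processing_flows:
    # each flow runs to completion before the next one starts
    state = flow.run()
    if state.is_failed():
        failed += 1

total = len(session_processing_flows)
if failed:
    log.error("%s / %s sessions failed to process", failed, total)
else:
    log.info("all %s sessions processed successfully", total)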

evamaxfield commented 1 year ago

https://github.com/CouncilDataProject/cdp-backend/pull/232

evamaxfield commented 1 year ago

Done. Released in v4.1.0.rc0

Already deployed and tested on Seattle: https://github.com/CouncilDataProject/seattle/actions/runs/4902430069/jobs/8754315894