openzim / kolibri

Convert a Kolibri channel into ZIM file(s)
GNU General Public License v3.0

Multithreading is significantly broken #106

Open · benoit74 opened this issue 6 months ago

benoit74 commented 6 months ago

We are supposed to wait for all video + node processing, and return on the first exception.

            futures = cf.wait(
                self.videos_futures + self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )

The log of the last khan_academy_fr run, as reported in #100, shows that multiple nodes had to fail before the scraper stopped.

I tried to debug / fix the issue locally, but the behavior makes no sense to me so far; there is probably a bug somewhere, but I have not managed to figure out where.

benoit74 commented 6 months ago

The block of code mentioned above in fact has two issues:

I tried to replace it with something else:

            cf.wait(
                self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )
            futures = cf.wait(
                self.videos_futures + self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )

It seems better, but I'm pretty sure we still have issues with the FIRST_EXCEPTION clause, especially since we probably lose all exceptions raised in video threads before the node futures have completed.
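Something like the loop below might be needed instead (a rough, untested sketch; it assumes cf is concurrent.futures and the self.*_futures lists from the snippet above): re-run the wait on a fresh snapshot of both sets until everything has settled or something has failed.

    while True:
        # take a fresh snapshot: video futures can still be added while we wait
        snapshot = set(self.nodes_futures) | set(self.videos_futures)
        done, not_done = cf.wait(snapshot, return_when=cf.FIRST_EXCEPTION)
        if any(not f.cancelled() and f.exception() for f in done):
            break  # first failure detected, caller can now cancel the rest
        if not not_done and snapshot == (
            set(self.nodes_futures) | set(self.videos_futures)
        ):
            break  # everything completed and nothing new was submitted meanwhile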

Multithreading is always a very complex thing to implement. I've isolated https://github.com/openzim/kolibri/issues/100, but I'm pretty sure we have other locks missing in our code, because even if a list/set/dictionary is thread-safe, it does not mean that there are no portions of our code where we need locking, e.g. when checking that an item is not already in a list and then adding it to the list if it was missing.
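For the record, the kind of pattern I mean (illustrative names, not actual scraper code): the lock has to protect the whole check-then-add, even though each individual set operation is thread-safe.

    import threading

    seen_lock = threading.Lock()
    seen_items: set[str] = set()

    def add_if_missing(item: str) -> bool:
        """Return True if the item was not already known and was just added."""
        with seen_lock:
            # without the lock, two threads can both pass the membership check
            # and both believe they were the first to add the item
            if item in seen_items:
                return False
            seen_items.add(item)
            return True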

It might be a bad idea, but at this point I suggest that we get rid of the multithreading in this scraper (all scrapers?) for now. We do not have the resources to set it up properly or maintain it in the long run. At least it is not OK now, and I have always failed at this kind of endeavor, even when I believed at first that it was going to be easy. It makes logs very difficult to read because we never know which thread/process is "speaking", and it does not help to get linear and predictable performance.

Zimit and iFixit do not have any multithreading. They are known to be quite "slow", but at least their behavior is more predictable, and their code is easier to maintain.

WDYT?

rgaudin commented 6 months ago

Concurrency is complicated and makes maintenance and debugging difficult. That's a fact.

We deemed it necessary, knowingly, for performance reasons, and we use some form of it in almost all scrapers. Discarding it because it makes an issue harder to understand/fix seems unreasonable.

This scraper (and others) had been running for a long time. If I look at Zimfarm runs, I see libretext kept working and khan-fr hadn't been run since 11m ago, only to be run and fail 3m ago (video-encoding-related). Between those dates, a lot of changes happened to the scraper: bootstrap, scraperlib, video encoding, video concurrency…

If that helps, get rid of concurrency locally to ensure the rest works as expected, then bring it back properly.

But to remove such a feature from a scraper (or all of them), we'd need much stronger arguments. And we all know what “for now” means 😅

benoit74 commented 6 months ago

> This scraper (and others) had been running for a long time. If I look at Zimfarm runs, I see libretext kept working and khan-fr hadn't been run since 11m ago, only to be run and fail 3m ago (video-encoding-related). Between those dates, a lot of changes happened to the scraper: bootstrap, scraperlib, video encoding, video concurrency…

That doesn't prove it was working properly. libretext has no video encoding, so sure, it works properly. The khan-fr log already shows a multithreading issue: the whole log is gone, but what's left in the scraper stdout at https://farm.openzim.org/pipeline/4b5053ff-0ed4-4433-8217-119a5d8ae7d7/debug shows that there have been multiple exceptions in video processing which did not stop the scraper.

> And we all know what “for now” means 😅

You have a point 🤣

> If that helps, get rid of concurrency locally to ensure the rest works as expected, then bring it back properly.

I don't feel like that is necessary, but thank you! It is just that we all need to be aware that I will spend time fixing some things now, and other bugs will probably arise in the future. But I'm committed to doing my best to avoid/fix as many as possible! (The PR for the bugs I'm aware of is almost ready indeed.)

rgaudin commented 6 months ago

Understandable; fortunately we only have a few kolibri recipes for now, so it will be easy to spot regressions.

benoit74 commented 6 months ago

I'm sorry to say that there is, in fact, no PR ready yet; I cannot manage to make the concurrent processing work as expected.

I've reproduced the problem in an isolated test case:

import concurrent.futures as cf
from time import sleep
import functools

from datetime import datetime

# Without callbacks
#
# Expected behavior
# t0: all nodes added to the working queue, node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     node 2 and node 3 starts
# t6: node 2 submits video 2
#     node 2 and node 3 completes
#     node 4 and node 5 starts
# t6.5: video 0 fails
#       video 2 starts
# t7: check detects that an exception occurred and cancels everything
# finished
#
# Real behavior with ThreadPoolExecutor (not perfect - running tasks are
# continuing till they end, but more or less ok)
# t0: all nodes added to the working queue, node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     node 2 and node 3 starts
# t6: node 2 submits video 2
#     node 2 and node 3 completes
#     node 4 and node 5 starts
# t6.5: video 0 fails
#       video 2 starts
# t7: check detects that an exception occurred and cancels everything
# t8: node 4 and node 5 completes <== UNEXPECTED
# t10: video 2 completes <== UNEXPECTED
# finished

# ---------------
# With callbacks
# ---------------
#
# With callbacks, the callback is executed at the end of the future, in the
# "main" thread, so we assume it will delay the start of the next future
#
# Expected behavior with callbacks
# t0: all nodes added to the working queue
#     node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     callbacks of node 0 and node 1 starts
# t4: callbacks of node 0 and node 1 completes
#     node 2 and node 3 starts
# t6.5: video 0 fails
#       callback of video 0 starts
# t7: node 2 and node 3 completes
#     callbacks of node 2 and node 3 starts
#     check detects that an exception occurred and cancels everything
# t7.5: callback of video 0 completes
# t8: callbacks of node 2 and node 3 completes
# finished
#
#
# Real behavior with ThreadPoolExecutor (a real mess, way too many things are executed)
# t0: all nodes added to the working queue
#     node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     callbacks of node 0 and node 1 starts
# t4: callbacks of node 0 and node 1 completes
#     node 2 and node 3 starts
# t6.5: video 0 fails
#       callback of video 0 starts
# t7: node 2 and node 3 completes
#     callbacks of node 2 and node 3 starts
#     check detects that an exception occurred and cancels everything
#     callback of node 4 starts <= WHY? the future did not even start
# t7.5: callback of video 0 completes
#     video 2 starts <= WHY? we are supposed to cancel futures, not start new ones
# t8: callbacks of node 2 and node 3 completes
#     node 5 and node 6 starts <= WHY?
# t11: video 2 complete <= WHY?
#     callback of video 2 starts <= WHY?
#     node 5 and node 6 complete
#     callback of node 5 and node 6 starts <= WHY?
# t12: callback of video 2 complete
#     video 6 starts <= WHY?
#     callback of node 5 and node 6 completes
# t15: video 6 completes
#     callback of video 6 starts <= WHY?
# t16: callback of video 6 completes
# finished

# Alter this to activate/deactivate callbacks
WITH_CALLBACKS = True

start = datetime.now()

def log(message: str):
    print(f"{round((datetime.now() - start).total_seconds(), 1)} - {message}")

def wait_for(seconds: float):
    mystart = datetime.now()
    while (datetime.now() - mystart).total_seconds() < seconds:
        pass

def node_callback(future: cf.Future, a: int):
    log(f"Node {a} callback started")
    wait_for(1)
    log(f"Node {a} callback finished")

def video_callback(future: cf.Future, a: int):
    log(f"Video {a} callback started")
    wait_for(1)
    log(f"Video {a} callback finished")

def process_video(a: int):
    log(f"Video {a} started")
    wait_for(3.5)
    if a < 1:
        log(f"Video {a} failed")
        raise Exception(f"Video problem with {a}")
    log(f"Video {a} completed")

def process_node(a: int):
    log(f"Node {a} started")
    wait_for(3)
    if a % 2 == 0:
        future = videos_executor.submit(process_video, a)
        videos_futures.add(future)
        if WITH_CALLBACKS:
            future.add_done_callback(functools.partial(video_callback, a=a))
    log(f"Node {a} completed")

# ProcessPoolExecutor has a very weird behavior; I struggled to make it work as expected
# ThreadPoolExecutor seems to be more stable, even if I don't really know why
nodes_executor = cf.ThreadPoolExecutor(max_workers=2)
videos_executor = cf.ThreadPoolExecutor(max_workers=1)

nodes_futures: set[cf.Future] = set()
videos_futures: set[cf.Future] = set()

for a in range(7):
    future = nodes_executor.submit(process_node, a)
    nodes_futures.add(future)
    if WITH_CALLBACKS:
        future.add_done_callback(functools.partial(node_callback, a=a))

while True:
    # we cannot use the cf.wait since videos_futures is not yet set (it needs nodes to
    # be started) and we would hence not capture exceptions occurring in nodes_futures
    # cf.wait(nodes_futures | videos_futures, return_when=cf.FIRST_EXCEPTION)
    wait_for(1)
    # log("Checking status")
    if (
        sum(1 if future._exception else 0 for future in nodes_futures | videos_futures)
        > 0
    ):  # we have to use ._exception, because .exception() is waiting for the future
        # to complete
        log("Exception encountered")
        break

    if sum(0 if future.done() else 1 for future in nodes_futures | videos_futures) == 0:
        log("All tasks completed")
        break

log(
    f"{sum(1 if future.done() else 0 for future in nodes_futures | videos_futures)} tasks done"
)
log(
    f"{sum(0 if future.done() else 1 for future in nodes_futures | videos_futures)} tasks not done"
)

# this works more or less, because it cancels only futures which were not already
# started + it waits only for running futures to complete before returning, meaning
# that video futures are cancelled only once the node futures have all completed
log("Shutting down nodes")
nodes_executor.shutdown(cancel_futures=True)
log("Shutting down videos")
videos_executor.shutdown(cancel_futures=True)
log("All exectutors shut down")

log(
    f"{sum(1 if future.done() else 0 for future in nodes_futures | videos_futures)} tasks done"
)
log(
    f"{sum(0 if future.done() else 1 for future in nodes_futures | videos_futures)} tasks not done"
)

For now I consider that the concurrent.futures module is not behaving the way we want it to. In addition to the requirements not being met, I had to poll continuously to detect task failures, and to use the private _exception property.
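For reference, a check that stays on the public API (sketch only, not wired into the scraper): .exception() returns immediately once a future is done, so the polling at least does not need the private attribute:

    import concurrent.futures as cf

    def first_failure(futures: set[cf.Future]) -> cf.Future | None:
        """Return one failed future, if any, without blocking on pending ones."""
        for future in futures:
            if future.done() and not future.cancelled() and future.exception():
                return future
        return None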

At this stage, I recommend investing a few development days to migrate to the multiprocessing module. More code will be needed, but at least we will be able to tune things the way we want. Or maybe there is another Python package doing this properly. If we do it on our own, we should definitely share it in python-scraperlib so that it can be reused across Python scrapers.
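To illustrate what I have in mind (a sketch only, with made-up task names, not a design proposal): with the multiprocessing module we own the worker loop, so stopping everything on the first failure is just a shared event.

    import multiprocessing as mp
    import queue

    def process_item(item: int) -> None:
        # placeholder for the real work (processing a node or encoding a video)
        if item == 3:
            raise RuntimeError(f"problem with item {item}")

    def worker(tasks, stop) -> None:
        # pull work until the queue is empty or another worker reported a failure
        while not stop.is_set():
            try:
                item = tasks.get(timeout=0.5)
            except queue.Empty:
                return
            try:
                process_item(item)
            except Exception:
                stop.set()  # first failure: ask every worker to stop
                raise

    if __name__ == "__main__":
        tasks = mp.Queue()
        for i in range(10):
            tasks.put(i)
        stop = mp.Event()
        workers = [mp.Process(target=worker, args=(tasks, stop)) for _ in range(2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print("stopped early" if stop.is_set() else "all tasks completed")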

I think that for the coming weeks / months we can live with this "bug"; it mainly seems to mean that the scraper will not stop on the first task failure ... but it has probably always been that way.