Closed mrichar1 closed 11 months ago
Hey @mrichar1. Thanks for the PR.
I'm seeing a failing test with test_process_defined_batch in test_multiprocessor.py. There are 5 audio files in the test directory, but the new code returns 17 MultiProcessRecording objects. This is the test that limits the number of processes that can run.
pytest tests/test_multiprocessor.py::test_process_defined_batch --capture=no
    def test_process_defined_batch():
        analyzer = LiteAnalyzer()
        test_files = "tests/test_files"
        processes = 1
        start = time.time()
        with tempfile.TemporaryDirectory() as input_dir:
            # Copy test files to temp directory.
            copytree(test_files, input_dir)
            assert len(os.listdir(input_dir)) == 7
            batch = DirectoryMultiProcessingAnalyzer(
                input_dir, analyzers=[analyzer], processes=processes
            )
            batch.process()
>           assert len(batch.directory_recordings) == 5
E           assert 17 == 5
E + where 17 = len([<birdnetlib.main.MultiProcessRecording object at 0xffff50026f70>, <birdnetlib.main.MultiProcessRecording object at 0x...MultiProcessRecording object at 0xffff406dafd0>, <birdnetlib.main.MultiProcessRecording object at 0xffff406dd7f0>, ...])
E + where [<birdnetlib.main.MultiProcessRecording object at 0xffff50026f70>, <birdnetlib.main.MultiProcessRecording object at 0x...MultiProcessRecording object at 0xffff406dafd0>, <birdnetlib.main.MultiProcessRecording object at 0xffff406dd7f0>, ...] = <birdnetlib.batch.DirectoryMultiProcessingAnalyzer object at 0xffff5371ad60>.directory_recordings
tests/test_multiprocessor.py:209: AssertionError
================================================================================== short test summary info ===================================================================================
FAILED tests/test_multiprocessor.py::test_process_defined_batch - assert 17 == 5
There's also a new unhandled segfault being thrown by tensorflow, but only if you run the entire test suite. I'll try to track that down once the individual tests are passing.
ANALYZING AUDIO... Fatal Python error: Fatal Python error: Segmentation faultSegmentation fault
Current thread 0xCurrent thread 0x0000ffff890330200000ffff89033020 (most recent call first):
(most recent call first):
File File ""/usr/local/lib/python3.9/site-packages/tensorflow/lite/python/interpreter.py/usr/local/lib/python3.9/site-packages/tensorflow/lite/python/interpreter.py"", line , line 509509 in in __del____del__
Once the tests pass, my first question would be if the Analyzer models are being initialized with every queued object.
If I recall correctly, the most expensive memory/time issue was the startup of the Analyzers themselves, and that passing the initialized analyzer along was necessary to ensure only one Analyzer per processing thread. The initialized Analyzers can be passed within a thread, but not from the queue or from p.map.
So instead of passing N queued objects to p.map (each of which would initialize its own list of Analyzers), the current code passes function kwargs, and those functions then run in a loop until the queue is empty.
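As a rough illustration of the pattern described above (one Analyzer per worker, with each worker looping over a shared queue), here is a minimal sketch. It uses threads and a hypothetical FakeAnalyzer as stand-ins for the real worker processes and birdnetlib analyzers, so it is not the library's actual code:

```python
import queue
import threading


class FakeAnalyzer:
    """Hypothetical stand-in for an analyzer that is expensive to start."""

    init_count = 0
    _lock = threading.Lock()

    def __init__(self):
        # Count how many times an analyzer gets constructed.
        with FakeAnalyzer._lock:
            FakeAnalyzer.init_count += 1

    def analyze(self, path):
        return f"analyzed:{path}"


def worker(shared_queue, results):
    # Build the analyzer once per worker, not once per queued file.
    analyzer = FakeAnalyzer()
    while True:
        try:
            path = shared_queue.get_nowait()
        except queue.Empty:
            return  # queue drained, this worker is done
        results.append(analyzer.analyze(path))


def run(paths, workers):
    shared_queue = queue.Queue()
    for path in paths:
        shared_queue.put(path)
    results = []
    threads = [
        threading.Thread(target=worker, args=(shared_queue, results))
        for _ in range(workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

With this shape the number of analyzer start-ups tracks the number of workers rather than the number of files, which is the property the question above is checking for.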
Just as a point of comparison, how many files do I need to include in a directory before I start to see this issue? I've got a few low RAM machines here I'd like to test with. Also, what is your typical length of audio file in your use case?
Thanks!
Hi,
Apologies for not coming back to this sooner. I've just gone back to have a look (and actually run the test suite!) and it appears I didn't remove all of the 'recursive' code, which meant it was processing some recordings more than once. The latest push now seems to be passing tests.
Thanks for the info on the cost of setting up Analyzers - I'm only using the one in my project and not seeing any real speed difference between my code and the original, only the decrease in memory use, but this is very likely to be sample-specific. At present it is setting up a new Analyzer for each sample, but that would be easy to change (process_from_queue already accepts an analyzers argument from the looping approach). I'm happy to do this as well if the overall PR looks useful.
For reference I was processing 8GB of ~20MB audio files (approx 5 mins at 48k) - but after running for a minute or two the worker memory use can be seen creeping up in top, then it's just a case of waiting for the system to run out of memory.
Hi, it looks like this PR has passed all existing tests. Were you able to confirm that the worker memory use remained stable when executing your use case?
I haven't been able to spin up a test run on my own machines, but I'll try to do that in the next week or so. I haven't forgotten about this, I just don't have enough bandwidth at the moment to try this out.
That's great - yes when processing 8GB of 5min samples, each worker remains consistently using around 500MB of RAM over the full run. With the original code, this increases by around 100MB/worker/min (until RAM is exhausted).
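A rough way to picture why the original code grows while the new code stays flat: when a function fetches the next item by calling itself, every earlier call (and whatever it still references) stays alive until the queue is empty, whereas a loop releases each item before taking the next. The sketch below counts live calls rather than bytes. The names process_recursive, process_one and process_loop are hypothetical, not birdnetlib functions:

```python
from collections import deque


def _enter(stats):
    # Record that one more call is currently holding an item.
    stats["live"] += 1
    stats["peak"] = max(stats["peak"], stats["live"])


def process_recursive(pending, stats):
    """Handle one item, then recurse for the next before returning."""
    if not pending:
        return
    recording = pending.popleft()  # stays referenced until the recursion unwinds
    _enter(stats)
    try:
        process_recursive(pending, stats)
    finally:
        stats["live"] -= 1


def process_one(pending, stats):
    """Handle exactly one item and return, releasing it."""
    recording = pending.popleft()
    _enter(stats)
    stats["live"] -= 1


def process_loop(pending, stats):
    """Call process_one 'fresh' for each item until the queue is empty."""
    while pending:
        process_one(pending, stats)


def peak_live_calls(strategy, n_items):
    stats = {"live": 0, "peak": 0}
    strategy(deque(range(n_items)), stats)
    return stats["peak"]
```

In this toy model the recursive version peaks at one live call per queued item, while the looped version never holds more than one, which matches the flat per-worker memory reported above in spirit if not in exact numbers.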
This is a fairly small tweak to reduce the memory usage when using DirectoryMultiProcessingAnalyzer.

Currently process_from_queue is called recursively by each worker until the shared_queue is exhausted. This means that all 'instances' of this function are kept running until the queue is empty. This causes a gradual increase in memory use over time, which can lead to a 'stack overflow' when processing lots of files, or on systems with low RAM.

This PR alters the behaviour to instead call process_from_queue 'fresh' each time, preventing this situation from occurring.

(As an aside, these changes would also allow p.map to be swapped out for p.imap and have process yield results as they are generated, rather than building and returning a large results list at the end. This might be useful for checkpointing if the program needs to be restarted, or for passing on results 'as they happen' to other pipelines. But I'll leave that for another PR, if this one is accepted!)
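For the aside about p.imap, here is a minimal sketch of what yielding results as they complete might look like. It uses multiprocessing.pool.ThreadPool, which offers the same map/imap interface as Pool, purely so the example stays short and avoids pickling concerns. The analyze and process functions are hypothetical stand-ins, not the PR's code:

```python
from multiprocessing.pool import ThreadPool


def analyze(path):
    # Hypothetical per-file work; a real worker would run the analyzers here.
    return (path, "done")


def process(paths, processes=2):
    """Yield each result as soon as it is available, in input order."""
    with ThreadPool(processes) as p:
        for result in p.imap(analyze, paths):
            yield result


def run_with_checkpoint(paths, completed):
    # Record each finished path as it arrives, so a restart could skip it.
    for path, status in process(paths):
        completed.append(path)
    return completed
```

Because process is a generator here, a caller can write a checkpoint or forward each result downstream without waiting for the whole directory to finish.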