Closed · willirath closed this pull request 7 years ago
I've started by refactoring the loop into a map. This should not change behavior except for slightly altering the log message. (The "# of #" info is gone.)
Merging #45 into master will increase coverage by 0.47%. The diff coverage is 100%.

```
@@            Coverage Diff             @@
##           master      #45      +/-  ##
=========================================
+ Coverage   94.06%   94.53%   +0.47%
=========================================
  Files           2        2
  Lines         219      238      +19
=========================================
+ Hits          206      225      +19
  Misses         13       13
```

Impacted Files | Coverage Δ |
---|---|
conda_mirror/conda_mirror.py | 94.46% <100%> (+0.48%) :arrow_up: |
Continue to review full report at Codecov.
Powered by Codecov. Last update f352883...5d47dc1.
6e3ba41 should do what I outlined in #45.
Looks like `multiprocessing.Pool.map()` does not see the same outer scope as `map()`.
So pickling the actual validation function also fails. We can still explicitly pass the necessary info.
(Sorry for testing all this using the CI. I'm not in a proper dev environment at the moment.)
Tests are passing. @ericdill, can you give this a try?
To use concurrent mode, set the number of threads in an environment variable. The following should validate a conda-forge mirror using 8 threads:

```shell
export CONDA_MIRROR_NUM_THREADS=8
conda-mirror --upstream-channel conda-forge --target-directory local_mirror --platform linux-64
```
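For reference, picking up such an environment variable on the Python side is a one-liner; a minimal sketch (the actual handling in this branch may differ):

```python
import os

# Normally set via `export CONDA_MIRROR_NUM_THREADS=8` as above;
# set it here so the sketch is self-contained. Fall back to 1 worker.
os.environ["CONDA_MIRROR_NUM_THREADS"] = "8"
num_threads = int(os.environ.get("CONDA_MIRROR_NUM_THREADS", "1"))
print(num_threads)
```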
A few thoughts on what's left to do:

- Is this working as expected in a real-world example?
- Add command line arg to set number of threads?
- Coverage decreased quite a bit.
- Is explicitly passing `package_repodata` to all threads creating too much overhead?

@willirath I'll give this a close look tomorrow. I've got a planning meeting for the rest of the day (I'd much rather be working on this PR 😀 with you).
Regarding your todo list:

> Is this working as expected in a real-world example?

I'll set it up to run as a nightly job tomorrow and monitor its status.

> Add command line arg to set number of threads?

I generally prefer CLI args to env vars, and argparse is already configured, so it shouldn't be too hard to add another "num_workers" argument or some such.

> Coverage decreased quite a bit.

Now that you bring this up, that's actually one of the main reasons I abandoned the multiprocessing validation. I'm a sucker for high coverage numbers.

> Is explicitly passing `package_repodata` to all threads creating too much overhead?

Yeah, that's probably a bit unnecessary.
I had toyed around with a separate script that would use conda_mirror as a library, but many of the same things apply here too. Basically I think you want to zip up the path to each package and the associated metadata and then pass that to your validation function. That way you're not needing to copy that repodata dict N times. I think there's plenty of room to solve this problem in many different ways and am interested to see where this one ends. Here's how I did it as a separate script:
```python
from argparse import ArgumentParser
from multiprocessing import Pool
from os.path import join
from pprint import pprint
import json
import os
import sys
import time

from conda_mirror import conda_mirror as cm

cm._init_logger(3)

METADATA = "No metadata found"
PACKAGE_VALIDATION = "Validation failed"


def validate(pkg_tuple):
    package_path, package_metadata = pkg_tuple
    if package_metadata is None:
        return package_path, METADATA, ""
    ret = cm._validate(package_path,
                       md5=package_metadata.get('md5'),
                       sha256=package_metadata.get('sha256'),
                       size=package_metadata.get('size'))
    if ret is not None:
        return package_path, PACKAGE_VALIDATION, ret


def cli():
    ap = ArgumentParser()
    ap.add_argument(
        'pkgs_dir',
        action='store',
        help="The path to the directory that you are interested in"
    )
    ap.add_argument(
        'num_workers',
        action='store',
        help="The number of parallel processes to spin up"
    )
    ap.add_argument(
        '--cron',
        action="store_true",
        help="Disable print calls that don't do so well in logs",
        default=False
    )
    args = ap.parse_args()
    with open(join(args.pkgs_dir, 'repodata.json'), 'r') as f:
        repodata = json.load(f)
    conda_packages = cm._list_conda_packages(args.pkgs_dir)
    pkg_iter = ((join(args.pkgs_dir, pkg), repodata['packages'].get(pkg))
                for pkg in conda_packages)
    start = time.time()
    need_attention = []
    with Pool(int(args.num_workers)) as p:
        for i, ret in enumerate(p.imap_unordered(validate, pkg_iter)):
            elapsed = int(time.time()) - int(start) or 1
            pkgs_per_sec = int(i / elapsed) or 1
            eta = int((len(conda_packages) - i) / pkgs_per_sec)
            msg = ('{0}/{1} {2}s elapsed {3} processed/sec {4}s remaining'.format(
                i, len(conda_packages), elapsed, pkgs_per_sec, eta))
            if not args.cron:
                sys.stderr.write('\r%s' % msg)
            elif i % 100 == 0:
                print('%s' % msg)
            if ret is not None:
                need_attention.append(ret)
    print('\n%s packages need attention' % len(need_attention))
    for package_path, problem_type, info in need_attention:
        if problem_type == METADATA:
            print("Removing %s because it is not in the local "
                  "repodata.json" % package_path, file=sys.stderr)
            os.remove(package_path)
        elif problem_type == PACKAGE_VALIDATION:
            print("Removing %s because it failed package validation with this "
                  "reason: %s" % (package_path, info), file=sys.stderr)
            continue  # removal disabled for now; the lines below are skipped
            # os.remove(package_path)
            del repodata['packages'][os.path.basename(package_path)]
            # Update the repodata immediately
            cm._write_repodata(os.path.dirname(package_path), repodata)
    print("All packages that require attention")
    pprint(need_attention)


if __name__ == "__main__":
    cli()
```
So I'm back.
Running

```shell
export CONDA_MIRROR_NUM_THREADS=8
conda-mirror --upstream-channel conda-forge --target-directory local_mirror --platform linux-64
```

at 208d203e8d5e61d80ab950c34b90770e738ba021 seems to work as intended.
I can't say much about performance gains as I was only testing this on my workstation which was relatively busy otherwise. @ericdill: Did you have a chance to try this on your jobs?
> Coverage decreased quite a bit.

> Now that you bring this up, that's actually one of the main reasons I abandoned the multiprocessing validation. I'm a sucker for high coverage numbers.
https://codecov.io/gh/maxpoint/conda-mirror/compare/8159290dca5ccba2fe9e5b1f6b1053241712fbca...208d203e8d5e61d80ab950c34b90770e738ba021/diff#D1-382 and below looks like `_validate_packages_loop` is never called in the test used to assess coverage. So let's add a test for `_validate_packages_loop`.
@willirath sorry for letting this sit for 10 days. It fell off my mental stack and I forgot about it. I'll be able to think on this again next week and will reply with constructive comments then. Thanks for working on this PR!
I was also quite busy with other things, but I'll hopefully be able to get this done in the next few days.
A first test shows that validation does in fact run concurrently using the given number of processes. I'll continue cleaning up the args passed to `Pool.map` and add a CLI argument `--num-procs-val` to replace the environment variable that is used now.
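A sketch of what that argparse flag could look like. The flag name is taken from the comment above; the default value and help text are assumptions, not the PR's actual code:

```python
from argparse import ArgumentParser

ap = ArgumentParser()
ap.add_argument(
    "--num-procs-val",
    type=int,
    default=1,  # assumed default: sequential validation
    help="Number of processes used for concurrent package validation",
)

# argparse converts the dashes to underscores in the attribute name.
args = ap.parse_args(["--num-procs-val", "8"])
print(args.num_procs_val)
```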
You are probably most familiar with your existing tests. Could you then add tests that use concurrent validation?
OK ...

The problems with coverage turned out to be due to the way `coverage` handles multiprocessing. The essential bits were avoiding `SIGTERM` in subprocesses (see this comment) and then explicitly running `coverage` in concurrent mode.

In the end, coverage was increased by 4.5‰ :)
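For reference, coverage.py's multiprocessing support is typically enabled through its configuration; a minimal `.coveragerc` sketch (the exact setup used in this PR may differ):

```ini
[run]
# Measure coverage inside subprocesses started via multiprocessing.
concurrency = multiprocessing
# Each process writes its own .coverage.* data file.
parallel = True
```

After the test run, the per-process data files are merged with `coverage combine` before reporting. Letting worker processes exit cleanly (e.g. `Pool.close()`/`Pool.join()` rather than terminating them) matters here, because coverage data is only written at process exit.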
@ericdill: Can you have a look? I see no further issues.
@willirath Excellent job tracking down that coverage multiprocessing thing! That's been plaguing me for a while now. Looking over the PR right now
That's great!
I'll have a look tomorrow as well. Given that it's quite a messy series of commits now, I wonder if I should clean this up in a different branch with only a single commit eventually.
> I wonder if I should clean this up in a different branch with only a single commit eventually.
In general, I am not particularly concerned about feature branches with a large number of commits. However, there is one commit that I'd like you to get rid of in this feature branch since it's a merge of master into the feature branch.
If you'd like to just squash all of this into one commit that's fine with me.
Tried this locally and everything worked just fine. Validation is so much faster, too. Thanks for your efforts on this @willirath. If you could just rebase this whole thing to get rid of that merge from master into this branch, then I think this LGTM!
> If you'd like to just squash all of this into one commit that's fine with me.
Let me know when all this is done. I'll squash it then.
I've added a clean PR #48 which squashes all of the above and also has a proper commit message.
Closing in favor of #48
Proof of concept for concurrent package validation
Implements solution to #43.