CoffeaTeam / coffea-casa

Repository with the configuration setup for a prototype analysis facility, "coffea-casa"

[coffea-casa] Dependency management #112

Open. oshadura opened this issue 3 years ago

oshadura commented 3 years ago

From @andrzejnovak: I have a file foo.py that contains a coffea processor object bar(). Will dask work if, in the casa notebook, I run: from foo import bar; out = run_uproot_job(..., dask_executor, bar(), ...)?

mat-adamec commented 3 years ago

I have created a repository with my interpretation of this toy example: https://github.com/mat-adamec/testing_import-processor

The line client.upload_file('foo.py') appears to resolve the issue by sending foo.py to all of the workers. I have added this to the package installation documentation with https://github.com/CoffeaTeam/coffea-casa/pull/113.
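
Put together, the toy example looks roughly like this (a minimal sketch only: the scheduler address and the fileset are placeholders, and foo/bar are the hypothetical names from the question above):

from dask.distributed import Client
from coffea import processor

client = Client("tls://localhost:8786")    # assumed coffea-casa scheduler address
client.upload_file("foo.py")               # ship foo.py to every worker

from foo import bar                        # foo.py defines the processor class bar

fileset = {"Dataset": ["root://host//path/file.root"]}   # placeholder fileset
out = processor.run_uproot_job(
    fileset,
    treename="Events",
    processor_instance=bar(),
    executor=processor.dask_executor,
    executor_args={"client": client},
)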

andrzejnovak commented 3 years ago

@mat-adamec Thanks, the example works. Is there a way to extend it so that it also works for files in subdirectories, i.e. from foo.bar import baz? Currently that doesn't seem to work (I suppose upload_file doesn't recreate the directory structure on the worker node).

mat-adamec commented 3 years ago

@andrzejnovak Looks like this isn't directly possible, but may be soon... (see https://github.com/dask/distributed/issues/925 and https://github.com/dask/distributed/pull/4361)

The first linked issue includes work-arounds based on turning your subdirectory into a zipfile. I'll test it out tomorrow. I also have a theory for a simpler work-around with client.register_worker_callbacks, though it'd still require a zip.

At the end of the day, the current package import system for coffea-casa is temporary until we figure out a better solution. I think this further exposes some important flaws.

andrzejnovak commented 3 years ago

Yeah, I guess this now crosses the activation energy for turning a couple of loosely grouped scripts into a proper package.

mat-adamec commented 3 years ago

I think it's worth reopening this until we have a cleaner solution, and so that we know it's a problem that may need future reconsideration.

As it stands, sending files is easy. Sending whole subdirectories is possible but messy.

RonaldGalea commented 3 years ago

I think it's worth reopening this until we have a cleaner solution, and so that we know it's a problem that may need future reconsideration.

As it stands, sending files is easy. Sending whole subdirectories is possible but messy.

@mat-adamec Could you please provide more details about how you would send whole subdirectories? Would you need to create a tar/zip file containing your directory, upload that via upload_file, and then untar/unzip on the worker? How would the last step be done?

mat-adamec commented 3 years ago

I think it's worth reopening this until we have a cleaner solution, and so that we know it's a problem that may need future reconsideration. As it stands, sending files is easy. Sending whole subdirectories is possible but messy.

@mat-adamec Could you please provide more details about how you would send whole subdirectories? Would you need to create a tar/zip file containing your directory, upload that via upload_file, and then untar/unzip on the worker? How would the last step be done?

I had several theories for how to do this, but I'm not having any success implementing them. Even the work-around I found above (https://github.com/dask/distributed/issues/925) runs into trouble, because it seems to just dump the files into the worker without keeping the same directory structure, which makes importing a nightmare.

My current thought is to implement something of this sort:

import zipfile, shutil, os

class SendDirectory:
    def __init__(self, dir):
        self.dir = dir

    def zipper(self):
        # Archive the directory locally as <dir>.zip.
        self.zip = shutil.make_archive(self.dir, 'zip', self.dir)

    def unzipper(self):
        # Runs on each worker: extract the uploaded zip back into a directory
        # with the original name, so the package layout is preserved.
        # Need os.path.basename() because exact filepath is different on workers.
        zipfile.ZipFile(os.path.basename(self.zip)).extractall(self.dir)

    def send(self, client):
        self.zipper()
        client.upload_file(self.zip)
        client.register_worker_callbacks(self.unzipper)
This should zip a subdirectory, upload it to the workers, and then run a function on the workers that unzips it (thus keeping the same directory structure). Unfortunately, it doesn't seem to work, and I can't poke around too much at the moment because of unrelated worker issues. I'll continue tomorrow.
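
If it did work, usage would be a one-liner (the directory name is illustrative):

SendDirectory("myanalysis").send(client)   # zip, upload, and unzip on every worker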

@oshadura how hard would it be to add https://github.com/dask/distributed/pull/4361#issue-539607520 as a patch to coffea-casa's distributed? I don't know the details of such a process, but it might serve as a better work-around for these issues than implementing the above if it is possible.

andrzejnovak commented 3 years ago

@nsmith- 's solution from https://github.com/CoffeaTeam/coffea/discussions/511#discussioncomment-680755 works like a charm

import shutil

shutil.make_archive("myanalysis", "zip", base_dir="myanalysis")
dask_client.upload_file("myanalysis.zip")
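
For what it's worth, the reason this works (names below are illustrative): upload_file() puts the zip on each worker's sys.path, and because base_dir="myanalysis" keeps the package at the archive root, the workers can import it straight from the zip via zipimport. A quick sanity check:

def check():
    from myanalysis.processor import MyProcessor   # hypothetical module inside the zip
    return MyProcessor.__module__

dask_client.run(check)   # one result per worker confirms the import works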

@oshadura EDIT: There seems to be an issue with keeping this alive on the workers, as run_uproot_job dies at random times with:

Traceback (most recent call last):
  File "runner.py", line 330, in <module>
    output = processor.run_uproot_job(sample_dict,
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 1235, in run_uproot_job
    wrapped_out = executor(chunks, closure, None, **exe_args)
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 747, in dask_executor
    return accumulate([work.result() if clevel is None else _decompress(work.result())], accumulator)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 220, in result
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 141, in __call__
    out = self.function(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 869, in _work_function
    processor_instance = cloudpickle.loads(lz4f.decompress(processor_instance))
AttributeError: Can't get attribute 'NanoProcessor' on <module 'workflows.ttbar_validation' from '/var/lib/condor/execute/dir_30254/dask-worker-space/worker-coa1e98u/workflows.zip/workflows/ttbar_validation.py'>

rkansal47 commented 3 years ago

Hi, related to this, I'm seeing the following error when using the client.upload_file() method: OSError: Timed out trying to connect to tls://red-c5627.unl.edu:49158 after 10 s.

Steps to reproduce are:

  1. clone https://github.com/rkansal47/HHbbVV
  2. run python runCoffeaCasa.py

mat-adamec commented 3 years ago

Hi, related to this, I'm seeing the following error when using the client.upload_file() method: OSError: Timed out trying to connect to tls://red-c5627.unl.edu:49158 after 10 s.

Steps to reproduce are:

  1. clone https://github.com/rkansal47/HHbbVV
  2. run python runCoffeaCasa.py

At what point do you see this error? Is it after the run finishes? I was able to do those two steps and get 99% through processing yesterday, though I had to end the run because it was dragging on forever (without errors) past that.

Also, have you confirmed that it's the client.upload_file() method that's breaking this and not something else? An easy check would be to make a file that only contains the upload_file() line (along with a processor) and try to run just that file. I would also try to make this a notebook instead of a script, just to make sure there aren't some weird issues popping up with running a script on coffea-casa (doubtful, but we've mostly tested notebooks so far).
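
Something along these lines would do for the minimal check (a sketch only: the file and module names are placeholders, and the scheduler address is whatever your coffea-casa client points at):

# upload_test.py -- upload a single file and ask the workers to import it
from dask.distributed import Client

client = Client("tls://localhost:8786")            # assumed coffea-casa scheduler address
client.upload_file("processors/my_processor.py")   # a single .py file, no zip

def check():
    import my_processor                            # should succeed on the worker if the upload worked
    return my_processor.__file__

print(client.run(check))                           # prints one path per worker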

rkansal47 commented 3 years ago

Hi, I see it at the beginning, before processing. I tried again just now and received the same error. This is the full stack trace, so I believe it is the client.upload_file("processors.zip") call that is causing it:

Traceback (most recent call last):
  File "runCoffeaCasa.py", line 59, in <module>
    client.upload_file("processors.zip");
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 3089, in upload_file
    return self.register_worker_plugin(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 4115, in register_worker_plugin
    return self.sync(self._register_worker_plugin, plugin=plugin, name=name)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 862, in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 338, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 321, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 4031, in _register_worker_plugin
    responses = await self.scheduler.register_worker_plugin(
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 864, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 663, in send_recv
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 498, in handle_comm
    result = await result
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 6628, in register_worker_plugin
    responses = await self.broadcast(
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5512, in broadcast
    results = await All(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 222, in All
    result = await tasks.next()
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5504, in send_message
    comm = await self.rpc.connect(addr)
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 1012, in connect
    comm = await connect(
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py", line 309, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://red-c5627.unl.edu:49158 after 10 s

Could it be that the issue is with a particular worker node which only I'm assigned to?

I also tried it in a notebook and received the same error (cell 2) https://github.com/rkansal47/HHbbVV/blob/main/runCoffeaCasa.ipynb

mat-adamec commented 3 years ago

Could you try running one of the benchmark examples (for example, here)? If it's a problem with the worker, then it should probably error out there too. The reason upload_file appears to trigger the issue could simply be that it's the first time you ask your worker to do anything, and the worker itself is broken.

If that works, could you try uploading a single file instead of a zip? Looking at the folder you're zipping, I see there's a pkl file that I can't open because it's not UTF-8 encoded. Maybe the workers don't like that, so if uploading a single file works, you could try zipping everything except the pkl file and seeing if that uploads.
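
If it comes to that, shutil.make_archive has no way to exclude files, but a few lines of zipfile can build the same archive without the pkl (directory and archive names are illustrative):

import os, zipfile

def zip_without_pkl(src_dir, zip_name):
    # Pack src_dir into zip_name, keeping the directory layout but skipping .pkl files.
    with zipfile.ZipFile(zip_name, "w") as zf:
        for root, _, files in os.walk(src_dir):
            for fname in files:
                if not fname.endswith(".pkl"):
                    path = os.path.join(root, fname)
                    zf.write(path, arcname=path)   # src_dir is assumed to be a relative path

zip_without_pkl("processors", "processors.zip")
client.upload_file("processors.zip")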

I'm just throwing a few darts at the wall here. If none of this helps, we at least exhaust some of the alternate possibilities and come back to the initial problem... which is that we still need a better dependency management system! Users definitely shouldn't have to be wrestling with a work-around this much.

rkansal47 commented 3 years ago

Hi, thanks for the suggestions (let me know if I missed something).

  1. Uploading without the .pkl file in the zip - same error
  2. Uploading just a single file (tried both a Python .py file and a plain .txt file) - same error
  3. The example1 notebook you linked worked fine

I think I can get around the issue for now by defining the processor in the notebook or Python file itself (although I also saw a similar issue of getting stuck at 99% or earlier, which I may ask about once I investigate further).
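
For anyone hitting the same thing, the inline workaround just means defining the processor in the notebook itself so nothing has to be shipped with upload_file(). A skeleton sketch (not the actual HHbbVV processor):

from coffea import processor

class InlineProcessor(processor.ProcessorABC):
    # Defined directly in the notebook, so no files need to be uploaded to the workers.
    def __init__(self):
        self._accumulator = processor.dict_accumulator(
            {"nevents": processor.defaultdict_accumulator(int)}
        )

    @property
    def accumulator(self):
        return self._accumulator

    def process(self, events):
        out = self.accumulator.identity()
        out["nevents"][events.metadata["dataset"]] += len(events)
        return out

    def postprocess(self, accumulator):
        return accumulator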

mat-adamec commented 3 years ago

Thank you for doing these tests! I will try to get others to see if they can replicate your error, and, if not, we can probably chalk it up to something broken in your instance and try to get that fixed.

Regarding the last point, I believe the 99% issue is a known occurrence with Dask. I don't know if coffea-casa has an issue listed about it yet, but I know that it's been discussed around the coffea team before. If you do happen to investigate and find any info about what is happening, then please submit an issue so we can gather more data!