materialsproject / jobflow

jobflow is a library for writing computational workflows.
https://materialsproject.github.io/jobflow

Store files as output #585

Open gpetretto opened 3 months ago

gpetretto commented 3 months ago

The proposal to allow storing files directly as output of jobflow jobs has been raised several times, for example in this discussion: https://github.com/materialsproject/atomate2/issues/515. The idea is that, while it would in principle be possible to store a file using a maggma Store (e.g. a GridFSStore or an S3Store), this requires respecting the Store API, which has definite limitations, for example the need to load the whole file into memory.

This is an initial draft in which jobs are allowed to store files directly in a new kind of store. There are definitely several points that should be considered carefully, and room for improvement. I consider this a basis to start a discussion.

Based on previous interactions and discussions on the topic I suppose that @utf, @rkingsbury, @ml-evs, @computron may be interested.

Changes

The current implementation introduces the following changes

All these changes should be backward-compatible with existing jobflow workflows.

Example

Here is a simple example to demonstrate the concept. The configuration, including the new JobStore:

JOB_STORE: 
  docs_store:
    type: MongoStore
    collection_name: jobflow 
    database: jobflow_test
    host: localhost
    password: ''
    port: 27017
  additional_stores:
    data:
      type: GridFSStore
      database: jobflow_test
      host: localhost
      port: 27017
      password: ""
      collection_name: outputs_blobs
  files_stores:
    data_files:
      type: FileSystemFileStore
      path: /some/path/fs_file_store
    data_files_gfs:
      type: GridFSFileStore
      database: jobflow_test
      host: localhost
      port: 27017
      password: ""
      collection_name: outputs_files
  load: false
  save: {}

A Job that creates some files and stores them in the output:

from jobflow import job, Response, Flow, run_locally

@job
def add(a, b):
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    return Response(a + b, output_files={"data_files_gfs": "output_file"})

add_first = add(1, 5)
add_second = add(add_first.output, 3)

flow = Flow([add_first, add_second])

run_locally(flow, create_folders=True)

A script to retrieve the file after completion:

from jobflow import SETTINGS
from jobflow.core.schemas import JobStoreDocument

js = SETTINGS.JOB_STORE

js.connect()
js_doc = JobStoreDocument.model_validate(js.query_one())

print(js_doc.files[0])
js.get_file(dest="downloaded_file", reference="output_file", job_doc=js_doc)

Discussion

Considering this as a good starting point, I think there are several points that may be worth discussing. I will list some here.

ml-evs commented 3 months ago

This looks great so far @gpetretto, and I would be happy to help test/implement this. The only missing piece that I can see (and the reason many people want to be able to define files as workflow outputs) is the ability to use them as workflow inputs in some standardized/transparent way (i.e., calling the script you provide for retrieval during the job set-up step). Do you think that could be included here (or in a future PR)?

gpetretto commented 3 months ago

If the option to use files as inputs for workflows is needed, I believe it would be better to add it directly here; otherwise the risk is that it will not be compatible with the choices made. Do you have any ideas about how you would expect this to work?

As hinted in the initial message, one can imagine defining an OutputFileReference object that could be accessed as an output_files attribute of Job. Suppose we revise the example in this way:

@job
def add(a, b):
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    return Response(a + b, output_files={"data_files_gfs": "output_file"})

@job
def read_file():
    with open("output_file", "rt") as f:
        content = f.read()
    return content

add_job = add(1, 5)
read_job = read_file(add_job.output_files["name"]["output_file"])

In this case read_file does not accept any arguments, but the Job could distinguish between an OutputReference and an OutputFileReference and fetch the file before running the Job, without actually passing the argument to the decorated function. Alternatively, read_file could expect an argument, and if it is an OutputFileReference the file name (or file path, or the FileData) would be passed. Notice that a similar mechanism could be exploited to pass the reference as an input for a new Flow, similarly to what is done in the multi-md flow.
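Just to make the first option concrete, the resolution step could look roughly like the sketch below. The names here (OutputFileReference, the simplified get_file call) are only illustrative, not the PR's actual API:

```python
from dataclasses import dataclass


@dataclass
class OutputFileReference:  # hypothetical stand-in for the file reference object
    uuid: str       # uuid of the job that produced the file
    name: str       # file name as stored
    reference: str  # reference used by the file store to locate the file


def resolve_file_inputs(kwargs, file_store):
    """Fetch file references before running the job (illustrative only)."""
    resolved = {}
    for name, value in kwargs.items():
        if isinstance(value, OutputFileReference):
            # fetch the file into the current (job) directory under its stored name
            file_store.get_file(dest=value.name, reference=value.reference)
            # pass the local file name on, or drop the argument entirely,
            # depending on which of the two designs described above is chosen
            resolved[name] = value.name
        else:
            resolved[name] = value
    return resolved
```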

I already see some potential shortcomings with this design. For example, you will not be able to decide precisely where to copy the file (name, path). Also, how do you refer to the file? Here I tried to use dictionary-like access where I specified that the file should be selected by "name". It could also be possible to select by "path" or "reference". But is this reliable enough?

The reason I am sticking to reference-like access is that it would be needed to properly support the feature in jobflow-remote: there we cannot rely on the FileStore at runtime, so there should be a way to fetch the file before the job is started. Any better ideas?

utf commented 3 months ago

Agreed with @ml-evs, this looks fantastic and a very nice implementation. I was also wondering about re-using files. Other workflow managers such as covalent have strategies to do this; e.g., see here. Although I'm not sure if any of this is relevant here, since it looks like you need to know the file paths in advance.

I have a few ideas of how this could be implemented:

1. Define file keywords and destination in the `@job` decorator. E.g.,

```python
@job(wavecar_file=FileDestination("WAVECAR.previous"))
def my_func(arg1, wavecar_file):
    pass

my_func("abc", wavecar_file=my_func.output_files.wavecar)
```

One limitation is that this wouldn't support dynamic numbers of files.

2. This could be augmented with a special `files` keyword argument that gets inserted by the `@job` decorator. Any file references that get passed to this would automatically get written in the current directory. E.g.,

```python
@job
def my_func(arg1):
    pass

my_func(arg1, files=[my_func.output_files.wavecar])
```

The `files` argument could potentially also accept a dictionary/some specialised object that can specify the output destination.

```python
my_func(arg1, files=[{"src": my_func.output_files.wavecar, "dst": "WAVECAR_previous"}])
```

The downside to this approach is that now the caller has to specify the expected output destination, rather than this being set by the function itself.
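For illustration only (none of these names exist yet), the FileDestination marker from option 1 could be as simple as a dataclass that the `@job` decorator inspects when the job runs:

```python
from dataclasses import dataclass


@dataclass
class FileDestination:
    """Hypothetical marker: copy the referenced file here before running the job."""

    path: str  # target path, relative to the job's run directory


# The @job decorator would record {"wavecar_file": FileDestination("WAVECAR.previous")}
# and, at run time, resolve the file reference passed as `wavecar_file` by copying
# the stored file to WAVECAR.previous before calling the wrapped function.
```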

davidwaroquiers commented 3 months ago

Thanks @gpetretto for this (well we already discussed and I already thanked you but not officially ;) )

Regarding re-using files, indeed if there is a need for it, it should be added. Now, to be honest, I am not completely sure this is the most important thing.

Consider two use cases: 1/ Using a CHGCAR (or WAVECAR) as a starting point for the next calculation. You could potentially store the file somewhere (e.g. S3, Azure Blob or else) and then the next calculation could restart from it. But this means you now have to copy this file over the network to this external storage at the end of the first job, and then copy it back at the beginning of the next job. Of course, currently this is done using prev_dir, and the (only?) limitation is that it has to be on the same cluster (or maybe not even that, with the FileClient? At least, there might be issues if e.g. it's not the same VASP version and the format of the files is not compatible or something ...).

2/ Let's suppose you train an MLFF with VASP and then you want to use it for 10000 other calculations. Then this file will need to be copied 10000 times to the cluster (so 10000 transfers and 10000 copies on the cluster).

So I understand that the idea is nice, but I just want to make sure there is a real use case where we will not actually be constrained in the future by network transfers or storage. Imagine you have an MD trajectory of 20 GB and you want to reuse that trajectory for 10 new calculations (for some reason, ... no idea why someone would do that, but anyway). Then it will be copied once to the e.g. Azure FileStore and then copied back 10 times (200 GB of transfer, 200 GB of storage on the cluster) for these 10 calculations. (Also, consider that when using jobflow-remote in the split mode, the files are transferred twice each time: from cluster to runner, then from runner to Azure cloud when storing, and from Azure cloud to runner then runner to cluster when reusing the file.)

So maybe the question is: what exactly do we want to tackle with this possibility to transfer files? And I think we should first answer this question.

ml-evs commented 3 months ago

> So I understand that the idea is nice, but I just want to make sure there is a real use case where we will not actually be constrained in the future by network transfers or storage. Imagine you have an MD trajectory of 20 GB and you want to reuse that trajectory for 10 new calculations (for some reason, ... no idea why someone would do that, but anyway). Then it will be copied once to the e.g. Azure FileStore and then copied back 10 times (200 GB of transfer, 200 GB of storage on the cluster) for these 10 calculations. (Also, consider that when using jobflow-remote in the split mode, the files are transferred twice each time: from cluster to runner, then from runner to Azure cloud when storing, and from Azure cloud to runner then runner to cluster when reusing the file.)
>
> So maybe the question is: what exactly do we want to tackle with this possibility to transfer files? And I think we should first answer this question.

I think this emphasizes exactly why I would want this to be handled at the jobflow level, so that jobflow-remote and other managers have a substrate on which they can do something more sensible (e.g., copy the file once to each cluster that needs it and make sure hashes match on future runs); otherwise users and wf developers will invent many other ways of using them.

davidwaroquiers commented 2 months ago

I fully agree this must be at the jobflow level, but even so, the same applies. Consider a run_locally where you have defined the store (with a docs store, additional stores AND file stores): exactly the same as above applies. The first job will copy files to a remote place, and then the second job will copy them back locally. Currently this is done by just passing the previous directory (with, as mentioned above, the constraint that the next job may not be able to run on a different worker unless the FileClient is properly set up/working?).

This possibility also opens up the question for the developer of what they should use. I agree it is probably more convenient to have this file transfer somehow directly integrated (as in some other wf managers such as covalent), but is it the most efficient in terms of network? For small files it probably does not pose a huge problem (unless there are many of them, or many jobs copying the same file), but for larger files this may slow things down quite a bit (and may even pose problems: with a "simple" run_locally, if the remote file store, e.g. Azure, is not accessible, what happens? Probably the only thing it can do is just fail with an error).

Don't get me wrong, I'm not at all against the possibility of reusing files, and it can (and probably should) be implemented in addition to the current implementation (whose aim is "just" to store/archive files somehow)! I'm just raising the question of when this should be used for reusing files, and I have not yet found a use case for which I'm convinced it would be better than the current approach (with a previous directory).

davidwaroquiers commented 2 months ago

> So I understand that the idea is nice, but I just want to make sure there is a real use case where we will not actually be constrained in the future by network transfers or storage. Imagine you have an MD trajectory of 20 GB and you want to reuse that trajectory for 10 new calculations (for some reason, ... no idea why someone would do that, but anyway). Then it will be copied once to the e.g. Azure FileStore and then copied back 10 times (200 GB of transfer, 200 GB of storage on the cluster) for these 10 calculations. (Also, consider that when using jobflow-remote in the split mode, the files are transferred twice each time: from cluster to runner, then from runner to Azure cloud when storing, and from Azure cloud to runner then runner to cluster when reusing the file.) So maybe the question is: what exactly do we want to tackle with this possibility to transfer files? And I think we should first answer this question.
>
> I think this emphasizes exactly why I would want this to be handled at the jobflow level, so that jobflow-remote and other managers have a substrate on which they can do something more sensible (e.g., copy the file once to each cluster that needs it and make sure hashes match on future runs); otherwise users and wf developers will invent many other ways of using them.

Regarding copying the file only once, how would you see that working? Is that the idea of the "context" you mentioned at some point?

JaGeo commented 2 months ago

Yeah, I agree with the file transfer problem. As someone who copies WAVECARs in workflows, I would like to ask for an option to just pass a directory path. 😅

utf commented 2 months ago

@JaGeo @davidwaroquiers I agree that this shouldn't replace the prev_dir option in atomate2. Passing a folder will be much more efficient for most purposes.

However, I think it is nice to have an additional option in jobflow in the case that this isn't possible. One immediate application is when jobs will be run on different compute resources and so copying from a folder won't work. E.g., let's say you have access to a GPU cluster for training ML models, and a different CPU cluster for generating training data. It seems like this approach would facilitate file transfers between the two.

JaGeo commented 2 months ago

> @JaGeo @davidwaroquiers I agree that this shouldn't replace the prev_dir option in atomate2. Passing a folder will be much more efficient for most purposes.
>
> However, I think it is nice to have an additional option in jobflow in the case that this isn't possible. One immediate application is when jobs will be run on different compute resources and so copying from a folder won't work. E.g., let's say you have access to a GPU cluster for training ML models, and a different CPU cluster for generating training data. It seems like this approach would facilitate file transfers between the two.

Definitely! But it should also be easy for the user to configure and change, as it will really depend on the cluster setup.

gpetretto commented 2 months ago

I like @utf's proposals. An additional downside of the second one is that `files` could no longer be used as a regular argument of a Job function. Overall I would say the first one is more appealing. Maybe we can find a way to extend it to cover more cases. I think that, as long as the specifications are clear, we could also have something like:

@job(multiple_files=FolderDestination("."))
def my_func(arg1, multiple_files):
    pass

my_func("abc", multiple_files=[my_func.output_files.wavecar, my_func.output_files.chgcar])

This does not give full freedom on naming the files, but there should be a limit at some point to what can be done with a simple decorator.

Note that we will probably need to resort to a notation like the one I used in the initial example, `add_job.output_files["name"]["output_file"]`, because file names often contain `.`, so the dot notation might not always be an option. I suppose both could be supported.
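As a rough illustration of how the two access styles could coexist (the class and attribute names here are hypothetical, not what is implemented):

```python
class OutputFilesAccessor:
    """Hypothetical accessor behind job.output_files, supporting both
    output_files["name"]["output_file"] and output_files.wavecar."""

    def __init__(self, job_uuid, files_by_name):
        self.job_uuid = job_uuid
        # mapping of file name -> file reference, e.g. {"output_file": <reference>}
        self._files_by_name = files_by_name

    def __getitem__(self, selector):
        # dictionary-like access: select files by "name" (could be extended
        # to "path" or "reference" selectors)
        if selector == "name":
            return self._files_by_name
        raise KeyError(selector)

    def __getattr__(self, name):
        # dot access, only possible when the file name is a valid identifier
        try:
            return self._files_by_name[name]
        except KeyError as exc:
            raise AttributeError(name) from exc
```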

> I think this emphasizes exactly why I would want this to be handled at the jobflow level, so that jobflow-remote and other managers have a substrate on which they can do something more sensible (e.g., copy the file once to each cluster that needs it and make sure hashes match on future runs); otherwise users and wf developers will invent many other ways of using them.

I see the point. Indeed this should be something that could be handled by jobflow-remote. However, I have two major concerns:

> Yeah, I agree with the file transfer problem. As someone who copies WAVECARs in workflows, I would like to ask for an option to just pass a directory path. 😅

I suppose you are not aiming at storing the WAVECAR in an output FileStore, so can you clarify your suggestion a bit more? Are you proposing to also have something like covalent, which would allow specifying which files to copy in the decorator, rather than relying on prev_dir? Or just saying that this should not prevent using prev_dir? Or did I miss the point completely?

In all of these cases it is important to note the constraint that the `**kwargs` of the Job (and thus of the `@job` decorator) are already assigned to the additional_stores, so there is probably a limit to what we can add with arbitrary arguments.

rkingsbury commented 2 months ago

Very nice contribution @gpetretto ! The only point I would like to better understand is how the FileStore and FileSystemFileStore implemented here differ from the FileStore in maggma. Specifically:

I was the original author of the existing FileStore, and at the time my main objectives were to facilitate querying file attributes and reading data from disk into a Store, thus enabling the start of a maggma-based processing pipeline. It does have the ability to delete files, and I'm not opposed in principle to adding more robust capabilities around creating files either. Very open to discussion about what would be most useful.

gpetretto commented 2 months ago

Hi @rkingsbury, thanks for your comments and mentioning the maggma FileStore. To be honest my idea is that the FileStore defined here should be serving a quite different purpose compared to the Store defined in maggma. The main point is that the Store is centered around dictionaries, since all the API is based on the update and query methods. Due to this a Store is bad at handling files. All the stores that work with file-like objects (GridFSStore, S3Store, AzureBlobStore) cannot deal with files any better than all the other stores: they will need to read files, store all in memory as bytes, put them in a dictionary and store the bytes of the dictionary in a blob. When retrieving data one would have to query documents, again read all the bytes into memory and then the client of the Store would need to decide how to deal with the data. This basically loses all the advantages of working directly with files, especially when they are big, which is the main point of this PR. Otherwise it should be possible to put the bytes of a file in the output of the Job and use the additional_stores to save those data in another Store. Which we think is not the right way of dealing with big files.

So the idea is that this FileStore should really be a way to work with files directly. It would be an interface for the GridFS `put`/`get` methods and the S3 boto3 `upload_file`/`download_file` methods directly, and similarly for other file hosting services. The main point is that, since the purpose is clearly different, I don't think that the `put`/`get` methods should follow the `update`/`query` API.
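Not part of the PR, but just to make the contrast concrete: with plain GridFS (via pymongo/gridfs) a file can be streamed to and from the database without ever being fully loaded into memory, which is the kind of behaviour the new FileStore is meant to expose:

```python
import shutil

import gridfs
from pymongo import MongoClient

# connection details reuse the example configuration above
db = MongoClient("localhost", 27017)["jobflow_test"]
fs = gridfs.GridFS(db, collection="outputs_files")

# put: GridFS reads the open file in chunks, so the whole file never sits in memory
with open("output_file", "rb") as f:
    file_id = fs.put(f, filename="output_file")

# get: stream the stored file back to disk, again in chunks
with open("downloaded_file", "wb") as out:
    shutil.copyfileobj(fs.get(file_id), out)
```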

An open point is that right now the reference needed to retrieve the stored files is only available in the main collection of the JobStore. One could add a standard Store to the FileStore as an index to keep track of the files. After some pondering we thought that this could be an overcomplication for the FileStore, but adding it is an option. For example, a new FileStore base class could roughly look like this:

```python
from __future__ import annotations

import io
from abc import ABCMeta, abstractmethod
from typing import Dict, Iterator, List, Optional, Union

from maggma.core import Sort, Store


class FileStore(Store, metaclass=ABCMeta):
    """Abstract class for a file store."""

    def __init__(self, index: Store):
        self.index = index

    def query(
        self,
        criteria: Optional[Dict] = None,
        properties: Union[Dict, List, None] = None,
        sort: Optional[Dict[str, Union[Sort, int]]] = None,
        skip: int = 0,
        limit: int = 0,
    ) -> Iterator[Dict]:
        return self.index.query(
            criteria=criteria, properties=properties, sort=sort, skip=skip, limit=limit
        )

    def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None):
        return self.index.update(docs=docs, key=key)

    @abstractmethod
    def _put(self, src: str | io.IOBase, dest: str) -> str:
        pass

    def put(self, src: str | io.IOBase, dest: str, metadata: dict) -> str:
        reference = self._put(src, dest)
        metadata = dict(metadata)
        metadata["reference"] = reference
        self.update(metadata, key=reference)
        return reference

    @abstractmethod
    def _get(self, reference: str, dest: str | io.IOBase):
        pass

    def get(self, criteria: str, dest: str | io.IOBase):
        doc = self.query_one(criteria)
        if not doc:
            raise ValueError
        return self._get(reference=doc["reference"], dest=dest)
```

Even though I am not sure whether this is really advantageous.
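To show how the abstract hooks could be filled in, a hypothetical local-directory implementation could look roughly like this (just a sketch, not the FileSystemFileStore of this PR; the remaining abstract Store methods are omitted):

```python
from __future__ import annotations

import io
import shutil
from pathlib import Path


class LocalFolderFileStore(FileStore):  # FileStore as sketched above
    """Hypothetical subclass: files live under a base directory and the
    reference is simply the path relative to that directory."""

    def __init__(self, index, base_path: str):
        super().__init__(index=index)
        self.base_path = Path(base_path)

    def _put(self, src: str | io.IOBase, dest: str) -> str:
        target = self.base_path / dest
        target.parent.mkdir(parents=True, exist_ok=True)
        if isinstance(src, io.IOBase):
            with open(target, "wb") as f:
                shutil.copyfileobj(src, f)
        else:
            shutil.copyfile(src, target)
        return dest  # the relative path acts as the reference

    def _get(self, reference: str, dest: str | io.IOBase):
        source = self.base_path / reference
        if isinstance(dest, io.IOBase):
            with open(source, "rb") as f:
                shutil.copyfileobj(f, dest)
        else:
            shutil.copyfile(source, dest)
```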

davidwaroquiers commented 2 months ago

Thanks @rkingsbury and @gpetretto for the discussion on maggma stores. Maybe the naming could be different to make it clear, since the file store here is indeed not a maggma Store. We could call these FileStorage instead, or FileBucket, or FileArchive, or something like that. Would that clarify things better?

gpetretto commented 2 months ago

To keep things going I have tried to add an implementation of the output_files reference. It works a bit differently compared to the standard OutputReference because its format needs to be fixed. I imagine there are improvements to be made. At the moment this allows running something like this:

@job
def add(a, b):
    with open("output_file", "wt") as f:
        f.write(f"The result is {a + b}")
    # return Response(a + b, output_files={"data_files_gfs": "output_file"})
    return Response(a + b, output_files={"data_files": "output_file"})

@job
def read_file(file_path):
    with open(file_path, "rt") as f:
        content = f.read()
    return content

add_job = add(1, 5)
read_job = read_file(add_job.output_files.name["output_file"])
flow = Flow([add_job, read_job])

print(run_locally(flow, create_folders=True))

Or modifying the target path with

@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path):
    with open(file_path, "rt") as f:
        content = f.read()
    return content

@ml-evs, would this fit your use case?

At some point I thought of taking advantage of the FileDestination object to also allow copying generic files:

@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path="/path/to/some/file):

or maybe even from different sources

@job(file_path=FileDestination("test_dir/test_out_file"))
def read_file(file_path=FileSource(path="/path/to/some/file", host="some.host")):
    ...

but I was afraid that things would get out of hand with too many options to deal with (multiple files, link vs copy, ...). So maybe this could be added at a later time if it seems interesting, as it is not strictly related to the addition of output files.

rkingsbury commented 2 months ago

> Thanks @rkingsbury and @gpetretto for the discussion on maggma stores. Maybe the naming could be different to make it clear, since the file store here is indeed not a maggma Store. We could call these FileStorage instead, or FileBucket, or FileArchive, or something like that. Would that clarify things better?

Hi David, sorry for the delay (very busy couple of weeks here at the end of the semester!) but thank you for your thoughtful and thorough reply. I still want to think more and give a longer reply later, but for quick feedback - I really like your suggestion here. If the FileStore in this PR is not intended to follow the maggma Store API or to behave like one, then the class name should preferably not end in Store.

rkingsbury commented 2 months ago

> Hi @rkingsbury, thanks for your comments and mentioning the maggma FileStore. To be honest my idea is that the FileStore defined here should be serving a quite different purpose compared to the Store defined in maggma. The main point is that the Store is centered around dictionaries, since all the API is based on the update and query methods. Due to this a Store is bad at handling files. All the stores that work with file-like objects (GridFSStore, S3Store, AzureBlobStore) cannot deal with files any better than all the other stores: they will need to read files, store all in memory as bytes, put them in a dictionary and store the bytes of the dictionary in a blob. When retrieving data one would have to query documents, again read all the bytes into memory and then the client of the Store would need to decide how to deal with the data. This basically loses all the advantages of working directly with files, especially when they are big, which is the main point of this PR. Otherwise it should be possible to put the bytes of a file in the output of the Job and use the additional_stores to save those data in another Store. Which we think is not the right way of dealing with big files.

This is a good point, but let me pose a question. Do you think it would be possible to modify the existing maggma Store classes so that the dictionary paradigm is only used for file metadata, while requests for the underlying data actually use GridFS or Azure, etc. directly? Similar to the concept of having an index store that points to the actual files, but ideally you would access everything via a single object instance. (This would not necessarily have to be done in this PR, but it has implications for how this API could look in the future).

I took a baby step in this direction in the existing FileStore, which mainly focuses on querying file metadata but does provide a rudimentary way of getting file contents (see docs). At present, those contents go into a dictionary, but it's not too hard to imagine a different method attached to the Store class that opens the file natively.

Just food for thought. If you think there is potential in having a subclass of Store that contains put and get methods (in addition to the standard update and query) so that the same basic API could be used to work directly with files, I'm happy to work with you to build a prototype.