bluegreen-labs / ecmwfr

Interface to the public ECMWF API Web Services
https://bluegreen-labs.github.io/ecmwfr/
Other
106 stars 30 forks source link

Parallel requests #55

Closed eliocamp closed 2 years ago

eliocamp commented 4 years ago

I'm on the situation that I need to download a lot of data which I need to divide into multiple requests. Each request can take in the order of hours (depending on CDS mood that day) but I figured out that you could just send a bunch of request in parallel and let them handle the load as they see fit.

There are many ways of doing this but I went for the easy one of literally parallelising the calls like this:

download_date <- function(date, force_download = TRUE) {
  request <- # Build the request based on date

  file <- wf_request(request, 
                      user = user,
                      time_out = 5*3600,
                      path = path)
  return(file_out)
}
future::plan("multicore", workers = 5)   # CDS processes 5 requests at a time
files <- furrr::future_map(dates, download_date, force_download = FALSE)

Do you think it would be worthwhile to handle some kind of multiple requests nativelly? Essentially allowing for the request argument to be a list of request?

khufkens commented 4 years ago

Basically this already happens.wf_request() just polls the server and checks on the status to grab data using wf_transfer().

Basically you just have to make the ct$request_id an exported value. Then you cycle over this (as a list) to see which one is done and which one isn't. This way you don't need sessions running (assuming that the download time < the wait time for the batch data to show up)!

khufkens commented 4 years ago

I just checked and I thought I exported this (the request ID) previously, but it might have been tossed at some point.

khufkens commented 4 years ago

OK, it is foreseen here:

https://github.com/khufkens/ecmwfr/blob/60d2b35fc1fd1eaf92505dea5e7b9db36ae79ed3/R/wf_request.R#L187

When no transfer is initiated you exit and forward the ct object (including the request ID).

(take into account that there is a limit of 20 calls for CDS I think)

khufkens commented 4 years ago
requests <- list( request_1, request_2 )

ids <- lapply(requests, function(request){
 id <- wf_request(request, transfer = FALSE)
})

# then have a loop go over the ids list to check availability of the data
eliocamp commented 4 years ago

Oh, I didn't realise that! I'll try it out and report back :P

eliocamp commented 4 years ago

I've ben doing that and have some questions/suggestions, let me know what you think:

khufkens commented 4 years ago

Some thoughts,

Putting more info in the wf_transfer(), is easy, although checks need to make sure the wf_requests() routine doesn't fail when tinkering with it. fails might indeed need to be trapped with a try() statement.

Don't see the construction with the R6 class, could you specify this a bit more. Does this sit at the transfer or request level? An example?

eliocamp commented 4 years ago

Is the return object of request is an R6 class, then it could be easier to work with them. If the returned object is transfer, then it could contain all the information (status, id, url, etc..) as well as functions to check and download files in a more consistent manner. For example

transfer$check()

Would make the check and also update the value of transfer$status (not need to assign the returned object of wf_transfer()). Alternatively

transfer$download()

Could (potentially) check and download and then update the relevant information (including download status). Names of functions and all that is just illustrative. Furthermore, I think it would be better to create functional wrappers (e.g. wf_transfer <- function(x) x$transfer) because most R users are used to functional syntax.

The advantage is that the information in the object is updated at the same time as it checks for information, which makes it that everything is synced correctly. I think it's doable without R6 classes, but using R6 gives a natural framework.

Does this sit at the transfer or request level? An example?

I think it's natural at the transfer level instead at the request level. A request being just a list of parameters, but when sending it to the server then each transfer is its own unique object.

khufkens commented 4 years ago

One caveat I think, because of how CDS and or webapi are coded. transfer$check() will result in transfer$download(). If I'm not mistaken I found no way to just get the status of things (without actually downloading the data) if the data is available! Might be best to figure that one out first!

eliocamp commented 4 years ago

I added the ability to check for status without downloading the full dataset in this PR

khufkens commented 4 years ago

Thanks, didn't remember. If you make it work with R6 go for it I would say.

eliocamp commented 4 years ago

Great! I'll get into it :smile:

khufkens commented 4 years ago

Have fun, sorry for not being much help. Been a bit strung out of late.

eliocamp commented 4 years ago

Thanks, I will. And don't worry about, :)

eliocamp commented 4 years ago

I've implemented something.

https://github.com/eliocamp/ecmwfr/tree/R6transfer

It passes all tests and seems to be working fine. It's still is a WIP.

I'm using this branch to come up with the parallel request logic and finding it not at all trivial.

khufkens commented 4 years ago

Where do you see the stats, as it seems that your branch isn't picked up by Travis.

eliocamp commented 4 years ago

Sorry, I'm still working in my fork.

eliocamp commented 4 years ago

I think that I have now something that I'm confident in. I was a bit worried about such a big change in the internal workings of the package but I've tested it and now I'm using it to parallelise hundreds of requests and it's working ok.

The one thing I'm not entirely confident is in the "status" flag. It's kind of tricky because each different service has its own lingo and codes. Do you have the documentation for webapi and cds handy? I can't seem to find it.

khufkens commented 4 years ago

Do you have an example of this 'status' flag in use? Link to your changes, branch?

My feeling is that the API isn't too well documented because most work via they provided python interface (which is provide by the institute). This is a common issue among a lot of APIs, poor docs.

eliocamp commented 4 years ago

Here, for example: https://github.com/eliocamp/ecmwfr/blob/b5fb467f079a12a7b73518b8897d6be42f616140/R/R6_transfer.R#L162

It uses the "status" parameter to test if the request is completed, failed or not completed. That's for CDS. In the webapi case it uses the "code" parameter. I'd like to abstract away the specifics of each API and unify all so that neither the user nor the rest of the code needs to know about it.

Here's one possible example of that. An is_running() function that tests if the requests is still being processed (or in the queue) that should work regardless of the service.

khufkens commented 4 years ago

The error codes of the WebAPI are here: https://confluence.ecmwf.int/display/WEBAPI/Web+API+Troubleshooting

But before expending too much energy on the webapi I would ask when this service will deprecate. I've the feeling that CDS has gained steam and acceptance and soon the webapi will shutter anyway.

It might be good to take this into consideration, and focus mostly on CDS first.

khufkens commented 4 years ago

Ok, seems like the webapi will not be merged with CDS. Would seem the sensible thing to do given redundancy but politics probably. Anyway.

The is_running() method makes sense given that we'll have to juggle services and indeed the lingo would be consistent across services.

Should we transfer the code from your fork to a branch to do some unit testing? This won't break anything and will make tracking things easier.

eliocamp commented 4 years ago

There, I'll nuke my fork and start working in the main repo.

eliocamp commented 4 years ago

The issue now with is_runing() is that it tests whether the request has successfully completed, but returns TRUE in the case of errors.

I guess that I'm a bit confused about the various states the request can be in for each of the two services and how they are communicated by the respective APIs so I'm having a hard time trying to unify all.

khufkens commented 4 years ago

Doesn't the API return 400 statements in calls, or similar? The webapi doesn't I think, where it returns error messages which might be "grep"-able.

eliocamp commented 4 years ago

I'm not certain. Maybe I should test it out with malformed and/or impossible requests.

eliocamp commented 4 years ago

Ok, I did some sleuthing and this is what I've got so far, some of which was already in the code.

CDS:

WEBAPI:

khufkens commented 4 years ago

Thanks for the sleuthing, this should be workable I think.

I'm not sure how prone parallel requests will be to failure if one request goes down in an unclean way. Basically we need to avoid that people submit requests (which upon failure makes the whole thing break). The latter isn't technically an issue as you can still go to your dashboard online and grab the data there (if not removed from the queue explicitly).

I think the hard limit on running request for CDS is 20, I've no idea what it is for the WEBAPI. So at least one error trap should be in place for the case where people submit > 20 requests. Either give a warning, or cycle through them in batches of 20 (or xyz for WEBAPI).

eliocamp commented 4 years ago

I did the latter, but thought that the limit was 5 running request (if I submitted more, the rest were kept on queue). So I made a function that made sure that there were no more (and no less) than 5 running requests at once.

The question of failure states is also important. At present I added a "fail_is_error", but I don't think is the way to go for a robust implementation. My guess is that we should try to distinguish between permanent errors and transient ones. The latter being connectivity issues or service hiccups and the latter being malformed requests or any other problem that means that we are certain that the request will not succeed.

For connectivity issues, we could have a retry limit and maybe use curl::has_internet(). For permanent errors, we could trust the permanent flag in CDS (although I don't know what it really means) or just take any error returned by the APIs (like "failed", "rejected" and "aborted") as permanent.

eliocamp commented 4 years ago

Some travis are failing due to API rate limit exceeded. ¿Any idea on what's going on?

khufkens commented 4 years ago

There might not only be a limit on concurrent calls but also on how many you can make. I have this at times when I pushed many patches (generating tons of traffic as CI runs through the tests).

eliocamp commented 4 years ago

Ok. Normal pains of testing APIs, then. I'll restart the jobs and give them another chance. Tests are passing on my machine with my user.

eliocamp commented 3 years ago

Hi again! It's been *checks clock* more than an year! 😱 since I tinkered with this and there were a lot of commits to master in the meantime. But I sat down and basically re-implemented the R6 stuff from the latest commit. This now lives here: https://github.com/bluegreen-labs/ecmwfr/tree/R6-classes

This implementation is a bit better because it captures the logic of each service in its own class, which inherits the basic methods from a parent "service" class. This should make it simpler to implement and maintain each service, I think.

I'm not familiar with the ADS service, but from what I've seen of the code, the only difference is here:

https://github.com/bluegreen-labs/ecmwfr/blob/d7224358e829c20c5eb86812c865c2490d47a1d9/R/wf_request.R#L192-L211

Therefore, the ads service actually inherits from the cds service 😎. I don't have a user to run test, though.

The only pending issue is implementing wf_delete() and fixing some issues with backwards compatibility that surfaced when testing.

With this change, the wf_request() function, when transfer = FALSE returns the request object without submitting it first to the service.

With this it was very easy to create a relatively smart function that takes a list of requests (e.g. the output of lapply(request_list, function(x) wf_request(x, transfer = FALSE)) and then submits them in parallel:

https://github.com/bluegreen-labs/ecmwfr/blob/8f148b6b26f3eb5067b91751c5d648c4304a7b6a/R/batch_request.R

Let me know what you think :)

khufkens commented 3 years ago

Hi @eliocamp

Thanks for the additions.

The ADS service has unit tests in my master branch, which should run on a pull request (I think, my keys might be expired). As long as unit tests run cleanly everything should be fine.

The 'transfer = FALSE' is basically a provision to submit a query to the server which would be queued online but without the state being tracked in an R session. This behaviour allows large or long queries to be submitted and only downloaded once they finish (after several hours) and provides asynchronous downloads.

Basically you could line up a download from a laptop, to download it later to a production server (HPC). Or if jobs are larger I often start one overnight. I'm not sure this behaviour is maintained in your implementation. I think it should. This stateless query removes the need to juggle tmux / screen or nohup sessions, keeping a session alive (or computers running) just for a download spinner!

I also think the hard limit on the API is 20 parallel requests if I'm not mistaken. So workers can be set to a preset value of 20 - as default upper limit. There is no reason to limit it to a rate which is much lower I think. I should check the API docs, but it's in there somewhere.

https://github.com/bluegreen-labs/ecmwfr/blob/8f148b6b26f3eb5067b91751c5d648c4304a7b6a/R/batch_request.R#L2

I would also propose wf_batch_request() as function name. Typos will cause mess ups I fear, better to make it real obvious what function is called. wf_request() <-> wf_requests() is just too similar, I had to look twice myself.

[FYI: I'm in the middle of planning a move to Switzerland so I'm not on my A-game for a little while.]

eliocamp commented 3 years ago

w

The 'transfer = FALSE' is basically a provision to submit a query to the server which would be queued online but without the state being tracked in an R session. This behaviour allows large or long queries to be submitted and only downloaded once they finish (after several hours) and provides asynchronous downloads.

Basically you could line up a download from a laptop, to download it later to a production server (HPC). Or if jobs are larger I often start one overnight. I'm not sure this behaviour is maintained in your implementation. I think it should. This stateless query removes the need to juggle tmux / screen or nohup sessions, keeping a session alive (or computers running) just for a download spinner!

In my implementation you would need to do

request <- wf_request(query, transfer = FALSE)
request$submit()

So, it's not backwards compatible. I'll change that. I'll need to rethink the workflow of batch requests. Perhaps wf_batch_requests() could take a list of queries instead of a list of unsubmitted requests.

I also think the hard limit on the API is 20 parallel requests if I'm not mistaken. So workers can be set to a preset value of 20 - as default upper limit. There is no reason to limit it to a rate which is much lower I think. I should check the API docs, but it's in there somewhere.

Yesterday I set up 4 requests to the CDS service and it only processed 2 at a time (the others remained queued until the first two finished) so that's why I went with two. I don't know exactly how that works, but

I would also propose wf_batch_request() as function name. Typos will cause mess ups I fear, better to make it real obvious what function is called. wf_request() <-> wf_requests() is just too similar, I had to look twice myself.

Good catch.

[FYI: I'm in the middle of planning a move to Switzerland so I'm not on my A-game for a little while.]

Good luck with that!

khufkens commented 3 years ago

If it is possible to keep the current transfer behaviour that would be great. A lot of listed issues filled here are related to breaking changes between versions with respect to how requests are dealt with (renaming some of the variables etc). This is issue creeps up on people as R will update packages on CRAN but won't warn them of these changes.

I used to be able to query more than two downloads, they might have changed the rate limit. Anyway, if this is what your experience has been recently we better peg it at two.

Today I got news that I was able to fix an apartment remotely, which is an absolute win anyway and surely in COVID times. I'm still looking at a quarantine, but at least not in a faceless hotel with nothing but take-out for ten days.