Closed by gpfrancis 8 months ago
What do we do with the futures object? Can someone walk me through the code again? Does the code just collect the futures and wait at the end for them all to complete? How does that differ from synchronous loading? Is it just that we clean up at the end, and that this is more efficient than waiting for each query to complete one by one?
Yes, the main thing you're going to do with a future is call result(). That waits for the task to complete if it hasn't already done so, then either returns the return value (if there is one) or raises an exception if the task raised one or was cancelled.
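To make the result() behaviour concrete, here is a minimal sketch using Python's standard concurrent.futures module as a stand-in. This is an assumption for illustration only: the futures in the ingest code come from the Cassandra driver rather than a thread pool, and `store` is a hypothetical function, not something from the repository.

```python
from concurrent.futures import ThreadPoolExecutor


def store(value):
    """Hypothetical stand-in for a database write (not a real Cassandra call)."""
    if value < 0:
        raise ValueError("negative value")
    return value * 2


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(store, 21)    # returns a future immediately
    bad = pool.submit(store, -1)   # this task will raise inside the worker

    # result() blocks until the task is done, then hands back its return value.
    ok_result = ok.result()

    # If the task raised, result() re-raises that exception in the caller.
    try:
        bad.result()
        bad_error = None
    except ValueError as exc:
        bad_error = str(exc)
```

The point is that an error in the task only surfaces at the moment result() is called, so that call is where any try/except belongs.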
The advantage of this is that we can do multiple operations in parallel and we don't have to wait for each one to complete before moving on. In my initial version of the optimised code (https://github.com/lsst-uk/lasair-lsst/blob/feature/ingest_optimise/pipeline/ingest/ingest_opt.py), for each mini-batch of 10 alerts we make 20 calls to Cassandra to store cutouts (2 per alert), then three to store lightcurve information (each one with multiple CQL statements), then we deal with producing to Kafka, and finally we wait for all of those Cassandra calls to finish if they haven't already done so.
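The collect-then-wait pattern described above might look roughly like the sketch below. It is not the code in ingest_opt.py: the thread pool stands in for the Cassandra driver's asynchronous calls, and `fake_write`, `process_mini_batch` and the labels are hypothetical names invented for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_write(label):
    """Hypothetical stand-in for one asynchronous database call."""
    time.sleep(0.01)  # pretend the write takes a little while
    return label


def process_mini_batch(alerts, pool):
    futures = []

    # Two cutout writes per alert, submitted without waiting for any of them.
    for alert in alerts:
        for n in range(2):
            futures.append(pool.submit(fake_write, f"cutout-{n}-alert-{alert}"))

    # Three lightcurve writes for the whole mini-batch.
    for n in range(3):
        futures.append(pool.submit(fake_write, f"lightcurve-{n}"))

    # Other work (for example producing to Kafka) would happen here,
    # overlapping with the writes that are still in flight.

    # Only now do we wait: result() returns at once for finished writes.
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=8) as pool:
    written = process_mini_batch(range(10), pool)
```

With 10 alerts this submits 23 writes before waiting on any of them, which is where the saving over a strictly one-at-a-time loop comes from.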
Yes - I understand better now, having looked through the code. My main issue was that presumably the Cassandra query queue is not infinite in size and will fill up eventually. I've no idea yet how to tune it or whether it needs to be tuned. I'm also not sure what we do if for some reason a future's result() method raises an exception, though this is perhaps over-thinking it.
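One possible way to address both worries, sketched with the standard library only: cap the number of outstanding requests with a semaphore, and catch exceptions per future when collecting results. This is an assumption about how it could be done, not what the ingest code or the Cassandra driver does. `MAX_IN_FLIGHT`, `fake_write` and `run_bounded` are hypothetical names, and the right limit would need measuring.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_IN_FLIGHT = 4  # hypothetical cap; the right value would need tuning


def fake_write(n):
    """Hypothetical stand-in for a write; every fifth one fails."""
    if n % 5 == 0:
        raise RuntimeError(f"write {n} failed")
    return n


def run_bounded(items, pool, limit=MAX_IN_FLIGHT):
    slots = threading.BoundedSemaphore(limit)
    futures = []
    for item in items:
        slots.acquire()  # blocks while `limit` writes are still outstanding
        fut = pool.submit(fake_write, item)
        # Free the slot as soon as this write finishes, whether it worked or not.
        fut.add_done_callback(lambda _f: slots.release())
        futures.append((item, fut))

    succeeded, failed = [], []
    for item, fut in futures:
        try:
            succeeded.append(fut.result())
        except RuntimeError as exc:
            # Record the failure instead of aborting the whole batch.
            failed.append((item, str(exc)))
    return succeeded, failed


with ThreadPoolExecutor(max_workers=4) as pool:
    succeeded, failed = run_bounded(range(1, 11), pool)
```

Whether failed writes should be retried, logged, or allowed to abort the batch is a separate decision that this sketch leaves open.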
Bottom line is that we DO wait for all the queries to complete, but we let Cassandra handle the multiple queries in batches, rather than waiting for each one to complete individually.
gkdbutils-0.1.1 is now uploaded to PyPI. Let me know if you see any problems.
That works now, thanks.
Test is failing with the following error:
Please can you add ExecuteLoadAsync to https://github.com/genghisken/gkdbutils/blob/master/gkdbutils/ingesters/cassandra/__init__.py then bump the version number and anything else that's necessary to get a new version on to PyPI.