Closed: paynejd closed this issue 4 years ago.
From @rkorytkowski:
So there are a number of ways Celery can help us solve the problem of queuing tasks so that they wait for results from other tasks.
One is chaining/grouping (https://docs.celeryproject.org/en/stable/reference/celery.html#celery.chain), but we need to know all tasks to be executed up front, before starting the first one.
Another one is dynamically creating queues which each have just one worker. The feature is called automatic routing together with dynamic creation of queues: https://docs.celeryproject.org/en/latest/userguide/routing.html#routing-automatic We could basically route all tasks which need to be executed one after another to a new queue defined with some unique key. Having just one worker for such a queue would guarantee they are executed one after another. The question is what our unique key should be. Do we want to have a separate queue per user, or per some unique key passed with the bulk import task?
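The second option can be sketched without any Celery specifics. This is a minimal illustration of the routing idea, where every task sharing a key lands on the same dedicated single-worker queue; the function and queue names here are hypothetical, not from the codebase:

```python
# Sketch of the per-key routing idea (all names hypothetical): tasks that
# share an import key are routed to the same dedicated queue, and a single
# worker per queue guarantees they run one after another.
def route_bulk_import(username, import_queue_key=None):
    """Return the queue name a bulk import task would be sent to."""
    if import_queue_key is None:
        # No key: fall back to the existing shared queues.
        return "priority" if username == "root" else "standard"
    # Key given: one queue per (user, key) pair, consumed by one worker.
    return "bulk-import.%s.%s" % (username, import_queue_key)
```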
Based on our discussion today, the second option that Rafal identified (dynamic creation of queues with just one worker) would meet our requirements for queuing. It would also provide a way for us to expose an API endpoint for a client to query whether an import is underway/queued for a particular bulk import key.
Acceptance Criteria:
- A client may optionally provide an import_queue_key with their bulk import request (see example): /manage/bulkimport/[import_queue_key]
- If no import_queue_key is provided, a bulk import is handled in the same way that it is currently -- it is added to the standard queue if a regular user, or to the priority queue if the root user; bulk imports in these queues may be safely processed in parallel
- If an import_queue_key is provided, the following occurs:
  - Check whether a queue with the import_queue_key already exists for the requesting user
  - If a queue with the import_queue_key already exists, the bulk import is added to that queue
  - If a queue with the import_queue_key does not exist, a new queue is created and the bulk import is added to it
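The queue lookup described in the criteria can be illustrated with a tiny in-memory sketch. The structures and names here are hypothetical stand-ins, not the actual implementation:

```python
# In-memory sketch of the acceptance criteria: look up the queue for
# (user, key), create it on first use, and append the import to it.
queues = {}

def submit_bulk_import(username, task, import_queue_key=None):
    if import_queue_key is None:
        # No key: same behaviour as today (shared standard/priority queue).
        key = (None, "priority" if username == "root" else "standard")
    else:
        # Key given: one FIFO queue per (user, key), created if missing.
        key = (username, import_queue_key)
    queue = queues.setdefault(key, [])
    queue.append(task)
    return queue
```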
@rkorytkowski @harpatel1 Could you review this? And could you suggest API endpoints for the new requests?
All sounds good. Proposed endpoints:
- GET /manage/bulkimport/ lists all tasks on all queues that the user has access to
- GET /manage/bulkimport/:queue/ lists all tasks on :queue for the authenticated user
- GET /manage/bulkimport/?task=task_uuid returns the task state (as it does now) with an additional queue field
- POST /manage/bulkimport/ adds the task to a default queue
- POST /manage/bulkimport/:queue/ adds the task to :queue, which is a user-assigned mnemonic

A few follow-up questions for @rkorytkowski @harpatel1:
- GET /manage/bulkimport/ -- what about __standard__ and __priority__? It would be awesome for an admin to be able to request the list of queued tasks in the standard queues using these...
- Should we support both POST and PUT?

This looks good.
GET /manage/bulkimport/ would list all tasks of a root user by default. We can have it return all users' tasks or filter by username, but the issue is how to get a task for a given username. We store the username as part of the task_id, so one way of doing this would be to first get all task_ids and then filter them by username. PUT or POST is fine; I can't think of a reason to support both.

- GET /manage/bulkimport/ will list all tasks across all queues, with the option to filter by username
- GET /manage/bulkimport/:queue/ will list all tasks for the specified queue, with the option to filter by username
- POST to create new import tasks

@paynejd, deployed to QA. You might want to play with it. There are currently 6 workers for processing bulk imports, including one which is reserved for root. Each worker processes one task at a time from a dedicated queue. If not specified, a task is added to a random queue.
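The behaviour described above (6 workers, one reserved for root, random queue when none is specified) could look roughly like this. Queue names and the helper are hypothetical; only the behaviour comes from the comment above:

```python
import random

# Sketch: 5 general worker queues plus one reserved for root; when the
# caller specifies no queue, one of the general queues is picked at random.
GENERAL_QUEUES = ["bulkimport_%d" % i for i in range(5)]

def pick_queue(username, queue=None):
    if queue:
        return queue                      # caller chose a queue explicitly
    if username == "root":
        return "bulkimport_root"          # reserved root queue
    return random.choice(GENERAL_QUEUES)  # otherwise a random general queue
```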
@rkorytkowski This is looking good. A few questions:
While we're thinking about it -- what would be involved in adding a progress indicator for imports marked as "STARTED"? Not needed now, but probably worth defining.
Great, let me know when the new fields are deployed and we can chat on slack for how to sign into flower.
Let's discuss exposing the progress via OclFlexImporter on tomorrow's dev call. Thank you!
On Wed, Jul 29, 2020 at 8:13 AM Rafal Korytkowski notifications@github.com wrote:
- Yes, you can do it via flower.
- 2. and 3. I'll add those fields.
- A progress should be exposed by OclFlexImporter and then it's trivial to add that to the response.
For import progress, OclFlexImporter has an instance variable named import_results. You can get the current progress and total lines like this:
importer = OclFlexImporter()
# set up importer and start processing here...

# print progress
print "%s of %s" % (
    importer.import_results.count,
    importer.import_results.total_lines)
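For display, those two fields could be turned into a percentage. A trivial sketch; only the field names (count, total_lines) come from the snippet above:

```python
def progress_pct(count, total_lines):
    """Percentage complete for a running import (hypothetical helper)."""
    # Guard against division by zero before the importer has counted lines.
    return 100.0 * count / total_lines if total_lines else 0.0
```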
A small change is needed in ocldev to report progress. See https://github.com/OpenConceptLab/ocldev/commit/717f2b46b8c1d4cacc56b05f15a551bbfa177304
Let's do that in a separate issue.
I'll go ahead and deploy this one to staging.
Looks like there is a bug in how the queue key and the username are parsed. See below. The username should be datim-admin and the queue should be default:
GET https://api.staging.openconceptlab.org/manage/bulkimport/
[
  {
    "queue": "admin-default",
    "username": "datim",
    "state": "SUCCESS",
    "task": "aee2dcd4-8416-48f2-8e6d-54d3ed31a3e0-datim-admin-default"
  },
  {
    "queue": "admin-default",
    "username": "datim",
    "state": "SUCCESS",
    "task": "4cde7fb0-33a3-476d-b36a-b12e168e764e-datim-admin-default"
  },
  {
    "queue": "admin-default",
    "username": "datim",
    "state": "SUCCESS",
    "task": "2605f513-617b-4e53-9f38-53f4a25a3bd0-datim-admin-default"
  }
]
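The listing above suggests what went wrong. Assuming the task_id format is "&lt;uuid&gt;-&lt;username&gt;-&lt;queue&gt;" (inferred from the response, not confirmed), splitting the suffix from the left truncates a hyphenated username like datim-admin, while splitting from the right recovers it -- though only as long as queue names themselves contain no hyphens, so the real fix may need an unambiguous separator or separately stored fields:

```python
# Why the naive parse fails, using a task id from the listing above.
task_id = "aee2dcd4-8416-48f2-8e6d-54d3ed31a3e0-datim-admin-default"
rest = task_id[37:]  # a UUID is 36 characters; skip it and the separator

# Splitting from the left grabs only the first token of "datim-admin".
bad_username = rest.split("-", 1)[0]   # "datim" -- the reported bug

# Splitting from the right works while queue names are hyphen-free.
username, queue = rest.rsplit("-", 1)  # ("datim-admin", "default")
```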
Currently, two concurrent import requests lead to an error. The first import succeeds:
{
  "status": "Success",
  "imap_import_status_url": "https://test.ohie.datim.org:5000/ocl-imap/BI/status/2976905a-dd0e-4f91-bff3-832e4e6b2451-datim-admin-DATIM-MOH-BI-FY19/",
  "ocl_bulk_import_task_id": "2976905a-dd0e-4f91-bff3-832e4e6b2451-datim-admin-DATIM-MOH-BI-FY19",
  "country_org": "DATIM-MOH-BI-FY19",
  "period": "FY19",
  "ocl_bulk_import_status_url": "https://api.staging.openconceptlab.org/manage/bulkimport?task=2976905a-dd0e-4f91-bff3-832e4e6b2451-datim-admin-DATIM-MOH-BI-FY19",
  "country_code": "BI",
  "imap_export_url": "https://test.ohie.datim.org:5000/ocl-imap/BI/FY19/",
  "country_name": "Burundi",
  "message": "IMAP successfully queued for bulk import into OCL. Request IMAP export after bulk import is processed or request import status."
}
but the second import request, made right after, results in a 500 error:
{
  "status": "Error",
  "country_org": "DATIM-MOH-BI-FY19",
  "period": "FY19",
  "country_code": "BI",
  "country_name": "Burundi",
  "message": "500 Server Error: INTERNAL SERVER ERROR for url: https://api.staging.openconceptlab.org/manage/bulkimport/DATIM-MOH-BI-FY19/"
}
The extra information is created by Python scripts in https://github.com/OpenConceptLab/ocl_datim, but the requirement is that we receive a 409 response from OCL when a second concurrent request is sent while the same country code/period is still being processed.
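A sketch of the requested behaviour in hypothetical view code: celery_once raises AlreadyQueued when a task with the same key is still pending (see the stack trace below), and the view should catch it and answer 409 Conflict instead of crashing with a 500. The exception class is stubbed locally so the sketch is self-contained:

```python
class AlreadyQueued(Exception):
    """Stand-in for celery_once.AlreadyQueued, so the sketch runs standalone."""

def post_bulk_import(enqueue, payload):
    """Hypothetical view body: map a duplicate submission to HTTP 409."""
    try:
        enqueue(payload)
    except AlreadyQueued:
        return 409, {"detail": "An import for this key is already queued or processing."}
    return 202, {"status": "queued"}
```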
Stack trace on server:
INFO 2020/07/31 13:55:31 [middlewares.py:22 process_request()] datim-admin 172.19.0.1 POST /manage/bulkimport/DATIM-MOH-BI-FY19/
ERROR 2020/07/31 13:55:31 [base.py:210 handle_uncaught_exception()] Internal Server Error: /manage/bulkimport/DATIM-MOH-BI-FY19/
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 113, in get_response
    response = callback(request, *callback_args, **callback_kwargs)
  File "/usr/local/lib/python2.7/site-packages/django/views/generic/base.py", line 68, in view
    return self.dispatch(request, *args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
    return view_func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/rest_framework/views.py", line 327, in dispatch
    response = self.handle_exception(exc)
  File "/usr/local/lib/python2.7/site-packages/rest_framework/views.py", line 324, in dispatch
    response = handler(request, *args, **kwargs)
  File "/code/manage/views.py", line 132, in post
    task = queue_bulk_import(request.body, import_queue, username, update_if_exists)
  File "/code/tasks.py", line 78, in queue_bulk_import
    task = bulk_import.apply_async((to_import, username, update_if_exists), task_id=task_id, queue=queue_id)
  File "/usr/local/lib/python2.7/site-packages/celery_once/tasks.py", line 85, in apply_async
    raise e
AlreadyQueued
Thanks for testing. Addressed both issues in https://github.com/OpenConceptLab/oclapi/commit/9a1e602ca0dd48dc01434582fc6730474d9f23d1.
It's been deployed to all envs now.
Testing in the staging environment is looking great! One final request: could we add a timestamp for when the bulk import request was posted?
Sometimes bulk imports are submitted that have dependencies -- meaning that the first one needs to have completed processing before another begins. How can we prevent a specific bulk import from starting until another specific import has completed processing?

For example, could I submit a bulk import with a specific key, such as PEPFAR-MOH-Malawi-FY19, and then, if I submit a second (or third or fourth) bulk import with the same key, have it not begin processing until the previous bulk import in the queue has completed processing?

We also want to be able to query the bulk import system to ask: "Is there a bulk import already being processed or in the queue that matches this key?" (scoped to the same user)
Does celery provide any support for this or would we need to build this custom? (or hybrid?)
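The query part could be answered client-side from the task list that GET /manage/bulkimport/ already returns (shape as in the staging example earlier in the thread). The helper name is hypothetical; the "active" states are Celery's standard not-yet-finished task states:

```python
# Sketch: is an import with this key already queued or running for a user?
# `tasks` is a list of dicts shaped like the GET /manage/bulkimport/ response.
ACTIVE_STATES = ("PENDING", "RECEIVED", "STARTED", "RETRY")

def import_pending(tasks, username, queue_key):
    return any(
        t["username"] == username
        and t["queue"] == queue_key
        and t["state"] in ACTIVE_STATES
        for t in tasks
    )
```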