ovh / celery-director

Simple and rapid framework to build workflows with Celery
https://ovh.github.io/celery-director/
BSD 3-Clause "New" or "Revised" License

Am I Misunderstanding Groups in Director? #167

Status: Open. agile-anthony opened this issue 2 years ago

agile-anthony commented 2 years ago

I have a use case where I produce a list of file names and want a task to process each file name in parallel. So I have a task that returns the list of file names, and the next step is a group consisting of one task. However, Director passes the entire list to that task. I assumed that Director would pass each item in the list to its own instance of the task, but that is not how it works.

Here is my workflow:

poc.ACCEPT-CATALOGS:
  tasks:
    - LIST-CATALOGS
    - GROUP_CATALOGS:
        type: group
        tasks:
          - CREATE-DATASOURCE
  schema: catalogs

And here are my tasks:

  1. LIST-CATALOGS: Retrieves a list of catalogs recently stored in a certain S3 bucket. The as_dict function just converts the S3 Object into a dictionary.
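from typing import Dict, List
from director import task

# `client` (an S3-compatible client, e.g. MinIO's) and `as_dict` are defined elsewhere
@task(name="LIST-CATALOGS")
def list_catalogs(*args, **kwargs) -> List[Dict]:
    domain = kwargs["payload"]["domain"]
    # List the objects stored under the domain prefix, including user metadata
    catalogs = client.list_objects(bucket_name='*********', prefix=domain,
                                   recursive=False, include_user_meta=True)
    # Convert each S3 object into a plain dict so Celery can serialize the result
    return [as_dict(c, domain) for c in catalogs]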
  2. CREATE-DATASOURCE: For each catalog (represented as a dict), pull its object contents from the S3 bucket and do stuff with it. The "stuff" isn't shown here, but it explains my use case.
@task(name="CREATE-DATASOURCE")
def create_datasource(*args, **kwargs):
    catalog = args[0]   # args[0] contains the entire list and this task is called once, oh no!
    # Do stuff here using the info in a dict to pull the object contents

I've done something similar using native Celery, which is why I assumed Director might work the same way. I defined the Celery group canvas like this:

from celery import group

job = group([create_datasource.si(as_dict(c, domain)) for c in catalogs])
job.apply_async()
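To make the difference concrete, here is a plain-Python model of what that list comprehension does. It is not Celery code: `functools.partial` stands in for Celery's immutable `.si()` signatures, a thread pool stands in for the workers, and the catalog dicts and the `create_datasource` body are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Dict, List


def create_datasource(catalog: Dict) -> str:
    # Hypothetical body: just report which catalog this call received
    return f"processed {catalog['name']}"


def run_group(signatures: List[partial]) -> List[str]:
    # Model of group(...).apply_async(): each signature runs independently,
    # possibly in parallel; results are collected in signature order
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(sig) for sig in signatures]
        return [f.result() for f in futures]


# Hypothetical stand-ins for the as_dict(c, domain) results
catalogs = [{"name": "a.json"}, {"name": "b.json"}, {"name": "c.json"}]

# One pre-bound call per catalog, like create_datasource.si(...) in the comprehension
job = [partial(create_datasource, c) for c in catalogs]
results = run_group(job)
print(results)  # ['processed a.json', 'processed b.json', 'processed c.json']
```

The group size equals the number of catalogs only because the list of signatures is built at runtime, after the catalogs are known.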

Is there a way to create similar functionality in Celery-Director? Thank you for reading and for any helpful feedback.

ncrocfer commented 2 years ago

Unfortunately, the feature you're describing is not yet available in Celery Director. What you want is dynamic tasks, but right now Celery Director only supports static workflows defined in YAML.

So when you do:

poc.ACCEPT-CATALOGS:
  tasks:
    - LIST-CATALOGS
    - GROUP_CATALOGS:
        type: group
        tasks:
          - CREATE-DATASOURCE

It's just 2 tasks that run one after the other (and in fact the group is not really useful in this case):

from celery import chain, group
chain(list_catalogs.s(), group(create_datasource.s()))
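A rough plain-Python model of that chain, assuming (as in Celery) that each chain step receives the previous step's return value as its first positional argument. Because the group is built from the YAML before any task runs, it holds exactly one CREATE-DATASOURCE call no matter how many catalogs LIST-CATALOGS returns. The task bodies below are hypothetical stand-ins.

```python
from typing import Callable, Dict, List

Task = Callable[..., object]


def list_catalogs() -> List[Dict]:
    # Hypothetical stand-in for LIST-CATALOGS
    return [{"name": "a.json"}, {"name": "b.json"}, {"name": "c.json"}]


def create_datasource(*args) -> str:
    # args[0] is the previous step's whole return value, not one item of it
    catalog = args[0]
    return f"got {type(catalog).__name__} of {len(catalog)} items"


def run_group(tasks: List[Task], previous_result: object) -> List[object]:
    # A group forwards the same parent result to each of its fixed members
    # (run sequentially here for simplicity)
    return [t(previous_result) for t in tasks]


# The structure comes from the YAML, so the group size is fixed at 1
static_group = [create_datasource]
parent = list_catalogs()
print(run_group(static_group, parent))  # ['got list of 3 items']
```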

This differs from your Celery code, where you dynamically add one task per catalog to the group using a list comprehension.
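Until dynamic tasks are available, one possible workaround (an assumption, not a Director feature) is to keep a single CREATE-DATASOURCE step and fan out inside it, for example with a thread pool. `process_catalog` is a hypothetical helper standing in for the per-catalog work. The `@task(name="CREATE-DATASOURCE")` decorator is omitted so the sketch runs standalone, and Director would track only the outer task, not each catalog.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_catalog(catalog: Dict) -> str:
    # Hypothetical per-catalog work (pull the S3 object, build the datasource, ...)
    return catalog["name"].upper()


def create_datasource(*args, **kwargs) -> List[str]:
    # In the static chain, args[0] is the full list returned by LIST-CATALOGS
    catalogs: List[Dict] = args[0]
    # Fan out inside the single task; pool.map preserves input order
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(process_catalog, catalogs))


print(create_datasource([{"name": "a.json"}, {"name": "b.json"}]))  # ['A.JSON', 'B.JSON']
```

The trade-off is that parallelism is limited to one worker process, and a failure in one catalog fails the whole step unless `process_catalog` handles its own errors.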

agile-anthony commented 2 years ago

Thank you for the explanation. I like celery-director so I will keep an eye out for enhancements.