spulec / PyQS

Python task-queues for Amazon SQS
MIT License

Add support for arbitrary SQS message #69

Open januszm opened 4 years ago

januszm commented 4 years ago

Hi. I came across this project while looking for a solution that makes it easy to run a pool of workers that process tasks from arbitrary AWS SQS queues. I mean queues that were not created by the application but already exist in AWS, e.g. various notifications from AWS services, such as the creation of a file in S3. I tried Celery, but its SQS support comes down to publishing and processing its own messages. I was hoping that pyqs would be able to process any message, but the situation here looks similar to Celery. We have:

def decode_message(message):
    message_body = message['Body']
    json_body = json.loads(message_body)
    if 'task' in message_body:
        return json_body
    # elif ... <<< here, or extract this part into its own method, like 'detect_message_type(message)'
    else:
        # Fallback to processing celery messages
        return decode_celery_message(json_body['body'])

Have you considered this possibility? This could work through a configuration option, or by using a part of the message that uniquely identifies Celery (just as, I think, "task" identifies pyqs in the code above).
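A minimal sketch of what the `detect_message_type(message)` helper mentioned in the comment above could look like. It is not part of pyqs. The key names used to recognise a Celery envelope (`body` and `headers`) are an assumption for illustration and should be checked against the Celery protocol before relying on them.

```python
import json


def detect_message_type(message):
    """Classify a raw SQS message dict as 'pyqs', 'celery', or 'raw'.

    Hypothetical helper: the Celery heuristic below is an assumption,
    not something taken from the pyqs code base.
    """
    try:
        body = json.loads(message["Body"])
    except (KeyError, TypeError, ValueError):
        # Missing or non-JSON body: treat it as an arbitrary message.
        return "raw"

    if not isinstance(body, dict):
        return "raw"
    if "task" in body:
        # pyqs messages carry the dotted task path under 'task'.
        return "pyqs"
    if "body" in body and "headers" in body:
        # Assumed shape of a Celery envelope.
        return "celery"
    # Anything else, e.g. an S3 event notification with a 'Records' list.
    return "raw"
```

Unlike the `'task' in message_body` check in the snippet above, which is a substring test on the raw string, this version looks at the keys of the parsed JSON object.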

spulec commented 4 years ago

I'm open to the idea; it seems reasonable.

If someone wants to push on this (including yourself), I would suggest proposing what a solution might look like before anyone implements it. I would want to make sure whatever it looks like isn't going to add a lot of complexity to the library.

tvallois commented 4 years ago

I was wondering how to implement this. First, I think it's quite "easy" to tell whether a message is a Celery one by looking at Celery's message specification (https://docs.celeryproject.org/en/latest/internals/protocol.html#definition). Second, I don't know how to manage the module import and the task execution here:

        task_module = importlib.import_module(task_path)

        task = getattr(task_module, task_name)
        ...
        task(*args, **kwargs)

Maybe we could add a config object which creates a relation between the task and the queue name, a bit like the Celery object.

spulec commented 4 years ago

Yeah, that makes sense. I could imagine some type of more generic registry where we hook up task processors with queue names.

januszm commented 3 years ago

It's been more than a year, but I think I'll finally be able to work on it. It seems that in addition to the changes to decode_message, we also need to change this function, because it expects the presence of the 'task' key in the message body, which is quite a special case (i.e. the message from S3 will not contain it).

def _create_pre_process_context(self, packed_message):
  message = packed_message['message']
  message_body = decode_message(message)
  full_task_path = message_body['task']

  pre_process_context = {
    "task_name": full_task_path.split(".")[-1],
    "full_task_path": full_task_path,
    # ...
  }

It seems to me that this part of the code around decode_message / _create_pre_process_context should be refactored so as to allow for "wiring" of queues and Processor classes or functions with a simple configuration file, e.g.

from processors import DocumentProcessor, process_file_upload

wiring = {
  'myproject-production-documents': DocumentProcessor, # need some convention, e.g. require that the class has a `process` method defined
  'external-fileuploads': process_file_upload, # just something callable, like a function
}

In a nutshell, the goal is to reuse the existing "infrastructure" and add a universal system for connecting queues with any code that processes any message, and to properly separate the worker management, message parsing, and message deletion code from the processing of the message content.
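As a rough sketch of how such a wiring could be resolved at dispatch time: a class entry is instantiated and its `process` method is used, while any other callable is used directly. `resolve_processor`, `dispatch`, and the two example processors are hypothetical names for illustration and do not exist in pyqs.

```python
import json


class DocumentProcessor:
    """Hypothetical class-based processor following the `process` convention."""

    def process(self, body):
        return "document:{}".format(body["id"])


def process_file_upload(body):
    """Hypothetical function-based processor."""
    return "upload:{}".format(body["key"])


WIRING = {
    "myproject-production-documents": DocumentProcessor,
    "external-fileuploads": process_file_upload,
}


def resolve_processor(queue_name, wiring=WIRING):
    """Return a callable that accepts the decoded message body."""
    target = wiring[queue_name]  # raises KeyError for queues that are not wired
    if isinstance(target, type):
        # Class entry: instantiate it and use its `process` method.
        return target().process
    if callable(target):
        return target
    raise TypeError("Unsupported processor for queue {!r}".format(queue_name))


def dispatch(queue_name, message, wiring=WIRING):
    """Decode the JSON body of a raw SQS message and hand it to the processor."""
    body = json.loads(message["Body"])
    return resolve_processor(queue_name, wiring)(body)
```

With this shape the worker only needs the queue name and the raw message. What the processor does with the body stays outside the worker code.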

januszm commented 3 years ago

For starters I'm going to define my own worker class as a subclass of SimpleProcessWorker and simply override the process_message method.
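The subclass approach could look roughly like the sketch below. The `SimpleProcessWorker` defined here is only a stand-in so that the example runs on its own. The real pyqs class has more responsibilities, and its `process_message` signature is assumed from the `packed_message['message']` access shown earlier. `S3EventWorker` is a hypothetical name.

```python
import json


class SimpleProcessWorker:
    """Stand-in for the pyqs worker; only the overridden hook is modelled."""

    def process_message(self, packed_message):
        raise NotImplementedError


class S3EventWorker(SimpleProcessWorker):
    """Hypothetical worker that handles S3 event notifications instead of tasks."""

    def process_message(self, packed_message):
        message = packed_message["message"]
        body = json.loads(message["Body"])
        # S3 notifications list the affected objects under 'Records'.
        return [record["s3"]["object"]["key"] for record in body.get("Records", [])]
```

A real override would probably also need to take care of whatever the base implementation does after processing, such as deleting the message from the queue.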

januszm commented 3 years ago

I think it would also help to slightly change the current definition of the Manager Worker classes to avoid hardcoding the processor class name, which would make overriding it easier, or possible at all. See:

def __init__(
  # ...
  self.process_worker = SimpleProcessWorker  # allow overriding this after init

def _initialize_worker_children(self, number):
  ...
  self.worker_children.append(
-   SimpleProcessWorker(
+   self.process_worker(
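A self-contained sketch of the idea behind this diff, using stand-in classes rather than the real pyqs ones. The `process_worker` constructor argument is an assumption added for illustration. The diff above only proposes an attribute that can be reassigned after init.

```python
class SimpleProcessWorker:
    """Stand-in for the default pyqs worker class."""

    def __init__(self, index):
        self.index = index


class CustomWorker(SimpleProcessWorker):
    """Hypothetical replacement worker."""


class ConfigurableManager:
    """Stand-in manager that does not hardcode the worker class."""

    def __init__(self, process_worker=None):
        # Fall back to the default worker when nothing is supplied.
        self.process_worker = process_worker or SimpleProcessWorker
        self.worker_children = []

    def _initialize_worker_children(self, number):
        for index in range(number):
            # Instantiate whichever class was configured.
            self.worker_children.append(self.process_worker(index))


manager = ConfigurableManager(process_worker=CustomWorker)
manager._initialize_worker_children(2)
```

Because the manager looks up `self.process_worker` each time it creates a child, reassigning the attribute after init works as well as passing it to the constructor.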