Miserlou / Zappa

Serverless Python
https://blog.zappa.io/
MIT License

Make it easier to port Celery projects / queue background tasks #603

Closed michi88 closed 7 years ago

michi88 commented 7 years ago

Context

I have quite a few Celery / django-q projects I would like to move to Zappa. One can argue that you don't need task libraries like Celery / django-q, as Lambdas can easily be invoked on a scheduled interval or based on S3 / SNS events. True, but this doesn't mean the APIs (of Celery, for example) to create tasks aren't handy. Think: the @task() decorator and some_func.delay() to queue the task.

I mostly use celery to invoke some background task to be handled later (sending email, updating search indexes etc.). What I would like to see is that I can still use the Celery task creation api in order for my app to be still portable if I want to move away from lambda (something Zappa promises).

Possible Fix

As celery already can use SQS as its queue we maybe can add a lambda handler to SQS events and execute the task. I've been digging in the Celery codebase to search for the point where the task data (coming from the queue) is transformed and in the end executed by the worker. Because that is what we would need to do in a lambda handler. Not easy to find, it's here: https://github.com/celery/celery/blob/master/celery/worker/components.py#L229 Definitely not straightforward to get that working.

Another option would be to have a lambda 'celery worker' run on a schedule every second (or whatever you want), have it connect to the SQS queue, pull a task (if available), execute it and exit. Far from ideal, but workers can be told to only execute 1 task and exit (I'm told, haven't confirmed this yet). If that works we could also get around the 1 second schedule by having the lambda celery worker start on an SQS event but have it pull the task in a celery native fashion, instead of using the actual SQS message data when invoked.

Having said that I don't think we should try to make Zappa 'compatible' with Celery / django-q and whatever other framework there is. Celery though is a contender in my view as it's widely used.

The last option I see (except for not helping devs on this topic at all) is creating helpers in Zappa natively to replace the Celery / django-q framework APIs. Think:

task_id = zappa.queue.delay('module.my_func', *args, **kwargs)
# that would upload the tasks definition to for example S3
# a zappa framework native handler could execute the task based on the S3 event
# when finished the task result is uploaded back to S3 again.
task_result = zappa.queue.get_result(task_id, remove=True)  
# if finished, returns the task result and deletes the S3 task entry (if you want to), else None

If that existed I wouldn't have brought this up and I would happily write logic to use zappa vs celery based on the platform I'm running on.

One thing that does bother me with the 'zappa native' approach is that we'll be reinventing the wheel regarding task serialization, queueing, task de-serialization and executing the task. Possibly we might want to include some retry handling as well in that case.

The django-q codebase by the way is much smaller and easier to dig in. If we decide zappa should have some helpers for this it's a good starting point to look at.

Goal of this issue

I would like to start the discussion around this and collect everyone's opinion. If we come to the conclusion that Zappa should/could support something like this I'll work on a POC. If not, we can just close this issue :)

Miserlou commented 7 years ago

So, I love this idea, and this was one of the original goals of the Zappa project.

A very annoying thing that I frequently forget is that SQS isn't a Lambda event source. I don't know why. This might sink this plan.

richiverse commented 7 years ago

We can maybe hack a poor man's sqs that is just a 1 or N minute scheduled poll of sqs as a workaround

guyschlider commented 7 years ago

What do you guys think about the solution proposed here: https://cloudonaut.io/integrate-sqs-and-lambda-serverless-architecture-for-asynchronous-workloads/ ?

richiverse commented 7 years ago

Honestly, take a look at DynamoDB Streams! It's a poor man's Kinesis and can trigger Lambdas.

http://blogs.atlassian.com/2014/08/replayable-transactions-event-sourcing-dynamodb/

michi88 commented 7 years ago

SQS feels too involved (as it isn't an event source). I would like to look at DynamoDB Streams more closely. Never worked with it myself.

I'm currently playing with just S3 as the event source. It feels like this might just be enough to act as a poor, poor man's queue for simple async task delay mechanisms.

What I basically do is serialize a task as JSON (in a structured way, like Celery does for example), put it in an S3 'task' bucket and create a generic task_handler handler on the object created event source (s3:ObjectCreated:*) for this bucket. The task handler is responsible for mapping the event_type to the right function within the Django app, executing it, and putting the result back on S3 (and/or deleting the task data from S3).

It basically has the interface described above zappa.queue.delay/get_result(...).
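To make the S3 flow above concrete, here is a minimal sketch of what such a queue could look like. It is not the POC code from this comment: the names delay and task_handler, the tasks/ and results/ key prefixes, and passing the S3 client in as a parameter are all assumptions made for illustration. Only put_object, get_object and delete_object are real boto3 S3 client calls.

```python
import importlib
import json
import uuid


def delay(s3, bucket, func_path, *args, **kwargs):
    """Serialize a task as JSON and drop it in the (hypothetical) task bucket."""
    task_id = uuid.uuid4().hex
    payload = {"id": task_id, "func": func_path, "args": args, "kwargs": kwargs}
    s3.put_object(
        Bucket=bucket,
        Key=f"tasks/{task_id}.json",
        Body=json.dumps(payload).encode("utf-8"),
    )
    return task_id


def task_handler(event, context, s3=None):
    """Generic handler for s3:ObjectCreated:* events on the task bucket."""
    if s3 is None:
        import boto3  # only needed when actually running against AWS
        s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        if not key.startswith("tasks/"):
            continue  # skip result objects, which also fire ObjectCreated events
        payload = json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read())
        # Map the dotted path in the payload to the function to execute
        module_name, func_name = payload["func"].rsplit(".", 1)
        func = getattr(importlib.import_module(module_name), func_name)
        result = func(*payload["args"], **payload["kwargs"])
        # Store the result next to the tasks and clean up the task object
        s3.put_object(
            Bucket=bucket,
            Key=f"results/{payload['id']}.json",
            Body=json.dumps(result).encode("utf-8"),
        )
        s3.delete_object(Bucket=bucket, Key=key)
```

The prefix check matters if results go into the same bucket: without it (or a prefix filter on the event source), every result upload would invoke the handler again.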

I'm not calling lambda functions directly as that way you can easily burst through your maximum concurrent lambdas. My assumption is that this will not happen with S3 events. At least you will get retries. Good thing also about S3 is that you have no limit on the task payload size. For example SNS alone is not an option for that reason (for me).

This all works and is relatively straightforward (which I like). I do want to see what happens when there is some load on it, in terms of more events created than the concurrency limit allows.

michi88 commented 7 years ago

@Miserlou you mentioned this comment in the slack channel: https://github.com/Miserlou/Zappa/issues/61#issuecomment-216550083

Do I understand correctly that the idea there was to create and deploy multiple/separate lambda functions from the functions that are decorated? Wouldn't it make sense to have 1 event source and have 1 generic handler to route the event/task payload to the appropriate function in the project?

Miserlou commented 7 years ago

Not multiple Lambdas, Zappa is completely monolithic by choice. This saves on complexity and keeping warm.

I should have said SNS rather than SQS, I always forget that SQS isn't a real event source.

@zappa
def make_pie(event, context):
    ingredients = get_ingredients()
    pie = bake(ingredients)
    deliver(pie)

@flask.route('api/order/pie')
def order_pie():
    make_pie()
    return "Your pie is being made!"

Then make_pie() would actually fire an SNS event, zappa deploy would create the SNS topic and register the event source, and it would all JustWork^tm.

This is all just spitballing, of course.
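A rough sketch of how that decorator idea might work in plain Python, under stated assumptions: task, route_task and the TASKS registry are hypothetical names (not Zappa API), and the SNS client is passed in explicitly. The only real AWS pieces are the publish(TopicArn=..., Message=...) call and the Records[].Sns.Message shape of an SNS-triggered Lambda event.

```python
import functools
import json

TASKS = {}  # hypothetical registry: task name -> original function


def task(sns_client, topic_arn):
    """Decorator: calling the function publishes an SNS message instead of running it."""
    def decorator(func):
        name = f"{func.__module__}.{func.__qualname__}"
        TASKS[name] = func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            message = json.dumps({"task": name, "args": args, "kwargs": kwargs})
            sns_client.publish(TopicArn=topic_arn, Message=message)

        return wrapper
    return decorator


def route_task(event, context):
    """Lambda handler for the SNS event source: look up and run the real function."""
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
        TASKS[message["task"]](*message["args"], **message["kwargs"])
```

With this shape, order_pie() returns as soon as the message is published, and the actual work happens in whichever invocation receives the SNS event. Arguments have to be JSON-serializable for this to work.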

richiverse commented 7 years ago

why not @zappa('fifosqs|sqs|sns|dynamo|kinesis') ? And just implement the SNS one as proof of concept?

SQS in this case would be a per minute batching of events as there is a polling hack involved

I really like this idea btw!

michi88 commented 7 years ago

What do you guys think about how to handle something like make_pie(countdown=20, eta=datetime_to_execute) as celery has? Should we?

BTW, I'm not trying to advocate that we copy the full celery interface as we want to keep it simple.

Miserlou commented 7 years ago

What does countdown do? Is it for failure retries?

Part of my motivation for this project is how much I hate Celery (sorry Celery devs ilu) so I'm not super interested in just copying the interface, but let's also not re-invent the wheel.

michi88 commented 7 years ago

countdown=20 would mean execute after a 20 sec delay. In celery you also have retry delay (countdown) for when a task fails. But that is definitely too much. Getting retry to work at all with DLQ's is probably the max we should go in that direction.

flux627 commented 7 years ago

@michi88 I love your S3 solution. Do you have any code to share? I agree it's a bit hacky (and perhaps too much so to be the official solution of Zappa) but maybe we can work this into a plugin of sorts, I'm willing to contribute.

Miserlou commented 7 years ago

I'm leaning strongly towards SNS over S3 for reasons we discussed in Slack: basically, this should be a very lightweight call that can be done within a single HTTP response time with no problems. With fat S3 objects, that won't be the case. It's also more indirect.

It's very simple already, just

    sns_client.publish(
        TopicArn=SNS_ARN,
        Message=str(your_object_id),
    )

Then in your Zappa settings:

        "events": [
            {
                "function": "your_module_.your_async_func",
                "event_source": {
                    "arn":  YOUR_SNS_ARN,
                    "events": [
                        "sns:Publish"
                    ]
                }
            }
        ]

michi88 commented 7 years ago

@flux627 the code I was working on was strongly coupled to the app I tried it on and was only a POC for myself :). I would like to make something generic, which is why I started this issue. Unfortunately I will not have time to start on this in the coming 2 weeks. So if anyone is eager, please go ahead...

geeknam commented 7 years ago

I'm donating some code that I'm using to achieve this: https://gist.github.com/geeknam/e5b4adf0a955748487f383cbe21211bd

The task decorator is inspired by Celery :)

guyschlider commented 7 years ago

@geeknam - cool, my guess is that supporting task response would be a problem with SNS. right?

flux627 commented 7 years ago

@geeknam Thank you for this! I'm sorry if it's obvious to everyone else, but can you give a little more detail on how to use / integrate this into a project?

geeknam commented 7 years ago

@flux627

  1. Create SNS topic (I do this through Cloudformation), obtain the SNS topic arn (used in step 4)
  2. Put my snippet in async.py in your zappa project.
  3. Anywhere in your codebase, create a function:

from async import task

@task()
def time_consuming_task(*args, **kwargs):
    dosomethingforlessthanfiveminutes()

  4. Add SNS as event source in zappa_settings.json:

        "events": [
            {
                "function": "async.route_task",
                "event_source": {
                    "arn":  "arn:aws:sns:{region}:{account_id}:{topic_name}",
                    "events": [
                        "sns:Publish"
                    ]
                }
            }
        ]

  5. Add env var to Zappa:

SNS_ARN=arn:aws:sns:{region}:{account_id}:{topic_name}

  6. Invoke your task asynchronously (you might want to do this within request-response cycle of API Gateway):

time_consuming_task.delay()

  7. zappa update dev
  8. Profit

geeknam commented 7 years ago

I'm planning to contribute this to zappa:

  1. Add extra zappa settings: "async_tasks": true
  2. This automatically creates SNS topic and subscribes the current Lambda
  3. SNS messages will hit the task_router

Of course when I get some free time :)

mcrowson commented 7 years ago

I think the SNS is overkill. You get the same retry with async lambda calls. I don't see what value SNS has over Async lambda calls for this.

aehlke commented 7 years ago

I think the SNS is overkill. You get the same retry with async lambda calls. I don't see what value SNS has over Async lambda calls for this.

How do you make an async lambda call from within a zappa lambda function?

edit: https://boto3.readthedocs.io/en/latest/reference/services/lambda.html#Lambda.Client.invoke InvocationType=Event
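For reference, a minimal sketch of what that call could look like. The wrapper name invoke_async and passing the client in as a parameter are assumptions for illustration; the invoke call with InvocationType="Event" is the boto3 API from the link above, and Lambda answers such a request with status code 202 once the event is queued.

```python
import json


def invoke_async(lambda_client, function_name, payload):
    """Queue an asynchronous invocation; returns True if Lambda accepted the event."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="Event",  # fire-and-forget, no function result is returned
        Payload=json.dumps(payload).encode("utf-8"),
    )
    return response["StatusCode"] == 202
```

From inside a Zappa function you would presumably pass boto3.client("lambda") and the name of the deployed function itself, so the same Lambda handles the background work.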

Miserlou commented 7 years ago

This is a good question, but I think there are tangible benefits.

SNS is rate-limit safe. SNS provides stored delivery receipts. SNS has a configurable retry policy.

aehlke commented 7 years ago

Do lambda function invocations via InvocationType=Event not include those same or similar benefits?

Miserlou commented 7 years ago

That's a good question. I don't know if it's limit safe or if the retry policy is configurable. My gut says no.

aehlke commented 7 years ago

Some quick research, re: retries: http://docs.aws.amazon.com/lambda/latest/dg/retries-on-errors.html

Asynchronous invocation – Asynchronous events are queued before being used to invoke the Lambda function. If AWS Lambda is unable to fully process the event, it will automatically retry the invocation twice, with delays between retries. If you have specified a Dead Letter Queue for your function, then the failed event is sent to the specified Amazon SQS queue or Amazon SNS topic.

Seems this is the "officially-intended" way to do retries - the configurable retry behavior would reside in whatever handles the SQS or SNS message resulting from a failed invocation.

Miserlou commented 7 years ago

This is now merged into master!

Read the docs here: https://github.com/Miserlou/Zappa#asynchronous-task-execution

antwan commented 7 years ago

Just continuing the discussion here, as the async part of Zappa is almost perfect, but it lacks an important feature: the possibility to trigger functions with a delay.

From my reading I understood it's still dodgy to implement, mainly because SNS doesn't support delays, and SQS doesn't support Lambda execution. But there have been hacks to successfully delay Lambda execution using a CloudWatch alarm.

My question: is this something that has been contemplated? Any plans for Zappa, or just waiting for AWS to come up with a proper solution? Maybe using AWS Step Functions and wait_using_seconds?

geeknam commented 7 years ago

So I had this crazy idea of using DynamoDB TTL to expire an item, which then triggers a Lambda (leaving the responsibility for delays to DynamoDB). I gave that a try; unfortunately AWS doesn't guarantee that it expires and deletes the item at the specified timestamp (it only guarantees within 48h). I guess there's no other way than a CloudWatch Events rule with rate 1min.

andytwoods commented 7 years ago

Hi all, we are using this in production, to call functions after a delay / repeatedly (after a set delay): https://github.com/andytwoods/zappa-call-later/blob/master/zappa_call_later/models.py

With the idea being that a function is called periodically via scheduling https://github.com/Miserlou/Zappa#scheduling

Any thoughts :)

jordanmkoncz commented 6 years ago

Having the ability to schedule a task to be run asynchronously at a specific time is the one missing piece for me to be able to easily use Zappa in a project where I'm currently using django-q. A simple use case is where a user makes a booking (AKA an appointment) for some time in the future, e.g. 15/12/2018 at 5pm, and when this happens I want to schedule an async task to send them a reminder email for their booking 1 hour before the booking date, i.e. 15/12/2018 at 4pm. With django-q this is very easy, I just create a scheduled task which runs once and is scheduled to run at the time I want. It would be great if it was just as easy to do this sort of thing with Zappa.

prabhatpankaj commented 6 years ago

@geeknam, I am struggling to use Celery in a Lambda function.

I had used celery + rabbitmq + django on EC2.

Our task is to move the data from a temporary database table to a report database table. As it will require a connection inside an AWS VPC, an external function will not work for this event.

Regarding the link provided: https://gist.github.com/geeknam/e5b4adf0a955748487f383cbe21211bd

It would be very helpful if you could create a sample project using zappa + django for this asynchronous task.

spyoungtech commented 6 years ago

Similar to @jordanmkoncz -- I'm trying to find an elegant way to implement dynamic scheduling for tasks to be done in the (possibly distant) future. Currently, we satisfy this with django-celery-beat using the database scheduler.

One thought is to store these future tasks with their desired approximate execution time in a database table. Then, set up a regularly executing task that will check the table for any tasks that are ready to be invoked. I believe this is essentially the idea @andytwoods shared.

Maybe a model like this

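from django.db import models
# `async` is a reserved word since Python 3.7, hence zappa.asynchronous
from zappa.asynchronous import import_and_get_task

class ScheduledTask(models.Model):
    sent_on = models.DateTimeField(null=True, default=None)
    task_name = models.CharField(max_length=128) # fully qualified name
    exec_after = models.DateTimeField()
    # JSONField (Django 3.1+) used for both, since arguments can be of mixed types
    args = models.JSONField(default=list)
    kwargs = models.JSONField(default=dict)
    # ...
    @property
    def func(self):
        # Resolve the fully qualified name to the actual callable
        return import_and_get_task(self.task_name)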

Then a regularly occurring task that might look something like this.

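from django.utils import timezone
from zappa.asynchronous import run

now = timezone.now()
pending_tasks = ScheduledTask.objects.filter(sent_on=None, exec_after__lte=now)
for task in pending_tasks:
    run(task.func, task.args, task.kwargs)  # hand the task to Zappa's async runner
    task.sent_on = now
    task.save()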

As long as you don't need very granular resolution on the time, seems like it should work.

But to make this work for Flask applications as well, instead of using a django model, perhaps dynamoDB or S3 could be used to store pending tasks. Thoughts?

jiaaro commented 6 years ago

SQS is now a supported event source

https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

hm1300 commented 6 years ago

So how can this feature (the possibility to trigger functions with a delay) be achieved, now that SQS is available as an event source?

Any ideas?
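One possible direction, as a sketch only: SQS supports a per-message delay through the DelaySeconds parameter of send_message, capped at 900 seconds (15 minutes), so it only covers short delays. The helper name send_delayed and the JSON message body are assumptions made for illustration, not something Zappa provides.

```python
import json

MAX_SQS_DELAY = 900  # SQS limit for DelaySeconds, i.e. 15 minutes


def send_delayed(sqs_client, queue_url, task_payload, delay_seconds):
    """Enqueue a task that becomes visible to consumers after delay_seconds."""
    if not 0 <= delay_seconds <= MAX_SQS_DELAY:
        raise ValueError("SQS only supports delays between 0 and 900 seconds")
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(task_payload),
        DelaySeconds=delay_seconds,
    )
```

With the queue registered as a Lambda event source, the function would be invoked once the delay has passed. Anything beyond 15 minutes still needs another mechanism, such as a scheduled poll or a state machine.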

jsharpe commented 6 years ago

AWS step functions would be an obvious way to implement the delay and then trigger the lambda. The state machine can also handle retries as well
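As a sketch of that idea, the state machine could be as small as a Wait state followed by a Task state with a Retry block. The definition below is written in Amazon States Language as a Python dict; the function name delayed_task_definition, the state names, the delay_seconds input field and the retry numbers are assumptions chosen for illustration.

```python
import json


def delayed_task_definition(lambda_arn, max_attempts=3):
    """Build a state machine that waits, then invokes a Lambda function with retries."""
    return {
        "Comment": "Wait for a caller-supplied delay, then run the task",
        "StartAt": "Wait",
        "States": {
            "Wait": {
                "Type": "Wait",
                # The delay is read from the execution input, e.g. {"delay_seconds": 3600}
                "SecondsPath": "$.delay_seconds",
                "Next": "RunTask",
            },
            "RunTask": {
                "Type": "Task",
                "Resource": lambda_arn,
                "Retry": [
                    {
                        "ErrorEquals": ["States.ALL"],
                        "IntervalSeconds": 5,
                        "MaxAttempts": max_attempts,
                        "BackoffRate": 2.0,
                    }
                ],
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    # Placeholder ARN, shown only to print the resulting JSON definition
    arn = "arn:aws:lambda:us-east-1:123456789012:function:demo"
    print(json.dumps(delayed_task_definition(arn), indent=2))
```

The JSON string would go into the definition argument of the Step Functions create_state_machine call, and each delayed task would then be one start_execution with its own delay_seconds in the input.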

prafulbagai708 commented 4 years ago

To avoid any changes in the code, I override the Celery Task class as below. Basically, I check whether I want to use Zappa or not. This also helps in local development.

from celery import Celery, Task

from django.conf import settings

# run is a function, so it has to be imported from the module
from zappa.asynchronous import run

class MyCelery(Celery):
    """ Subclass of a Celery application class that uses a custom Task type """
    task_cls = 'path.to.module:MyTask'

class MyTask(Task):

    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, **options):

        if settings.USE_ZAPPA:
            # Hand the task to Zappa instead of the Celery broker
            return run(self, args or [], kwargs or {})

        # Otherwise forward the caller's arguments unchanged to Celery
        return super().apply_async(args=args, kwargs=kwargs, task_id=task_id,
                                   producer=producer, link=link,
                                   link_error=link_error, **options)

And while initializing the Celery object, I do the following:

from path.to.module import MyCelery as Celery

app = Celery('demo')
app.config_from_object('demo.celery')
app.autodiscover_tasks()