Miserlou / Zappa

Serverless Python
https://blog.zappa.io/
MIT License

from zappa.concurrent.futures import LambdaPoolExecutor? #1024

Open olirice opened 6 years ago

olirice commented 6 years ago

Feature proposal: implement LambdaPoolExecutor with an API similar to ThreadPoolExecutor and ProcessPoolExecutor?

e.g.

from concurrent.futures import as_completed # pip install futures on py2.7
from zappa.concurrent.futures import LambdaPoolExecutor

# Pushes current working directory to Lambda
executor = LambdaPoolExecutor(max_workers=15)

# Function to execute in Lambda function
def do_stuff(x: int) -> int:
    return 1

futures = []
for val in range(100):
    # Submit work to Lambda function
    # Store python future (non-blocking)
    future = executor.submit(do_stuff, val)
    futures.append(future)

# As each task completes
for output in as_completed(futures):
    # Collect result from futures object
    result = output.result()

    # do more stuff with result
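The proposed interface mirrors `concurrent.futures`, so the client-side loop can already be prototyped locally. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in. The idea is that a `LambdaPoolExecutor` (hypothetical; no such Zappa class exists yet) would be a drop-in replacement. `run_all` is an illustrative helper, and `do_stuff` squares its input so that the results are checkable.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def do_stuff(x: int) -> int:
    # Placeholder workload; with the proposed executor this would run on Lambda.
    return x * x


def run_all(executor, values):
    """Submit every value, then collect results as tasks finish."""
    futures = {executor.submit(do_stuff, v): v for v in values}
    results = {}
    for future in as_completed(futures):
        # as_completed yields in completion order, not submission order,
        # so map each result back to the input that produced it.
        results[futures[future]] = future.result()
    return results


# Local stand-in: swap in the proposed LambdaPoolExecutor once it exists.
with ThreadPoolExecutor(max_workers=15) as executor:
    results = run_all(executor, range(100))

print(results[7])  # 49
```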

Thoughts:

  1. Initializing the executor packages up the project and ships it to Lambda (if it isn't already deployed).
  2. Submitting work to the executor pickles the function and its arguments (e.g. x), copies the result up to S3, and notifies a handler in Lambda.
  3. The handler downloads and unpickles the work, then calls the function with its arguments.
  4. The client (LambdaPoolExecutor) repeatedly checks S3 for a response to see whether the work is done.
  5. When the work is done, the client copies the pickled response from S3, unpickles it, and sets it as the result of the future.
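The steps above can be sketched with a dictionary standing in for S3, which lets the round trip run locally. The names `InMemoryStore`, `submit_task`, `handle_task` and `wait_for_result` are hypothetical, and so is the `tasks/` / `results/` key layout. A real version would use boto3's `put_object`/`get_object` and an actual Lambda handler.

```python
import pickle
import time
import uuid
from concurrent.futures import Future
from functools import partial


class InMemoryStore:
    """Hypothetical stand-in for an S3 bucket: a dict of key -> bytes."""

    def __init__(self):
        self._objects = {}

    def put(self, key, data):
        self._objects[key] = data

    def get(self, key):
        # None means "object does not exist yet" (S3 would return a 404).
        return self._objects.get(key)


def submit_task(store, fn, *args, **kwargs):
    """Steps 1-2: pickle a fully applied call and upload it under a task id."""
    task_id = uuid.uuid4().hex
    store.put(f"tasks/{task_id}", pickle.dumps(partial(fn, *args, **kwargs)))
    return task_id


def handle_task(store, task_id):
    """Step 3: what the Lambda-side handler would do when notified."""
    fn_applied = pickle.loads(store.get(f"tasks/{task_id}"))
    store.put(f"results/{task_id}", pickle.dumps(fn_applied()))


def wait_for_result(store, task_id, timeout=5.0, interval=0.05):
    """Step 4: poll for the result object until it appears or time runs out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        data = store.get(f"results/{task_id}")
        if data is not None:
            return pickle.loads(data)
        time.sleep(interval)
    raise TimeoutError(f"no result for task {task_id}")


store = InMemoryStore()
task_id = submit_task(store, pow, 3, 4)
handle_task(store, task_id)  # on AWS this would run remotely

# Step 5: hand the unpickled result to a future.
future = Future()
future.set_result(wait_for_result(store, task_id))
print(future.result())  # 81
```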

But why? Zappa for cluster computing!

Possible hang-ups?

All thoughts welcome.

If the response is positive, I'll take a swing at implementing it.

richiverse commented 6 years ago

http://pywren.io/

olirice commented 6 years ago

Cool, thanks!

Miserlou commented 6 years ago

Reopening so that we can actually address this request properly. @richiverse, although it's fine to make people aware of other options, don't use that as a way to shut down discussion of this project. I know a lot of people are interested in having this pattern, so it's worth serious consideration for the project.

I like the interface. I suggest we call it "ZappaPoolExecutor", because part of the work for the next year will be adding support for non-AWS offerings.

What bothers me is polling S3 to check for results. It seems inelegant, but I don't immediately see a better solution.
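One common way to make polling less wasteful is exponential backoff, which nobody in the thread proposed. The idea is to check often at first and then less often. Below is a minimal, storage-agnostic sketch. `poll_with_backoff` is a hypothetical helper, and `check` is any callable that returns the result, or `None` if it isn't ready yet (for example, a wrapper around an S3 `get_object` call).

```python
import time


def poll_with_backoff(check, timeout=30.0, initial=0.1, factor=2.0, max_interval=5.0):
    """Call check() until it returns something other than None.

    The wait between attempts grows geometrically (initial, initial*factor, ...)
    up to max_interval, so slow tasks cost fewer requests than fixed-interval
    polling. Raises TimeoutError once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    interval = initial
    while True:
        result = check()
        if result is not None:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("result did not appear in time")
        time.sleep(min(interval, remaining))
        interval = min(interval * factor, max_interval)


attempts = []


def fake_check():
    # Pretend the result object shows up on the fourth request.
    attempts.append(1)
    return "done" if len(attempts) >= 4 else None


result = poll_with_backoff(fake_check, initial=0.01)
print(result)  # done
```

The backoff trades a little extra latency, up to `max_interval`, for far fewer requests on long-running tasks.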

olirice commented 6 years ago

ZappaPoolExecutor works for me if the project is moving to multiple back ends in the future.

Using S3 as an intermediary turned out not to be necessary.

The biggest hang-up to a clean implementation is that the CLI has quite a lot of logic and parsing that would need to be either duplicated in the initializer for ZappaPoolExecutor or factored out somewhere both can share it.

Here is an appalling implementation, wrapping the CLI and full of bugs, that I wrote a couple of weeks ago to test the concept. One advantage of the Zappa approach is that you can pass, call, and use locally defined classes, because the whole project is available on both the client and the Lambda function.

try:
    import future  # optional Python 2/3 compatibility shims
except ImportError:
    pass

import base64
import json
import pickle
from functools import partial
from concurrent.futures import _base, ThreadPoolExecutor
from zappa.cli import ZappaCLI

def execute_on_lambda(lambda_client, function_name, task):
    """Executes work on a lambda instance"""
    # pickle produces bytes, which JSON cannot carry, so base64-encode them
    encoded_task = base64.b64encode(pickle.dumps(task)).decode('ascii')

    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps({
            # Zappa's handler imports and calls the function named in 'command'
            'command': 'lambdapoolexecutor._base.perform_work',
            'task': encoded_task
        })
    )

    # The handler's return value (a base64 string) comes back JSON-encoded
    encoded_result = json.loads(response['Payload'].read())
    result = pickle.loads(base64.b64decode(encoded_result))

    return result

def perform_work(content):
    """For use with LambdaPoolExecutor

    Unpickles and executes fully applied (functools.partial) functions
    """
    # Pickled fully applied function, base64-decoded back to bytes
    fn_applied_pickled = base64.b64decode(content['task'])

    # Fully applied function
    fn_applied = pickle.loads(fn_applied_pickled)

    # Result for the work
    result = fn_applied()

    # Pickled, base64-encoded result so it survives JSON transport
    pickled_result = base64.b64encode(pickle.dumps(result)).decode('ascii')

    return pickled_result

class LambdaPoolExecutor(_base.Executor):
    """Async execution of function calls on AWS Lambda"""

    def __init__(self, stage, max_workers=None, update=True, undeploy_on_del=False):
        """Initializes a new LambdaPoolExecutor instance.
        Args:
            stage: A stage name defined in zappa_settings.json

            max_workers: The maximum number of concurrent Lambda invocations
                used to execute the given calls. If None or not given,
                defaults to 1000.

            update: If True, deploy (or update) the Lambda function on initialization

            undeploy_on_del: If True, undeploy the stage in shutdown()
        """
        self._stage = stage
        self._undeploy_on_del = undeploy_on_del

        if max_workers is None:
            self._max_workers = 1000
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers

        # Initialize zappa cli
        self._zappacli = ZappaCLI()

        # TODO(or): Error handling
        # TODO(or): Don't use CLI
        if update:
            try:
                self._zappacli.handle(argv=['deploy', stage])  # optionally add '-q'
            except:
                # Assume deploy failed because the stage already exists; update instead
                self._zappacli.handle(argv=['update', stage])  # optionally add '-q'

        # Threads block on the synchronous Lambda calls so that submit() can
        # return futures immediately; sizing the pool by max_workers is what
        # actually throttles concurrent invocations
        self._tpe = ThreadPoolExecutor(max_workers=self._max_workers)

    def submit(self, fn, *args, **kwargs):
        # Bind the arguments so the call can be pickled as a single object
        fn_applied = partial(fn, *args, **kwargs)

        f = self._tpe.submit(execute_on_lambda,
                             self._zappacli.zappa.lambda_client,
                             self._zappacli.lambda_name,
                             fn_applied)

        return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        # Stop accepting work (and optionally wait for in-flight calls) first
        self._tpe.shutdown(wait=wait)
        if self._undeploy_on_del:
            self._zappacli.handle(argv=['undeploy', self._stage, '-q'])

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

richiverse commented 6 years ago

I was intending to inform, not to close the ticket. One thing you could do with S3 is have a Lambda function triggered when the S3 payload is complete (or has errors), which avoids polling.
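richiverse's suggestion can be sketched as a Lambda handler subscribed to the bucket's S3 `ObjectCreated` notifications, so that completion is pushed rather than polled. The event shape (`Records[*].s3.bucket.name` and `Records[*].s3.object.key`, with URL-encoded keys) follows S3's notification format. The handler name `on_result_uploaded` and the `results/` / `errors/` key prefixes are assumptions. How the client then learns about completion (SNS, SQS, a websocket, ...) is left open.

```python
from urllib.parse import unquote_plus


def on_result_uploaded(event, context=None):
    """Hypothetical Lambda handler for S3 ObjectCreated notifications.

    Returns (bucket, task_id, status) tuples for result/error uploads so a
    follow-up step can resolve the matching futures; other keys are ignored.
    """
    completed = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        prefix, _, task_id = key.partition("/")
        if prefix not in ("results", "errors"):
            continue  # e.g. the uploaded task payloads themselves
        status = "ok" if prefix == "results" else "error"
        completed.append((bucket, task_id, status))
    return completed


sample_event = {"Records": [
    {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "results/abc123"}}},
    {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "errors/def456"}}},
    {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "tasks/zzz"}}},
]}
print(on_result_uploaded(sample_event))
# [('my-bucket', 'abc123', 'ok'), ('my-bucket', 'def456', 'error')]
```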

(In reply to Oliver Rice's comment of Sat, Aug 19, 2017, 6:39 PM.)

themmes commented 4 years ago

Very interesting issue! I was just looking into combining Zappa and Pywren. However, Pywren seems to be intended to be triggered from a local machine. I would love to have this functionality in Zappa.

So far I have handled these cases by making a number of asynchronous.run calls to run tasks, but this obviously has its limits (e.g. you can only pass JSON-serializable arguments).
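One workaround for the JSON-only restriction is to pickle the arguments and base64-encode the bytes. The result is a plain string that any JSON channel can carry, and the task function decodes it on the other side. This is a sketch of that idea, not a Zappa feature. `to_json_safe` and `from_json_safe` are hypothetical helpers. Only unpickle payloads you produced yourself, because pickle can execute arbitrary code. The receiving side must be able to import the same classes, and the encoded payload still has to fit Lambda's payload size limits.

```python
import base64
import json
import pickle
from datetime import date


def to_json_safe(obj):
    """Wrap any picklable object in a JSON-serializable ASCII string."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def from_json_safe(text):
    """Inverse of to_json_safe."""
    return pickle.loads(base64.b64decode(text))


args = ({1, 2, 3}, date(2017, 8, 19))

# json.dumps rejects sets and dates outright...
try:
    json.dumps(args)
except TypeError as exc:
    print("plain JSON fails:", exc)

# ...but the base64-encoded pickle is just a string.
payload = json.dumps({"args": to_json_safe(args)})
restored = from_json_safe(json.loads(payload)["args"])
print(restored == args)  # True
```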

rivershah commented 3 years ago

I'd be very interested in this. Is there any chance an AWSBatchExecutor could be included in Zappa? I first looked at pywren and at its use of Lambda executors and standalone executors on AWS. Unfortunately, I don't have the dev skills needed to write an AWSBatchExecutor. Is this something anyone from the Zappa community could take on?

The use case is that with an AWSBatchExecutor, one could easily write highly scalable code without worrying about resource provisioning or life-cycle management of AWS resources. Ideally we could write something like:

job_def = get_aws_batch_job_def()
executor = AWSBatchExecutor(job_def, max_workers=10000)
results = executor.map(func, iterable)
results = list(results)
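`concurrent.futures.Executor` implements `map()` in terms of `submit()`, so an AWSBatchExecutor like the one above would mostly come down to writing `submit()`. Below is a skeleton. `BackendExecutor` and `run_remote` are hypothetical names, and a local function stands in for "submit an AWS Batch job and wait for it". To stay short, the skeleton runs each task synchronously inside `submit()`. A real backend would return the `Future` immediately and resolve it when the job finishes.

```python
from concurrent.futures import Executor, Future


class BackendExecutor(Executor):
    """Skeleton for a remote executor (e.g. a hypothetical AWSBatchExecutor).

    Only submit() is implemented; map(), shutdown() and the context-manager
    protocol are inherited from concurrent.futures.Executor.
    """

    def __init__(self, run_remote):
        # run_remote(fn, args, kwargs) -> result; stand-in for a batch backend.
        self._run_remote = run_remote

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(self._run_remote(fn, args, kwargs))
        except Exception as exc:
            # Errors surface when the caller calls future.result().
            future.set_exception(exc)
        return future


def run_locally(fn, args, kwargs):
    return fn(*args, **kwargs)


executor = BackendExecutor(run_locally)
results = list(executor.map(lambda n: n * 10, range(5)))
print(results)  # [0, 10, 20, 30, 40]
```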