suned / pfun

Functional, composable, asynchronous, type-safe Python.
https://pfun.dev/
MIT License

SemLock is not implemented when running on AWS Lambda #84

Closed (hugosenari closed this issue 3 years ago)

hugosenari commented 3 years ago

Expected Behavior

My code should run without problems, since I'm not using processes.

Current Behavior

Fail with message:

[Errno 38] Function not implemented: OSError

Possible Solution

Add an option to disable the process pool or thread pool when it isn't required.

Steps to Reproduce

  1. Write an AWS Lambda function that uses effects
  2. Try to call your handler

Workaround:

For anyone with the same problem:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import AsyncExitStack

from pfun.effect import RuntimeEnv, CSuccess


async def call(effect, r, max_threads=None):
    # Run the effect with a thread pool only: no ProcessPoolExecutor is
    # created, so multiprocessing's SemLock is never touched.
    stack = AsyncExitStack()
    thread_executor = ThreadPoolExecutor(max_workers=max_threads)
    async with stack:
        # The thread pool is shut down when the stack exits
        stack.enter_context(thread_executor)
        # None is passed where the process pool executor would go
        env = RuntimeEnv(r, stack, None, thread_executor)
        either = await effect.do(env)
        if isinstance(either, CSuccess):
            return either.result
        if isinstance(either.reason, Exception):
            raise either.reason
        raise RuntimeError(either.reason)


def run(effect, r):
    # asyncio.run creates a fresh event loop and closes it afterwards
    return asyncio.run(call(effect, r))

Then replace main(event).run(env) with run(main(event), env).

Context (Environment)

The failure itself isn't pfun's fault, since AWS Lambda doesn't support the SemLock that multiprocessing relies on, but a process pool isn't required here either.

https://stackoverflow.com/questions/34005930/multiprocessing-semlock-is-not-implemented-when-running-on-aws-lambda
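If code needs to decide at runtime whether a process pool is usable at all, one option is to probe for SemLock support before creating one. The sketch below is plain standard-library Python, not part of pfun, and the helper names process_pool_supported and make_cpu_executor are hypothetical. It assumes, as in the linked Stack Overflow question, that creating a multiprocessing lock is what fails with [Errno 38] on Lambda.

```python
import multiprocessing
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional


def process_pool_supported() -> bool:
    # Hypothetical helper: multiprocessing.Lock() is backed by a SemLock.
    # Where POSIX semaphores are unavailable (e.g. AWS Lambda) this raises
    # OSError ([Errno 38] Function not implemented); platforms lacking
    # sem_open entirely raise ImportError instead.
    try:
        multiprocessing.Lock()
    except (OSError, ImportError):
        return False
    return True


def make_cpu_executor(max_workers: Optional[int] = None) -> Executor:
    # Hypothetical helper: fall back to threads when process pools can't be created
    if process_pool_supported():
        return ProcessPoolExecutor(max_workers=max_workers)
    return ThreadPoolExecutor(max_workers=max_workers)
```

On a normal machine this returns a ProcessPoolExecutor; on Lambda it should fall back to a ThreadPoolExecutor, at the cost of CPU-bound Python code no longer running in parallel because of the GIL.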

suned commented 3 years ago

Interesting 🤓

I can think of a number of ways to address this:

  1. Add an option to disable the thread/process pools
  2. Initialize the thread/process pools lazily (so effects that don't need them won't trigger this issue)
  3. The thread and process pools are only required for effects created from I/O- or CPU-bound callables, so the pools could instead be provided as part of the environment argument. The signature of e.g. from_cpu_bound_callable would then be:

    def from_cpu_bound_callable(f: Callable[[R], Either[E, A]]) -> Effect[Intersection[R, HasProcessPool], E, A]: ...

    where HasProcessPool is:

    from concurrent.futures import ProcessPoolExecutor
    from typing import Protocol

    class HasProcessPool(Protocol):
        pool: ProcessPoolExecutor

The third solution moves the responsibility for managing process/thread pools entirely to the caller of the effect, which means more to explain in the documentation. It does have the additional benefit of making it possible to share pools between effect interpretations.
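Option 3 can be illustrated without pfun: the environment carries the executor, and the code that runs blocking work simply uses whatever pool it is given. This is a minimal asyncio sketch under assumptions; HasPool, run_in_env_pool and Env are hypothetical names, and the protocol uses the general Executor type so a caller could inject a ProcessPoolExecutor normally or a ThreadPoolExecutor on AWS Lambda.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Protocol, TypeVar

A = TypeVar("A")


class HasPool(Protocol):
    # Hypothetical: the caller supplies the executor through the environment
    pool: Executor


async def run_in_env_pool(f: Callable[[], A], env: HasPool) -> A:
    # Run a blocking function on whichever executor the environment provides.
    # No pool is created here, so SemLock is only involved if the caller
    # chose to inject a ProcessPoolExecutor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(env.pool, f)


@dataclass
class Env:
    pool: Executor


def fib(n: int) -> int:
    return n if n < 2 else fib(n - 1) + fib(n - 2)


async def main() -> int:
    # The caller owns the pool's life cycle and could reuse it across runs
    with ThreadPoolExecutor(max_workers=2) as pool:
        return await run_in_env_pool(lambda: fib(20), Env(pool))


if __name__ == "__main__":
    print(asyncio.run(main()))  # 6765
```

Because the caller creates and closes the pool, one pool could serve several effect runs, which is the sharing benefit described above.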

I'm leaning towards option 2 as a first solution, as it doesn't require changes to the API or documentation and could probably be achieved with the existing [`Resource` class](https://github.com/suned/pfun/blob/master/src/pfun/effect.pyx#L422).
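The idea behind option 2 is that no pool is constructed until an effect actually asks for one. A minimal standard-library sketch of that idea follows; LazyExecutor is a hypothetical helper, not pfun's Resource class, and it is not thread-safe (two concurrent first calls to get() could create two executors).

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Optional


class LazyExecutor:
    """Hypothetical helper: build the executor only on first use."""

    def __init__(self, factory: Callable[[], Executor]) -> None:
        self._factory = factory
        self._executor: Optional[Executor] = None

    @property
    def created(self) -> bool:
        return self._executor is not None

    def get(self) -> Executor:
        # The factory (e.g. ProcessPoolExecutor) runs here, not in __init__,
        # so code paths that never call get() never trigger SemLock errors
        if self._executor is None:
            self._executor = self._factory()
        return self._executor

    def close(self) -> None:
        # Shut down only if something was actually created
        if self._executor is not None:
            self._executor.shutdown()
            self._executor = None
```

Wrapping ProcessPoolExecutor this way would mean that, on Lambda, only effects that really need a process pool fail, while everything else runs normally.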
suned commented 3 years ago

Just out of curiosity @hugosenari, are you using pfun in an open source project? Would love to see the context for what you're doing.

hugosenari commented 3 years ago

Hi, it isn't open source, so I can't share the repo/project here, but the Lambda itself has nothing special or private:

from functools import reduce
from json import dumps, loads
from json.decoder import JSONDecodeError
from typing import Any, Dict, List

from pfun.effect import Success, Try, error, success, combine
from pfun.functions import identity
from aws_lambda_types.api_gw import (
    APIGWPayloadV2RequestDict as Event,
    APIGWPayloadV2ResponseDict as Response,
    APIGWPayloadV2RequestContextDict as Context,
)
from asyncrun import run  # presumably the thread-pool-only run() workaround from the issue above

from models import DBEntity, Entity, EntiyId, EventData, from_data, from_event
from responses import (
    BadRequestError,
    InternalServerError,
    NotFoundError,
    err_response,
    OK,
)
from services import Env, find_entity_by_document_number, lambda_env, update_entity_by

def invalid_request(e):
    return error(BadRequestError(f"Invalid REQUEST {e}"))

def validate_event(event_dict: Event) -> Try[BadRequestError, EventData]:
    return from_event(event_dict).recover(invalid_request)

def validate_json(event: EventData) -> Try[BadRequestError, Dict[str, Any]]:
    try:
        return success(loads(event.body))
    except JSONDecodeError:
        return error(BadRequestError("Invalid JSON"))

def invalid_json(e):
    return error(BadRequestError(f"Invalid JSON {e}"))

def validate_data(payload: Dict[str, Any]) -> Try[BadRequestError, Entity]:
    return from_data(payload).recover(invalid_json)

def not_found(document_number):
    return lambda e: error(NotFoundError(f"Entity {document_number} not found"))

def find_entity(event: EventData, entity: Entity) -> Try[NotFoundError, DBEntity]:
    document_number = event.pathParameters.documentNumber
    return find_entity_by_document_number(document_number).recover(not_found(document_number))

def all(sequence: List[Try[InternalServerError, DBEntity]]) -> Try[InternalServerError, List[DBEntity]]:
    return reduce(
        lambda last, item: last.and_then(
            lambda results: item.map(lambda value: [*results, value])
        ),
        sequence,
        success([])
    )

def internal_error(e):
    return error(InternalServerError("Unexpected Error"))

def update_entity(entity_update: Entity, entities: List[Dict[str, Any]]) -> Try[InternalServerError, List[DBEntity]]:
    sequence = [
        update_entity_by(EntiyId(entity["id"]), entity_update)
        for entity in entities]
    return all(sequence).recover(internal_error)

def when(effectA, effectB, fn):
    return combine(effectA, effectB)(fn).and_then(identity).memoize()

def error_handler(err):
    return success(err_response(err))

def success_handler(_):
    return success(OK())

def main(event: Event) -> Success[Response]:
    event_is_valid = validate_event(event).memoize()
    entity_is_valid = (
        event_is_valid.and_then(validate_json).and_then(validate_data).memoize()
    )
    entity_exists = when(event_is_valid, entity_is_valid, find_entity)
    entities_updated = when(entity_is_valid, entity_exists, update_entity)
    return entities_updated.and_then(success_handler).recover(error_handler)

def handler(event: Event, context: Context) -> Response:
    return run(main(event), lambda_env())

if __name__ == "__main__":
    event: Event = {
        "pathParameters": {"documentNumber": "22953425624"},
        "body": dumps({"phone": "+5511987654321", "email": "h@spammmmm.com.br"}),
        "headers": {"some": "header"}
    }

    class FakenamoDB:
        def scan(self, **kwds):
            return {"Items": [{"id": "aaaaaaaaaaa", "name": "bal bla bla bla"}]}

        def update_item(self, **kwds):
            print("pretending to update the item:", kwds)
            return {}

    env = Env()
    env.entity_table = FakenamoDB()
    result = run(main(event), env)
    print(result)

Any feedback on misused features or improvements to this code would be welcome (e.g. I'm not sure whether 'when' and 'all' recreate built-in features) ;-)

Some random, specific feedback from my experience using pfun with AWS Lambda:

hugosenari commented 3 years ago

> I can think of a number of ways to address this...

I like the third option; I think it makes a lot of sense to add an 'effect'/'dependency' when we need something like a ThreadPool/ProcessPool.

But it's a trade-off between usability and cohesion, and you're the one to make that decision. Any solution would be welcome, even none, since this is very specific to one platform.

That was my first AWS Lambda using pfun; someday I might try to create an open source framework on top of pfun for reusability, something like LaconiaJS.

suned commented 3 years ago

Very interesting! I'm super interested in all feedback about using pfun in a real world/production setting.

> Any feedback on misused features or improvements to this code would be welcome

all is what would be called sequence in Haskell terminology. The closest thing in the API right now would be effect.sequence_async, but there should probably be a non-parallel version as well.
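For reference, sequence turns a list of results into a result of a list, stopping at the first error, which is what the reduce in all does. The sketch below uses a tiny hypothetical Ok/Err type instead of pfun effects, so it only illustrates the technique, not pfun's API; with pfun the same fold would use and_then and map, as all already does.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable, List, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    reason: Any


Result = Union[Ok, Err]


def and_then(result: Result, f: Callable[[Any], Result]) -> Result:
    # Continue with the value on success; pass an error through unchanged
    return f(result.value) if isinstance(result, Ok) else result


def sequence(results: List[Result]) -> Result:
    # Fold left to right, appending each value; the first Err wins
    return reduce(
        lambda acc, item: and_then(acc, lambda xs: and_then(item, lambda x: Ok([*xs, x]))),
        results,
        Ok([]),
    )
```

For example, sequence([Ok(1), Ok(2)]) gives Ok([1, 2]), while any Err in the list makes the whole result that Err.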

when looks like zipWith in ZIO, which is pretty much combine. Although I'm not sure what the purpose of the .and_then(identity) part is, since for any effect e: e.and_then(identity) == e (the right identity monad law).
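A small check of the laws involved may help here. Strictly, right identity is e.and_then(success) == e; e.and_then(identity) is instead the flattening (join) operation, which turns an effect whose result is itself an effect into a single effect. Since the function passed to combine in when returns an effect, that flattening is what .and_then(identity) does there. Box below is a hypothetical toy that models only successful effects, not pfun's Effect.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Box:
    """Hypothetical toy standing in for a successful effect (not pfun's Effect)."""

    value: Any

    def and_then(self, f: Callable[[Any], Any]) -> Any:
        # Feed the wrapped value to f and return whatever f returns
        return f(self.value)


def success(value: Any) -> Box:
    return Box(value)


def identity(x: Any) -> Any:
    return x


e = success(2)
assert e.and_then(success) == e  # right identity: and_then(success) changes nothing
assert e.and_then(identity) == 2  # and_then(identity) unwraps; the result is no longer a Box

nested = success(success(2))
assert nested.and_then(identity) == success(2)  # join: one layer of nesting removed
```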

Also, validate_json could be implemented as:

from pfun import effect

def validate_json(event: EventData) -> Try[BadRequestError, Dict[str, Any]]:
    return effect.catch(JSONDecodeError)(loads)(event.body).recover(lambda _: error(BadRequestError(...)))

Although looking at it now, I'm not sure that's better 🤓
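Conceptually, a catch decorator wraps a function so that the listed exceptions become error values instead of being raised. The sketch below shows that idea with the standard library and a hypothetical Ok/Err pair; it is not pfun's effect.catch implementation, which returns an effect rather than a plain value.

```python
from dataclasses import dataclass
from json import JSONDecodeError, loads
from typing import Any, Callable, Type


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    reason: BaseException


def catch(*errors: Type[BaseException]) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return Ok(f(*args, **kwargs))
            except errors as e:  # only the listed exception types are caught
                return Err(e)
        return wrapper
    return decorator


safe_loads = catch(JSONDecodeError)(loads)
```

Any exception type not passed to catch still propagates normally, which keeps unexpected failures visible.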

> I like the third option; I think it makes a lot of sense to add an 'effect'/'dependency' when we need something like a ThreadPool/ProcessPool.

I definitely like that it makes the process/thread pool completely injectable by the user, which gives the user full control of its life cycle. It also makes the API more complicated, so I'd like to think harder about those trade-offs. Version 0.12.2 implements the second solution, so you can try that out when you have the opportunity.

> That was my first AWS Lambda using pfun; someday I might try to create an open source framework on top of pfun for reusability, something like LaconiaJS.

That would be awesome; I've had similar ideas for frameworks integrating pfun with e.g. FastAPI. Let me know how it turns out!

The cryptography problem is annoying, but I think Poetry has to solve that upstream.

hugosenari commented 3 years ago

I was wary of sequence_async after this bug, so I did it synchronously.

> Although I'm not sure what the purpose of the .and_then(identity) part is, since for any effect e: e.and_then(identity) == e (the right identity monad law).

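How should I control the error/success flow otherwise? For example:

from pfun.effect import combine, error, success
from pfun.functions import identity

# The function returns an effect, so combine yields a success whose value is that error effect
combine(success(2), success(2))(
    lambda a, b: error(Exception('Invalid combination'))
).and_then(
    # called with s being the (not yet run) error effect
    lambda s: print('Do something only with valid combination') or s
).run(None)

# .and_then(identity) flattens the nested effect, so the error propagates
combine(success(2), success(2))(
    lambda a, b: error(Exception('Invalid combination'))
).and_then(
    identity
).and_then(
    # never called
    lambda s: print('Do something only with valid combination')
).run(None)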
suned commented 3 years ago

Ah, I see now: you need a version of combine that takes a function that returns an effect. That's not part of the API at the moment, so .and_then(identity) is definitely a way to do it.
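Such a combine variant could look like the sketch below. combine_with and the Ok/Err types are hypothetical toys for illustration only; with pfun itself, combine(...)(f).and_then(identity), as in when, achieves the same result.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    reason: Any


Result = Union[Ok, Err]


def combine_with(*results: Result) -> Callable[[Callable[..., Result]], Result]:
    def apply(f: Callable[..., Result]) -> Result:
        values = []
        for result in results:
            if isinstance(result, Err):
                return result  # first error wins; f is never called
            values.append(result.value)
        # f already returns a Result, so its output is returned as-is,
        # with no extra layer of nesting to flatten afterwards
        return f(*values)
    return apply
```

With this shape, an error returned by f behaves exactly like an error in one of the inputs, so later steps chained on the result only run for valid combinations.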