evhub / coconut

Simple, elegant, Pythonic functional programming.
http://coconut-lang.org
Apache License 2.0
4.04k stars 120 forks source link

Add `async_mapreduce` using `anyio` #796

Closed evhub closed 9 months ago

evhub commented 9 months ago

Just like #795.

evhub commented 9 months ago

I think I'm not going to implement this right now, since you can get very close to this just by doing

async_mapreduce = async_map `and_then` mapreduce$(ident)

where the only real difference between that and actual async_mapreduce is that it can't work on the reductions while it's waiting on the keys and values, but that's very minor and not clearly worth a separate built-in.

evhub commented 9 months ago

Putting some of the code snippets here that I'd need if I wanted to try to re-add this:

        def_async_mapreduce=async_def(
            "async_mapreduce",
            # note that we can't await reduce_func here without locking,
            #  otherwise the reduction wouldn't be atomic, so we just
            #  disallow async reduce_funcs
            async_def=r'''
async def async_mapreduce(key_value_func, iterable, **kwargs):
    """Async version of mapreduce.

    Starts one anyio task per item in iterable; each task awaits
    key_value_func(item), which must produce a (key, value) pair, and then
    merges that pair into the result collection.

    Keyword arguments:
        collect_in: existing mapping to collect into instead of creating
            a new one.
        reduce_func: synchronous function called as reduce_func(old, new) to
            combine values that share a key. None collects the values for
            each key into a list; False raises on duplicate keys. Defaults
            to None, or to False when collect_in is given.
        reduce_func_init: initial "old" value used for keys that are not yet
            in the collection; only allowed together with a reduce_func.

    Returns the collection (collect_in if given, otherwise a defaultdict of
    lists when reduce_func is None, otherwise a dict).

    Raises TypeError for unexpected keyword arguments or for reduce_func_init
    without reduce_func, and ValueError for a duplicate key when reduce_func
    is False.
    """
    import anyio
    collect_in = kwargs.pop("collect_in", None)
    reduce_func = kwargs.pop("reduce_func", None if collect_in is None else False)
    reduce_func_init = kwargs.pop("reduce_func_init", _coconut_sentinel)
    if reduce_func_init is not _coconut_sentinel and not reduce_func:
        raise _coconut.TypeError("reduce_func_init requires reduce_func")
    if kwargs:
        raise _coconut.TypeError("async_mapreduce() got unexpected keyword arguments " + _coconut.repr(kwargs))
    collection = collect_in if collect_in is not None else _coconut.collections.defaultdict(_coconut.list) if reduce_func is None else {}
    async def handle_item(item):
        key, val = await key_value_func(item)
        # nothing below this point awaits, so the read-modify-write of
        #  collection[key] is never interleaved with another handle_item
        if reduce_func is None:
            collection[key].append(val)
        else:
            old_val = collection.get(key, reduce_func_init)
            if old_val is not _coconut_sentinel:
                if reduce_func is False:
                    raise _coconut.ValueError("async_mapreduce() got duplicate key " + _coconut.repr(key) + " with reduce_func=False")
                val = reduce_func(old_val, val)
            collection[key] = val
    # the task group only exits once every started handle_item has finished
    async with anyio.create_task_group() as nursery:
        for item in iterable:
            nursery.start_soon(handle_item, item)
    return collection
            ''',
            no_async_def=r'''
def async_mapreduce(*args, **kwargs):
    """async_mapreduce not available on Python < 3.5"""
    # stand-in used as the no_async_def variant: accepts any arguments
    #  and always fails with a NameError when called
    raise _coconut.NameError("async_mapreduce not available on Python < 3.5")
            ''',
            needs_vars={
                "_coconut_sentinel": "_coconut_sentinel",
            },
        ),

and

# should match mapreduce above but async and no map_using
#  (async_mapreduce raises TypeError on any keyword argument other than
#  collect_in, reduce_func, and reduce_func_init, so map_using must not
#  appear in these signatures)
@_t.overload
def async_mapreduce(
    key_value_func: _t.Callable[[_T], _t.Awaitable[_t.Tuple[_U, _W]]],
    iterable: _t.Iterable[_T],
) -> _t.Awaitable[_t.DefaultDict[_U, _t.List[_W]]]: ...
@_t.overload
def async_mapreduce(
    key_value_func: _t.Callable[[_T], _t.Awaitable[_t.Tuple[_U, _W]]],
    iterable: _t.Iterable[_T],
    *,
    reduce_func: _t.Callable[[_W, _W], _V],
    reduce_func_init: _W = ...,
) -> _t.Awaitable[_t.Dict[_U, _V]]: ...
@_t.overload
def async_mapreduce(
    key_value_func: _t.Callable[[_T], _t.Awaitable[_t.Tuple[_U, _W]]],
    iterable: _t.Iterable[_T],
    *,
    reduce_func: _t.Callable[[_X, _W], _V],
    reduce_func_init: _X = ...,
) -> _t.Awaitable[_t.Dict[_U, _V]]: ...
@_t.overload
def async_mapreduce(
    key_value_func: _t.Callable[[_U], _t.Awaitable[_t.Tuple[_t.Any, _t.Any]]],
    iterable: _t.Iterable[_U],
    *,
    collect_in: _T,
    reduce_func: _t.Callable | None | _t.Literal[False] = None,
    reduce_func_init: _t.Any = ...,
) -> _t.Awaitable[_T]:
    """Async version of mapreduce.

    key_value_func is awaited on each item of iterable and must produce a
    (key, value) pair. Without reduce_func the values are collected into a
    list per key; with reduce_func, values sharing a key are combined by it.
    With collect_in, results are added to and returned as that collection.
    """
    ...