ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

Can't use partial function as a function in flat_map #51

Closed jvalentini closed 9 years ago

jvalentini commented 9 years ago

I created a StackOverflow post describing my problem.

The issue is that I have a partial function (created via functools.partial) that I want to pass to flat_map. However, doing so results in the following:

Traceback (most recent call last):
  File "retry/example.py", line 46, in <module>
    response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
    selector = adapt_call(selector)
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
    argnames, varargs, kwargs = getargspec(func)[:3]
  File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
    raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function

Here is some sample code which reproduces the problem:

from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools

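# basicConfig() returns None, so configure logging first and fetch the logger separately
logging.basicConfig()
logger = logging.getLogger(__name__)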

def puts(thing):
    print thing

def message_stream(consumer):
    def thing(observer):
        for message in consumer:
            observer.on_next(message)

    return Observable.create(thing)

def message_handler(message, context=None):
    def req():
        return requests.get('http://httpbin.org/get')

    return Observable.start(req)

def handle_response(message, response, context=None):
    consumer = context['consumer']
    producer = context['producer']
    t = 'even' if message % 2 == 0 else 'odd'
    return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer

consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
    'consumer': consumer,
    'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)

Alexhuszagh commented 9 years ago

I'll also add that rx/internal/utils.py has the following on lines 20-37:

def adapt_call_1(func):
    """Adapts func from taking 3 params to only taking 1 or 2 params"""

    func_types = (
        types.FunctionType,
        types.MethodType,
    )
    if hasattr(func, '__call__') and not isinstance(func, func_types):
        func = func.__call__

    def func1(arg1, *_):
        return func(arg1)

    def func2(arg1, arg2=None, *_):
        return func(arg1, arg2)

    func_wrapped = func
    argnames, varargs, kwargs = getargspec(func)[:3]

    if not varargs and not kwargs:
        num_args = len(argnames)
        if hasattr(func, '__self__'):
            num_args -= 1
        if num_args == 1:
            func_wrapped = func1
        elif num_args == 2:
            func_wrapped = func2

    return func_wrapped

Meanwhile, inspect.getargspec has the following:

def getargspec(func):
    """Get the names and default values of a function's arguments.

    A tuple of four things is returned: (args, varargs, varkw, defaults).
    'args' is a list of the argument names (it may contain nested lists).
    'varargs' and 'varkw' are the names of the * and ** arguments or None.
    'defaults' is an n-tuple of the default values of the last n arguments.
    """

    if ismethod(func):
        func = func.im_func
    if not isfunction(func):
        raise TypeError('{!r} is not a Python function'.format(func))
    args, varargs, varkw = getargs(func.func_code)
    return ArgSpec(args, varargs, varkw, func.func_defaults)

The problem is that getargspec accepts only plain Python functions and methods, not other callable objects. For a functools.partial, at least, the signature can easily be determined from the partial's func.func and func.args attributes, and a similar approach should work for any comparable callable object.
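
As a rough illustration of that idea, here is a minimal sketch that counts the positional parameters a callable still expects by unwrapping a functools.partial through its func, args and keywords attributes. The helper name remaining_positional_args is hypothetical and is not part of RxPY, and the sketch assumes Python 3 (inspect.getfullargspec) rather than the Python 2.7 shown in the traceback.

```python
import functools
import inspect


def remaining_positional_args(func):
    """Count the positional parameters a callable still expects.

    Hypothetical helper, not part of RxPY. It unwraps functools.partial
    objects through their .func, .args and .keywords attributes.
    """
    bound_count = 0
    bound_names = set()
    while isinstance(func, functools.partial):
        bound_count += len(func.args)
        bound_names.update(func.keywords or {})
        func = func.func

    names = list(inspect.getfullargspec(func).args)
    # getfullargspec includes the bound first parameter of bound methods.
    if inspect.ismethod(func):
        names = names[1:]
    # Drop parameters already supplied positionally, then those bound by keyword.
    names = names[bound_count:]
    return len([name for name in names if name not in bound_names])


def message_handler(message, context=None):
    return message, context


handler = functools.partial(message_handler, context={"producer": "p"})
print(remaining_positional_args(handler))  # 1
```

With a count like this, an adapter could decide how many of the arguments supplied by flat_map to forward, without ever passing the partial itself to getargspec.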

dbrattli commented 9 years ago

Could you check if adapt_call_2 works for you? It was something I wrote for using RxPY on the embedded pyboard, which doesn't have the inspect module. It's a bit slower than inspect, but if it fixes your problem then we could patch it to use adapt_call_2 if adapt_call_1 fails for some reason.
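
The fallback described above might look roughly like the sketch below. This is not the actual adapt_call_1 or adapt_call_2 from RxPY: the names adapt_by_inspection, adapt_by_trial and adapt_call are hypothetical, the code assumes Python 3 (inspect.signature), and the trial-based adapter is only one guess at how an inspect-free approach could work.

```python
import functools
import inspect


def adapt_by_inspection(func):
    """Hypothetical: drop surplus positional arguments based on the signature."""
    params = list(inspect.signature(func).parameters.values())
    if any(p.kind is p.VAR_POSITIONAL for p in params):
        return func  # accepts *args already, nothing to adapt
    count = sum(
        1 for p in params
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    )
    return lambda *args: func(*args[:count])


def adapt_by_trial(func):
    """Hypothetical inspect-free fallback: retry with fewer arguments.

    Caveat: a TypeError raised inside func is indistinguishable from an
    arity mismatch here, so it also triggers a retry.
    """
    def wrapped(*args):
        last_error = None
        for count in range(len(args), -1, -1):
            try:
                return func(*args[:count])
            except TypeError as error:
                last_error = error
        raise last_error
    return wrapped


def adapt_call(func):
    """Try inspection first, fall back to trial calls if it fails."""
    try:
        return adapt_by_inspection(func)
    except (TypeError, ValueError):
        return adapt_by_trial(func)


def message_handler(message, context=None):
    return message, context


selector = adapt_call(functools.partial(message_handler, context={"producer": "p"}))
print(selector("msg", 0, None))  # ('msg', {'producer': 'p'})
```

The trial-based path is slower because every call may raise and retry, which matches the trade-off mentioned above: it is only worth using when inspection is unavailable or raises.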

dbrattli commented 9 years ago

I think this should be fixed, so I'm closing this issue. Please reopen if you still think there's a problem.
