python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Fix Stream subclassing #443

Closed by florentbr 2 years ago

florentbr commented 2 years ago

This PR fixes the inheritance of the Stream nodes created during chaining. When a registered method is called, it creates a new type that inherits from the type of the current instance. The goal is to support subclassing Stream to add or override methods without altering the original Stream class.
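Here is a toy model of the idea. It is written in plain Python so it runs without streamz. Every name in it is a hypothetical stand-in, not the streamz implementation or the PR's diff: `Stream`, `map`, `sink`, the body of `register_api`, the `_mixed` helper and the `_stream_cls` attribute. The sketch also makes two assumptions of its own:

- The node class is listed before the user's class in the bases.
- Each node remembers the root stream class, so that further chaining mixes in that class rather than the node's own mixed type.

```python
import functools


@functools.lru_cache(maxsize=None)
def _mixed(node_cls, base):
    """Return a (cached) node type that also inherits from the user's stream class."""
    if issubclass(node_cls, base):
        return node_cls  # e.g. base is the plain Stream: nothing to mix in
    # Node class first, so the node's own methods win over the user's class.
    return type(node_cls.__name__, (node_cls, base), {})


class Stream:
    """Hypothetical stand-in for a stream base class."""

    def __init__(self, upstream=None):
        self.upstream = upstream
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)

    def emit(self, x):
        for node in self.downstreams:
            node.update(x)

    def update(self, x):
        self.emit(x)

    @classmethod
    def register_api(cls):
        """Return a decorator exposing a node class as a chaining method of cls."""
        def decorator(node_cls):
            def method(self, *args, **kwargs):
                # The root stream class this chain started from.
                base = getattr(self, "_stream_cls", type(self))
                node = _mixed(node_cls, base)(self, *args, **kwargs)
                node._stream_cls = base  # propagate it to further chaining
                return node
            method.__name__ = node_cls.__name__
            setattr(cls, node_cls.__name__, method)
            return node_cls
        return decorator


@Stream.register_api()
class map(Stream):
    def __init__(self, upstream, func):
        super().__init__(upstream)
        self.func = func

    def update(self, x):
        self.emit(self.func(x))


@Stream.register_api()
class sink(Stream):
    def __init__(self, upstream, func):
        super().__init__(upstream)
        self.func = func

    def update(self, x):
        self.func(x)


# Usage: a subclass adds `foo` without touching Stream.
results = []


class MyStream(Stream):
    def foo(self):
        return sink(self, lambda x: results.append(("foo", x)))


s = MyStream()
s.map(lambda x: x + 25).foo()  # the map node inherits MyStream.foo
s.map(lambda x: x + 45).sink(lambda x: results.append(("sink", x)))
s.emit(100)
# results == [("foo", 125), ("sink", 145)]
```

The sketch only follows chains built through the registered methods. A node constructed directly, such as the `sink` that `foo` creates, does not carry `_stream_cls`.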

This example creates a stream with a new foo method:

import streamz

class foo(streamz.Sink):

    def __init__(self, upstream, **kwargs):
        print('foo init')
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        print('foo update', x)

class MyStream(streamz.Stream):

    def foo(self, *args, **kwargs):
        return foo(self, *args, **kwargs)

s = MyStream()
s.map(lambda x: x + 25).foo()
s.map(lambda x: x + 45).sink(print)
s.emit(100)

martindurant commented 2 years ago

Cool, thank you. Could you please create a test showing this in action? It would also be nice to provide some documentation to people facing this same problem. In principle, I'm happy to be able to support register_api only on subclasses.

martindurant commented 2 years ago

uh-oh

1003 failed, 46 passed
    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
>       fn = type(name, (type(self), func), dict(func.__dict__))
E       TypeError: Cannot create a consistent method resolution
E       order (MRO) for bases Stream, map

streamz/core.py:161: TypeError
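The traceback can be reproduced without streamz. In the wrapper above, `type(self)` comes first in the bases. Suppose `self` is a plain `Stream` and `func` is the `map` node class, which already subclasses `Stream`. The bases then name a class before its own subclass, and Python's C3 method resolution order cannot be built. The classes below are minimal hypothetical stand-ins, not the streamz ones.

```python
class Stream:
    pass


class map(Stream):  # a node class that already subclasses Stream
    pass


# Parent listed before its own subclass: no consistent MRO exists.
try:
    type("map", (Stream, map), {})
except TypeError as exc:
    error = str(exc)  # a TypeError message about the MRO
else:
    error = None

# Listing the subclass first gives a valid MRO.
ok = type("map", (map, Stream), {})
print(error)
print([c.__name__ for c in ok.__mro__])  # ['map', 'map', 'Stream', 'object']
```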
florentbr commented 2 years ago

> Cool, thank you. Could you please create a test showing this in action? It would also be nice to provide some documentation to people facing this same problem.

Where do you want them?

> In principle, I'm happy to be able to support register_api only on subclasses.

What do you mean by "only on subclasses"?

> 1003 failed, 46 passed

It should be better once I exclude the static register_api from the subclassing.

martindurant commented 2 years ago

> > In principle, I'm happy to be able to support register_api only on subclasses.
>
> What do you mean by "only on subclasses"?

This is exactly what you want to do, right? Be able to include methods on subclasses of Stream without affecting the original Stream.
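Below is a minimal sketch of registration that only affects a subclass. It assumes that the registering decorator is a classmethod that attaches the function, via `setattr`, to the class it was called on. `Stream`, `MyStream` and `double` are hypothetical stand-ins, not streamz classes.

```python
class Stream:
    @classmethod
    def register_api(cls):
        """Return a decorator that attaches a function to ``cls`` only."""
        def decorator(func):
            setattr(cls, func.__name__, func)
            return func
        return decorator


class MyStream(Stream):
    pass


@MyStream.register_api()
def double(self, x):
    return 2 * x


print(MyStream().double(3))       # 6
print(hasattr(Stream, "double"))  # False: the base class is untouched
```

This covers methods called on a subclass instance directly. Nodes returned by chaining are separate classes, which is the gap this PR tries to close.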

florentbr commented 2 years ago

I think I've identified all the shortcomings.

Some of the Stream classes in this project define the same methods as the nodes. This is a problem for this PR, because the inheritance lets those classes override the methods on the nodes. It's the case with streamz.Source, where Source.next overrides Stream.next.

For instance, this:

import streamz
class MyStream(streamz.Source): pass
class map(streamz.core.map, MyStream): pass
map.start, map.mro()

gives:

(<function streamz.sources.Source.start(self)>,
 [__main__.map,
  streamz.core.map,
  __main__.MyStream,
  streamz.sources.Source,
  streamz.core.Stream,
  streamz.core.APIRegisterMixin,
  object])

It fails because map.start is expected to resolve to <function streamz.core.Stream.start(self)>, not <function streamz.sources.Source.start(self)>.
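The same precedence problem shows up with plain classes. In this sketch, `core_map` stands in for `streamz.core.map` and defines no `start`, while `Source` overrides `start`. All classes here are hypothetical stand-ins, not the streamz ones. `MyStream` derives from `Source`, so `Source.start` sits ahead of `Stream.start` in the mixed node's MRO.

```python
class Stream:
    def start(self):
        return "Stream.start"


class Source(Stream):
    def start(self):  # a root-stream override of the same method name
        return "Source.start"


class core_map(Stream):  # stand-in for streamz.core.map; no start of its own
    pass


class MyStream(Source):
    pass


# Mixed node class, bases ordered as in the example above.
class map(core_map, MyStream):
    pass


print(map().start())                     # Source.start
print([c.__name__ for c in map.__mro__])
# ['map', 'core_map', 'MyStream', 'Source', 'Stream', 'object']
```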

It's never going to play well with the current implementation, since there's no obvious way to tell the two roles apart and define which takes precedence when subclassing. It would require all the Stream implementations to work as either a root stream or a node stream.

Closing this PR for these reasons. I'll open a new one to at least add a constructor for the nodes on the Stream, so that node subclassing can be implemented at the developer's discretion.