lempiji / rx

Reactive Extensions for D Programming Language
MIT License

Filter subscribers #30

Open Robert-M-Muench opened 5 years ago

Robert-M-Muench commented 5 years ago

I have a sequence that takes (x,y) coordinates. Now I want to notify only those subscribers that pass a hit-test for this coordinate. How can this be done?

All algorithms are working against the event streams but not against the subscribers.

lempiji commented 5 years ago

Thank you for using it. Is it for the grid GUI posted in the forum?

I think the problem in terms of efficiency is "How to effectively target subscribers to be notified".

As a policy, how about doing the following in doSubscribe?

1. Do a hit-test in doSubscribe to create a list of subscribers to notify.
2. Fetch them sequentially with foreach and call put to notify each.

Also, you may already be using it, but the following algorithm is interesting for hit-testing: https://en.wikipedia.org/wiki/Z-order_curve
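For readers unfamiliar with the Z-order curve mentioned here: it maps a 2D coordinate to a single integer by interleaving the bits of x and y, so that nearby points tend to get nearby keys. The following is a minimal sketch of that encoding, assuming 16-bit unsigned coordinates; the function name `mortonEncode` is chosen for illustration and is not part of rx.

```d
/// Interleaves the bits of x and y into one Z-order (Morton) key.
/// Bit i of x lands at position 2*i, bit i of y at position 2*i + 1.
uint mortonEncode(ushort x, ushort y)
{
    uint code = 0;
    foreach (i; 0 .. 16)
    {
        code |= ((x >> i) & 1u) << (2 * i);
        code |= ((y >> i) & 1u) << (2 * i + 1);
    }
    return code;
}

unittest
{
    assert(mortonEncode(0, 0) == 0);
    assert(mortonEncode(1, 0) == 1);
    assert(mortonEncode(0, 1) == 2);
    assert(mortonEncode(3, 5) == 39);
}
```

Sorting or bucketing hit areas by such a key is one possible way to narrow down candidates before an exact hit-test.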

Robert-M-Muench commented 5 years ago

Yes, I want to use RX as the fundamental communication concept for the GUI framework and the inter app-component communication pattern.

I have a very fast hit-test function implemented. It's using an rtree.

I'm (conceptually) struggling on how to do the doSubscribe function. My understanding is, that doSubscribe is like registering my function to call with the observable (I think I need a subject). How can/Why should I do the hit-test in doSubscribe? IMO the hit-test has to be triggered on a .put (where the x,y is received) and then do the hit-test, filter the list of subscribers, and call the registered function.

So, I somehow have to derive my own subject like conditionalSubject where code can subscribe to.

Or am I missing something?
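The flow described in this comment (receive x,y in put, run the hit-test, notify only the matching subscribers) could be sketched roughly as follows. This is a stand-alone illustration that does not use or derive from rx's Subject; `Point`, `Rect`, `HitTestSubject`, and its `subscribe` signature are all assumptions made for this example, and the rectangle check is a placeholder for a real hit-test such as an rtree query.

```d
struct Point { int x, y; }

struct Rect
{
    int left, top, right, bottom;

    bool contains(Point p) const
    {
        return p.x >= left && p.x < right && p.y >= top && p.y < bottom;
    }
}

/// Hypothetical stand-alone subject (not rx's Subject!E).
final class HitTestSubject
{
    private struct Entry
    {
        Rect area;
        void delegate(Point) onHit;
    }

    private Entry[] entries;

    /// Registers a callback together with the area it is interested in.
    void subscribe(Rect area, void delegate(Point) onHit)
    {
        entries ~= Entry(area, onHit);
    }

    /// Runs the hit-test when a value arrives and notifies only the matches.
    void put(Point p)
    {
        foreach (ref e; entries)
            if (e.area.contains(p))
                e.onHit(p);
    }
}

unittest
{
    auto subject = new HitTestSubject;
    int leftHits, rightHits;
    subject.subscribe(Rect(0, 0, 10, 10), (Point p) { leftHits++; });
    subject.subscribe(Rect(10, 0, 20, 10), (Point p) { rightHits++; });

    subject.put(Point(3, 4));
    assert(leftHits == 1 && rightHits == 0);
}
```

The sketch is single-threaded and ignores unsubscription, which is where most of the difficulty discussed later in this thread lies.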

Robert-M-Muench commented 5 years ago

If I do something like:

class HitTestSubject(E) : Subject!E {
  override void put(E obj){
  }
}

How can I iterate through all subscribers inside put()? I just want to override this function and use all the rest from RX as is.

Robert-M-Muench commented 5 years ago

Further investigated how to do this. It's not possible to access the _observer range from super class as it's private not protected. Any idea how to do this?

lempiji commented 5 years ago

Subject manages its Observers in a somewhat complicated way (in order to make subscribe lock-free). Currently, it is difficult to iterate over the observers by inheriting from Subject.

For now, I think it's practical to implement your own Subject (or Observer), assuming there is no increase or decrease in Observer.

The enhancements to allow access to _observers are a bit more complicated to implement, so I'll consider making a beta version.

If you have a good idea, please give me the pull request.

Robert-M-Muench commented 5 years ago

Idea: It would be nice to be able to provide a "conditional" function (delegate?) to put() which is checked in the foreach(_observers) loop. The delegate should be called with user-provided data (obj) so that all kind of checks can be done.

Deriving from Subject (or others) make sense to store all kind of run-time data. This data should be accessible by the conditional function used by put().

Robert-M-Muench commented 5 years ago

> For now, I think it's practical to implement your own Subject (or Observer), assuming there is no increase or decrease in Observer.

I don't understand what you mean with "increase or decrease in Observer"? The number of observers will change during run-time.

I already tried to implement my own Subject. But it seems I need to add my implementation directly to RX.subject.d because things I need from this file are marked as private.

Robert-M-Muench commented 5 years ago

@lempiji How about a put() that takes a delegate? Like:

struct observerFilter {
  int opApply(scope int delegate(<obj, observer, etc.>) foreach_body) {
  }
}

and inside put() instead of:

foreach (observer; observers) {...}

we have:

foreach (observer; userProvidedFilter(observers)) {...}

Then I can filter the observers as I like.
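The opApply idea above can be made concrete with a small self-contained sketch. It assumes observers are plain delegates and that the filter receives the index and the observer; `Observer`, `ObserverFilter`, and `putFiltered` are hypothetical names for this illustration, not rx API.

```d
/// Hypothetical observer type for this sketch (not rx's Observer!E).
alias Observer = void delegate(int);

/// Wraps an observer list and yields only those accepted by `accept`.
struct ObserverFilter
{
    Observer[] observers;
    bool delegate(size_t index, Observer o) accept;

    int opApply(scope int delegate(ref Observer) foreachBody)
    {
        foreach (i, ref o; observers)
        {
            if (!accept(i, o))
                continue;
            // A non-zero result means the loop body left early (break/return).
            if (auto result = foreachBody(o))
                return result;
        }
        return 0;
    }
}

/// Stand-in for a subject's put() that takes a run-time filter.
void putFiltered(Observer[] observers, int value,
                 bool delegate(size_t, Observer) accept)
{
    foreach (observer; ObserverFilter(observers, accept))
        observer(value);
}

unittest
{
    int[] seen;
    Observer[] observers;
    observers ~= (int v) { seen ~= v; };
    observers ~= (int v) { seen ~= v * 10; };
    observers ~= (int v) { seen ~= v * 100; };

    size_t skipped = 1;
    putFiltered(observers, 2, (size_t i, Observer o) => i != skipped);
    assert(seen == [2, 200]);
}
```

Note that this still visits every observer to ask the filter; it changes who is notified, not how many entries are examined.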

As a simple interface I can imagine something like:

mysubject.put(myObj, myDelegate);

lempiji commented 5 years ago

@Robert-M-Muench I added some accessors and wrote a unit test.

However, having to write if and cast first is somewhat unfriendly.

I'm thinking of simply adding a method to SubjectObject, but in many cases it's likely to be a useless method, so I gave up the implementation.

After that, I thought about providing the method signature you asked for in a class that inherits from SubjectObject, but I don't think adding a class for each type, such as AsyncSubject, is a good approach.

Do you think this is enough?

Robert-M-Muench commented 5 years ago

  1. Why do you use .put(sub, -1) and not sub.put(-1)? Any reason behind this?
  2. My idea was that there is a special subject.put(obj, filter-function) which calls filter-function to decide if an observer's put(obj) should be called or not. filter-function would get (subject.obj, observer.obj, observers)
  3. An alternative to 2. could be that subject.put() call a filter-function which returns an array/range/etc. of observers for which observer.put() is called. This makes it even more flexible.

Overall I think that the filter-function should be run-time provided and not a fixed implementation in the class hierarchy. More like a plug-in which can change during run-time.

Robert-M-Muench commented 5 years ago

I have another idea: Why not use a reactive stream "filter-observers" which the subject provides, and to which I can subscribe with whatever function I want? The subject then checks whether there is a subscriber; if so, it calls the subscriber with the necessary information (maybe a struct?) and gets back a list of observers to notify.

Or a call where I can submit a list of observers to notify and subject keeps the list.

lempiji commented 5 years ago

1: I use this format because Observer was originally defined as OutputRange and this is the correct use. See isOutputRange in std.range.
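The difference between the free function and the member call can be seen without rx at all. The sketch below uses only `std.range.primitives`; `IntSink` is a hypothetical type made up for this illustration. The free `put` falls back to putting a range element by element, while a direct member call has no such fallback.

```d
import std.range.primitives : isOutputRange, put;

/// Hypothetical sink that only knows how to accept a single int.
struct IntSink
{
    int[] received;
    void put(int n) { received ~= n; }
}

// The member accepts one int, yet the type also counts as an output range
// of int[], because the free function puts the elements one by one.
static assert(isOutputRange!(IntSink, int));
static assert(isOutputRange!(IntSink, int[]));

unittest
{
    IntSink sink;
    .put(sink, 1);       // forwards to sink.put(1)
    .put(sink, [2, 3]);  // no put(int[]) member: elements are put one by one
    assert(sink.received == [1, 2, 3]);

    // The member call does not compile: a member named put exists,
    // so neither the fallback nor UFCS is tried.
    static assert(!__traits(compiles, sink.put([4, 5])));
}
```

The leading dot in `.put(...)` forces lookup at module scope, which matters inside a type that has its own `put` member.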

I have been bitten by this several times in the past. Overload resolution has some traps here, so it's better not to use UFCS either.

By the way, you can subscribe to Observable!string with Observer!char.

About 3 years ago, I wrote a detailed article about this in Japanese, but I can't translate it into English. https://qiita.com/lempiji/items/2070719fee944e64b457

2: Since the signature of put is already defined by OutputRange, I don't want to go in the direction of extending this itself. std.digest has a similar interface, but I don't like it.

3 and the unnumbered idea: I thought it would be possible to apply a filter function at run-time by slightly extending MyCustomSubject in the unittest.

SubjectObject manages the child Observers in one CompositeObserver, rather than in a list, to make internal processing lock-free (so that it can be swapped atomically with cas).

The hardest thing for a Subject is to actually pull out the Observer list and process it. If the Subject itself iterated over the retrieved Observers with foreach, it would expose the role that CompositeObserver encapsulates.

However, bundling your own Observers into a single Observer is more desirable in terms of performance. Because the SubjectObject internally wraps the Observer with Observer!E, this performance degradation is minimal.

I think subscription management is hard to implement, but if the number of your own Observers doesn't change dynamically, it shouldn't be that hard.

Robert-M-Muench commented 5 years ago

  1. Ok, understood. So, .put() uses some run-time fallbacks to create the desired effect. Still learning ranges.

  2. My idea could be split into two calls. One to set the function to call and the other being the normal .put. I still like the idea to specify a filter-like function dynamically.

  3. I'm currently extending an RX fork for this idea. And did it in a way where you give a Subject the filter-function, which just forwards it to the internally used CompositeObserver. So, it's more to keep the interface logically clear, since one is working with a Subject.

In my GUI use case, the observers change dynamically, often and fast, so efficient subscription management is critical. But no premature optimization: I want to get the whole thing up and running first.

So, how to proceed? Any conclusion now?

Robert-M-Muench commented 5 years ago

Here you can see how I did it so far: https://pastebin.com/HC7fMMVA

Robert-M-Muench commented 5 years ago

How about this: FilteredSubject gets a _filteredObserver member and a filteredSubscribe() function. The code can be shared mostly. Some client code somehow gets the list of relevant observers and updates the _filteredObserver. If _filteredObserver contains something .put() will use this otherwise _observers.

With this client-code can subscribe at will, and some other app-part can limit which of the subscribed observers get notified.
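The FilteredSubject proposal could look roughly like this in its simplest form. This is a single-threaded sketch with plain delegates as observers; it does not derive from rx's Subject and makes no attempt at the lock-free handling discussed in this thread. `Observer`, `FilteredSubject`, `subscribe`, and `setFiltered` are hypothetical names.

```d
/// Hypothetical observer type for this sketch (not rx's Observer!E).
alias Observer = void delegate(int);

final class FilteredSubject
{
    private Observer[] _observers;
    private Observer[] _filteredObservers;

    /// Client code subscribes at will.
    void subscribe(Observer o) { _observers ~= o; }

    /// Some other part of the app narrows down who is notified.
    /// Passing null (or an empty array) removes the restriction.
    void setFiltered(Observer[] subset) { _filteredObservers = subset; }

    /// Uses the filtered list if it contains something, otherwise all observers.
    void put(int value)
    {
        auto targets = _filteredObservers.length ? _filteredObservers : _observers;
        foreach (o; targets)
            o(value);
    }
}

unittest
{
    auto subject = new FilteredSubject;
    int a, b;
    Observer oa = (int v) { a += v; };
    Observer ob = (int v) { b += v; };
    subject.subscribe(oa);
    subject.subscribe(ob);

    subject.put(1);              // nobody filtered yet: both are notified
    assert(a == 1 && b == 1);

    subject.setFiltered([ob]);   // only ob is relevant now
    subject.put(10);
    assert(a == 1 && b == 11);
}
```

One consequence of this design is that an empty filter means "notify everyone", so "notify nobody" needs a separate representation.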

Not sure how fast subscribing/unsubscribing is. What do you think? This would mainly keep all the logic as is, re-use a lot of code and can even be derived from Subject.

Robert-M-Muench commented 5 years ago

Please see: https://pastebin.com/GKa08SPM

After playing around I think this approach is the best and minimally invasive. Some code can manage a list of observers which are notified.

I think deriving a FilteredSubject class makes sense, where the _filteredObservers is added.

I don't fully understand all the shared(...) stuff or the use of cas(...) so not sure how to best implement things. Maybe you can take the FilteredSubject idea and do a correct implementation?

What do you think?

Robert-M-Muench commented 5 years ago

The problem I now see is if I use this approach in my app code:

mySubject.doSubscribe!((Para p) {myFunc(p);}); 

I think a special observer is created (??? which uses myFunc() as a delegate) and subscribed, meaning it goes into _observers.

Later on, I need to somehow insert this specially created observer into the _filteredObserver list, so that a comparison between the entries in _observers and _filteredObserver is possible.

But how do I access the thing created by doSubscribe()? I don't understand what I could compare...

lempiji commented 5 years ago

I looked at the code of HitTest. I think I understand the concept.

But keeping everything lock-free is really hard and tedious...

It will take time, so why don't you do the following for now? (It is stored in _observers without being processed internally.)

auto observer = makeObserver(&myFunc).observerObject!Para();
subject.doSubscribe(observer);

Robert-M-Muench commented 5 years ago

> makeObserver(&myFunc).observerObject!Para();

How is this line working?

makeObserver normally needs at least two delegates but here only one is given, so observerObject() returns an ObserverObject that has a completed() and failure() member?

But, observerObject takes an E which I understand, and uses an R. Where is R coming from / what is R in the above case? And, the _range.completed() and _range.failure() functions would be null in our case, right? So, later on, if these are called, the makeObserver implementation checks for this null, and does nothing.

Such a detailed explanation would help a lot to understand the RX code. It's a pretty advanced code base and without knowing all the D details etc. it's very hard to understand.
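On the question of where R comes from: in D, a function template can take some template arguments explicitly and have the rest deduced from the run-time arguments. The sketch below shows that pattern, plus one common way to make a member like completed() optional using `static if`. It is an illustration of the language mechanism only and is not taken from rx's source; `Observer`, `Wrapper`, and `wrap` are hypothetical stand-ins.

```d
interface Observer(E)
{
    void put(E value);
    void completed();
}

/// R is whatever type was passed in; E is the element type.
final class Wrapper(R, E) : Observer!E
{
    private R _inner;

    this(R inner) { _inner = inner; }

    void put(E value)
    {
        static if (is(typeof(_inner(value))))  // callable: function or delegate
            _inner(value);
        else
            _inner.put(value);
    }

    void completed()
    {
        // Compiled in only when R actually has a completed() member.
        static if (is(typeof(_inner.completed())))
            _inner.completed();
    }
}

/// E is written by the caller, R is deduced from the argument.
Observer!E wrap(E, R)(R inner)
{
    return new Wrapper!(R, E)(inner);
}

unittest
{
    int sum;
    auto fromDelegate = wrap!int((int v) { sum += v; });  // R = a delegate type
    fromDelegate.put(5);
    fromDelegate.completed();  // does nothing: a delegate has no completed()
    assert(sum == 5);

    static struct Sink
    {
        static int finished;
        void put(int) {}
        void completed() { finished++; }
    }
    auto fromStruct = wrap!int(Sink());  // R = Sink
    fromStruct.completed();
    assert(Sink.finished == 1);
}
```

With this approach the missing members are resolved at compile time, so there is no null check at run time.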

Robert-M-Muench commented 5 years ago

WRT the R parameter, I'm not sure whether your line can be rewritten as:

 observerObject!Para(makeObserver(&myFunc));

But here makeObserver() would be called first, which would be missing the delegates for completed and failure... or

makeObserver(observerObject!Para(&myFunc));

which returns an ObserverObject with default completed() and failure() variants. That sounds logical.

On the other hand, I don't understand how the UFCS syntax works here. Because UFCS states: "A free function can be called with a syntax that looks as if the function were a member function of its first parameter type."

But wouldn't the first parameter be: makeObserver(&myFunc)? Which leads to my first version, which is illogical. So... I'm confused how this works.
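As far as the language rule goes, the first reading is the syntactically correct one: with UFCS, the expression before the dot becomes the first argument, and an explicit template argument after `!` stays where it is. A small self-contained sketch, using made-up names (`Box`, `makeBox`, `describe`) rather than rx functions:

```d
struct Box { int value; }

Box makeBox(int v) { return Box(v); }

/// E is explicit, R is deduced from the single run-time argument.
string describe(E, R)(R arg)
{
    return R.stringof ~ " as " ~ E.stringof;
}

unittest
{
    // UFCS: makeBox(1) is evaluated first and passed as the first argument.
    auto viaUfcs = makeBox(1).describe!long();
    auto plain = describe!long(makeBox(1));

    assert(viaUfcs == plain);
    assert(plain == "Box as long");
}
```

So `a.f!T()` and `f!T(a)` are the same call; whether the inner call makes sense is a separate question from how the syntax is resolved.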

lempiji commented 5 years ago

Oh, I made a mistake. The above code doesn't need makeObserver.

auto observer = observerObject!Para(&myFunc);

Robert-M-Muench commented 5 years ago

Ok. That makes more sense.

Anyway, I'm still struggling with the following:

  1. We have 1..N objects of type A.
  2. Each object can subscribe to 1..M streams using 1..M different observerObject!T(&myMemberFunc). So an object subscribes to different streams.
  3. I have one central HitTest which returns me the hit objects of type A.
  4. I know for which of the 1..M streams the set of objects returned in step 3 should be used as a filter.

The problem I have is, how can I filter inside the streams based on the object reference when I only have an observerObject? Can I somehow get the object reference (parentof?) of the observerObject?

Robert-M-Muench commented 5 years ago

This is a code example from the unittests:

    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    class TestObserver : Observer!int
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    auto test = new TestObserver;
    Observer!int observer = observerObject!int(test);

How can I compare observer to test? Something like: observer == test should yield true. And the same for a member function:

    Observer!int observer = observerObject!int(&test.mymember);

How can I now compare/find out that observer and test are related?

Robert-M-Muench commented 5 years ago

So, I think it's somehow necessary to use delegates here. As it's possible to access the .ptr of a delegate which gives the object to compare to.
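The delegate property mentioned here can be checked in isolation. For a delegate bound to a class member function, `.ptr` holds the object reference, so it can be compared against a candidate object. `Widget` and `isBoundTo` below are hypothetical names for this sketch.

```d
class Widget
{
    int hits;
    void onHit(int x) { hits += x; }
}

/// True when `dg` is a member-function delegate bound to `target`.
bool isBoundTo(void delegate(int) dg, Object target)
{
    return dg.ptr is cast(void*) target;
}

unittest
{
    auto a = new Widget;
    auto b = new Widget;
    void delegate(int) dg = &a.onHit;

    assert(isBoundTo(dg, a));
    assert(!isBoundTo(dg, b));
}
```

This only identifies the object for delegates created from a member function. For a delegate literal (closure), `.ptr` points to the captured context instead, so the comparison would not match any object.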

Robert-M-Muench commented 5 years ago

@lempiji Since I don't have an email address for you (it would be great to get it via PM), I have now tried my best to implement a ContextObserver. The idea is that it can carry a pointer to whatever gives access to a context, and that this context pointer is used for filtering, e.g. only inform observers that have the same context pointer.

Anyway, I'm struggling with a couple of things (marked with @@). Maybe you can take a look and I think you get the idea. But my implementation is mostly not good nor usable.

See: https://github.com/Robert-M-Muench/rx/blob/dev/source/rx/contextobserver.d


Robert-M-Muench commented 5 years ago

After a long time still struggling...

Looking at your MyFilterSubject example in subject.d I don't have a clue how I can get rid of the foreach loop? The whole RX lib seems to assume that observers are iterable with a foreach loop.

What I want to do is avoid looping through all observers and instead query for the observers to notify. That query would return a list, which I would then iterate over.

So, this requires a subscribe() function that handles the subscriptions differently. But that doesn't compile. See: https://pastebin.com/5BTT16Ze for example code.
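The "query first, then iterate only over the result" idea can be sketched independently of rx. Below, an associative array keyed by a coarse grid cell stands in for the spatial index (the thread mentions an rtree; the grid is a simplification chosen only to keep the example short). `Observer`, `CellIndexedSubject`, and its methods are hypothetical, and coordinates are assumed to be non-negative.

```d
/// Hypothetical observer type for this sketch (not rx's Observer!E).
alias Observer = void delegate(int x, int y);

final class CellIndexedSubject
{
    enum cellSize = 16;

    // Maps a grid cell to the observers registered in that cell.
    private Observer[][ulong] _byCell;

    /// Packs the cell coordinates into one key (assumes x, y >= 0).
    private static ulong cellKey(int x, int y)
    {
        return (cast(ulong)(x / cellSize) << 32) | cast(uint)(y / cellSize);
    }

    /// subscribe() stores the observer under the cell it belongs to.
    void subscribe(int x, int y, Observer o)
    {
        _byCell[cellKey(x, y)] ~= o;
    }

    /// put() queries the index and iterates only over the returned list.
    void put(int x, int y)
    {
        if (auto hit = cellKey(x, y) in _byCell)
            foreach (o; *hit)
                o(x, y);
    }
}

unittest
{
    auto subject = new CellIndexedSubject;
    int a, b;
    subject.subscribe(0, 0, (int x, int y) { a++; });
    subject.subscribe(100, 100, (int x, int y) { b++; });

    subject.put(5, 5);      // same cell as (0, 0)
    assert(a == 1 && b == 0);
}
```

The point of the sketch is that subscription has to record enough information (here, a position) for the query to work, which is why a differently shaped subscribe() is needed.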

aberba commented 4 years ago

@Robert-M-Muench there's GitHub gist for showing snippets without leaving GitHub.

Sorry I can't help with the RX thing though. It's way above my head.