eclipse-uprotocol / up-spec

uProtocol Specifications
Apache License 2.0
38 stars 26 forks source link

Does UTransport::register_listener match on source or sink address? #73

Closed sophokles73 closed 6 months ago

sophokles73 commented 9 months ago

The RegisterListener() spec does not explicitly define the (type of) address to match on.

Register a UListener to receive message(s) for a given UUri (topic).

seems to suggest that the given topic should be matched on a message's destination address (sink attribute). However, that would mean that publish messages cannot be matched because they only contain an origin address (source attribute).

sophokles73 commented 9 months ago

@PLeVasseur Any opinion on this in the context of uStreamer?

evshary commented 9 months ago

What I'm doing now in UPClientZenoh is that I use UUri to check the type.

We have is_rpc_response and is_rpc_method, so this can be done easily. Here is the logic: https://github.com/eclipse-uprotocol/up-client-zenoh-rust/pull/2/files#diff-2b11318fb2673b25403b992883ee6ac91316e6f11f3c32c1b352caf4e07c79beR562

But I'm glad to see there is a better way to do so.

PLeVasseur commented 9 months ago

It's a good point @sophokles73, we should make sure we're clear.

I think it should go like:

I guess for notification messages which do have a sink, we'd still just be listening for the source_uuri.

Any issues we can see?


BTW @evshary -- I approved the initial Zenoh PR before I got to this issue, but I'm now looking a little more closely.

I left a comment over on the PR (after it merged haha), noting that I think we should update it so that we create the Zenoh key from the source, not the sink. Can you take a look?

evshary commented 9 months ago

@PLeVasseur Yes. Thanks to @sophokles73 's PR I used UMessageBuilder::publish to fix it https://github.com/eclipse-uprotocol/up-client-zenoh-rust/pull/5/files#diff-7ff815b68943f08bef79f083c09e55ca8c18db8e25033034b12fa31f1d43dab4R205

PLeVasseur commented 9 months ago

Hey @sophokles73 -- did my proposal above make sense? Any lingering issues?

sophokles73 commented 9 months ago

@PLeVasseur I believe you are right, and here is my attempt to formalize the reasoning behind it ... and learn how to use math expressions in GitHub comments :-)

IMHO we need to consider these uEntity use cases:

  1. Service client uEntity wants to receive response messages targeted at itself => register listener matching on sink
  2. Service provider uEntity wants to receive request messages targeted at itself => register listener matching on sink
  3. Any uEntity wants to receive notifications targeted at itself => register listener matching on sink
  4. Any uEntity wants to receive publish messages that it has subscribed to => register listener matching on source

For uStreamer it seems to be a little more complicated. My understanding is, that for the time being we only consider single-hop forwarding, i.e. each transport that the streamer is connected to, is associated with a single authority only.

Let $T = \{T_i|i \in [1..n] \}$ be the set of transports that the streamer is connected to. Let $A(T_i)$ be the (single) authority used by messages originating from or destined to uEntities on transport $T_i$. Let $!A(T_i) = \{A(T_j)|j \in [1..n] \land j \ne i\}$ be the set of authorities being used by messages originating from or destined to uEntities on any transport other than $T_i$.

Then on each $T_i \in T$ the uStreamer wants to receive

  1. response messages targeted at a member of $!A(T_i)$ => register listener matching on sink
  2. notifications targeted at a member of $!A(T_i)$ => register listener matching on sink
  3. publish messages any member of $!A(T_i)$ has subscribed to => register listener matching on source

Does that make sense?

PLeVasseur commented 9 months ago

IMHO we need to consider these uEntity use cases:

  1. Service client uEntity wants to receive response messages targeted at itself => register listener matching on sink
  2. Service provider uEntity wants to receive request messages targeted at itself => register listener matching on sink
  3. Any uEntity wants to receive notifications targeted at itself => register listener matching on sink
  4. Any uEntity wants to receive publish messages that it has subscribed to => register listener matching on source

Agree that these are the use cases to support for the "typical" case. Down below I show that I think this is the same idea for the uStreamer case as well.

For uStreamer it seems to be a little more complicated. My understanding is, that for the time being we only consider single-hop forwarding, i.e. each transport that the streamer is connected to, is associated with a single authority only.

Not quite, the relationship is reversed. For each authority, we assume we need only be able to speak to it over a single transport. There will be something of a routing table / mapping, something like this:

UAuthority Transport Host Transport
192.168.1.2 Zenoh true
192.168.1.3 vsomeip false
192.168.1.4 vsomeip false
192.1968.1.5 MQTT5 false
192.1968.1.6 MQTT5 false

I'm just thinking out loud below on what we'd do for each message type:

The rest of Notification, Request, and Response message types would be very similar:

So I suppose that means its usage of register_listener() would "under the hood" do all of 1-4 that you identified above.

I think the rest of your message may not make sense to me in the context of I brought up above of the mapping being many UAuthority to one transport type.

Hoping what I wrote made sense. Let me know.


FYI @evshary -- I'm sensing we will need to modify up-client-zenoh-rust UTransport::send() to add being able to send Notification with their Zenoh KeyExpr set at their sink instead of source. I'm referring specifically to this part of the code.

Notifications are not an explicit message type, but are lumped in with Publish.

You can discriminate them based on whether there's a sink or not:

(Does this seem safe to do @sophokles73? Any other way to discriminate between them?)

I'm gonna cross-post this bottom portion as an issue over on up-client-zenoh-rust to highlight it.

sophokles73 commented 9 months ago

@PLeVasseur thanks for the explanation which makes a lot of sense to me. Some questions remain:

  1. What is the semantics of the Host Transport column. Is this relevant for routing messages? Can you elaborate a little?
  2. Does the Transport column in your routing table contain the type of transport being used or does it refer to a particular transport instance?

Regarding the second question, there seems to be a difference which might be relevant. In your example table, there are two authorities which can be reached via MQTT5. I can imagine two setups:

  1. There is a single MQTT5 broker which is shared between the two authorities. The streamer would probably use one single MQTT5 transport instance to interact with all of the authorities that are connected to the single broker. IMHO the transport would need to explicitly support this by means of including the authority in the MQTT topic names being used.
  2. Each authority has their own MQTT5 broker for exchanging messages with its uEntities. The streamer would probably use multiple transport instances, one for each authority/broker.

The former setup also seems to be the one imagined for the Zenoh transport, right? In any case, my understanding is that we will only want/need to route messages between different transport instances. For example, with the former setup (shared broker), if we receive a publish message from authority 192.168.1.5 and we forward it to the MQTT5 transport for 192.168.1.6 (which is the one used by 192.168.1.5 as well) we would end up looping the publish message endlessly, as it would be (re-)published to the very same topic on the same MQTT5 broker over and over again, or am I mistaken? In fact, after the first loop we would receive the (re-)published message on both listeners, the one for 192.168.1.5 as well as the one for 192.168.1.6, right?

PLeVasseur commented 9 months ago
  1. What is the semantics of the Host Transport column. Is this relevant for routing messages? Can you elaborate a little?

I'm still a little early into the design process here, frankly. It's a way to tell whether the messages received over that transport are "for this device" vs "for another device, one or more hops away".

  1. Does the Transport column in your routing table contain the type of transport being used or does it refer to a particular transport instance?

Refers to the type of transport used, in other words, some kind of tag which can be used for lookups. Again, a bit early in design here, but here's my current thoughts. I wrote it out as a table here for the picture, but in reality there's a hashmap from UAuthority to TransportTag and then another hashmap from TransportTag to channel::Sender. This allows us to snag the sink UUri's UAuthority from an incoming message, perform a lookup to get a TransportTag, then lookup the channel::Sender onto which we should send this message so that the thread holding that particulat UTransport instance can forward the message.

The set of TransportTags (u8s at the moment) is configurable by someone setting up a ustreamer.


Neat stuff to learn about MQTT5. I'm definitely a newbie learning about this stuff. :slightly_smiling_face:

  1. There is a single MQTT5 broker which is shared between the two authorities. The streamer would probably use one single MQTT5 transport instance to interact with all of the authorities that are connected to the single broker. IMHO the transport would need to explicitly support this by means of including the authority in the MQTT topic names being used.

So yeah, this is doable in the manner in which you say. We'd have a single dyn UTransport around if there were a single broker.

  1. Each authority has their own MQTT5 broker for exchanging messages with its uEntities. The streamer would probably use multiple transport instances, one for each authority/broker.

Yup, this also is doable. When the ustreamer instance is being configured, we'd have 2 x dyn UTransport once per broker and a different TransportTag for each. We'd also have to configure the UAuthority -> TransportTag appropriately to represent that some UAuthority represent a certain MQTT5 broker instance set of comms.


The former setup also seems to be the one imagined for the Zenoh transport, right? In any case, my understanding is that we will only want/need to route messages between different transport instances. For example, with the former setup (shared broker), if we receive a publish message from authority 192.168.1.5 and we forward it to the MQTT5 transport for 192.168.1.6 (which is the one used by 192.168.1.5 as well) we would end up looping the publish message endlessly, as it would be (re-)published to the very same topic on the same MQTT5 broker over and over again, or am I mistaken? In fact, after the first loop we would receive the (re-)published message on both listeners, the one for 192.168.1.5 as well as the one for 192.168.1.6, right?

Another really good point.

The way I handled this in a previous prototype of ustreamer was to have an LRU cache based on the message's UUID. If we find this UUID in the cache upon receipt of it off the transport, we know we have already forwarded this message once and should drop it.

Definitely open to other ideas, but this seemed reasonable at the time.

sophokles73 commented 9 months ago

I wrote it out as a table here for the picture, but in reality there's a hashmap from UAuthority to TransportTag and then another hashmap from TransportTag to channel::Sender. This allows us to snag the sink UUri's UAuthority from an incoming message, perform a lookup to get a TransportTag, then lookup the channel::Sender onto which we should send this message so that the thread holding that particulat UTransport instance can forward the message.

Ok, I guess for the unicast messages (having a sink attribute) this should work as intended. For the publish messages this will result in situations where we also forward messages to the transport that the messages were received from, right? Maybe in a first step we can live with that, in the end, these messages will most likely simply be ignored anyway if no uEntities are interested in them.

In the future, the uStreamer should probably better use the subscription info/updates provided by uSubscription to maintain a dynamic routing table for publish messages. Based on that information, the streamer can then determine, which authorities contain uEntities that have subscribed to a given publish message's source topic and then only forward publish messages to such authorities.

@PLeVasseur Are we on the same page?

PLeVasseur commented 9 months ago

Ok, I guess for the unicast messages (having a sink attribute) this should work as intended.

:+1:

For the publish messages this will result in situations where we also forward messages to the transport that the messages were received from, right?

Another good point. I suppose we could, when forwarding onto all of the transports ensure we exclude the transport the message came from.

In the future, the uStreamer should probably better use the subscription info/updates provided by uSubscription to maintain a dynamic routing table for publish messages. Based on that information, the streamer can then determine, which authorities contain uEntities that have subscribed to a given publish message's source topic and then only forward publish messages to such authorities.

This part makes sense, but it'd be good to formalize within the up-spec that this is the direction. Tagging in @stevenhartley for feedback.

@PLeVasseur Are we on the same page?

I think so :slightly_smiling_face:

sophokles73 commented 9 months ago

I suppose we could, when forwarding onto all of the transports ensure we exclude the transport the message came from.

We could do that, but this would only make sense for transports using a shared messaging infrastructure, not peer-to-peer based ones like Some/IP. So, if we have one Some/IP transport instance which handles messages from multiple MCUs like 192.168.0.1 and 192.168.0.1 192.168.0.2, then we actually should forward a publish message from 192.168.0.1 to the Some/IP transport so that 192.168.0.1 192.168.0.2 also has a chance of getting it. In this particular case, the uTransport implementation should, however, know about the specifics of the transport protocol at hand and be able to decide to which peer(s) it needs to send the message.

sophokles73 commented 9 months ago

Ok, based on the discussion so far, I guess we could now start coming up with options for how uTransport implementations could support this :-) It seems obvious that the existing Utransport::register_listener is not sufficient in the way it is currently defined ...

sophokles73 commented 9 months ago

An obvious improvement would be to add another parameter to register_listener indicating the type of address to match on: either source or sink. Together with some definitions of supported wild card values within the UUri, this might already do the trick.

We could also think about adding a dedicated function for registering a listener for all messages being received on the transport. This would allow the streamer to implement routing at a very generic level.

A third option brought up about @tamarafischer in one of our community calls is to support passing in custom message matching logic when registering a listener, thus allowing client code to implement whatever message filtering they need.

PLeVasseur commented 9 months ago

We could also think about adding a dedicated function for registering a listener for all messages being received on the transport. This would allow the streamer to implement routing at a very generic level.

Yeah, that's another route, to have a dedicated function. For the moment, I asked @evshary to implement it in the way we've talked about a couple of times now: if given a "special" UUri containing only a UAuthority, then register for all messages received on the transport: code here.

One of the other thoughts you had (1 & 3) could also work to make this more general and we'd use the wildcard logic or message matching logic in a certain way to achieve "give me all message" from a certain UAuthority required for a streamer.

stevenhartley commented 9 months ago

Folks, I want to circle back to the core problem introduced by the ask to only use uTransport to implement a streamer that we need to register a listener for a sink or source UUri (depending on the type of UUri passed). Current implementations infer this information but it should be more explicit in the APIs.

I think the simplest is to merge uTransport and RpcServer so that we do not cause all the implementations to change given RpcServer defined an API to register listeners using sink lookup in lieu of source lookup:

UStatus registerListener(UUri, UListener); // Published/notifications looking at the source
UStatus registerRpcListener(UUri, URpcListener); // Request/response where sink is used for listener lookup
PLeVasseur commented 9 months ago

We could do that, but this would only make sense for transports using a shared messaging infrastructure, not peer-to-peer based ones like Some/IP. So, if we have one Some/IP transport instance which handles messages from multiple MCUs like 192.168.0.1 and 192.168.0.1, then we actually should forward a publish message from 192.168.0.1 to the Some/IP transport so that 192.168.0.1 also has a chance of getting it. In this particular case, the uTransport implementation should, however, know about the specifics of the transport protocol at hand and be able to decide to which peer(s) it needs to send the message.

I had a little trouble parsing this -- is this:

multiple MCUs like 192.168.0.1 and 192.168.0.1

supposed to have different IP addresses?

PLeVasseur commented 9 months ago

Folks, I want to circle back to the core problem introduced by the ask to only use uTransport to implement a streamer that we need to register a listener for a sink or source UUri (depending on the type of UUri passed). Current implementations infer this information but it should be more explicit in the APIs.

I think the simplest is to merge uTransport and RpcServer so that we do not cause all the implementations to change given RpcServer defined an API to register listeners using sink lookup in lieu of source lookup:

UStatus registerListener(UUri, UListener); // Published/notifications looking at the source
UStatus registerRpcListener(UUri, URpcListener); // Request/response where sink is used for listener lookup

Hmmm. Shouldn't we register on the sink for a Notification style message though, not the source? I think that's sorta what @sophokles73 and I were agreeing on above.

evshary commented 9 months ago

In any case, my understanding is that we will only want/need to route messages between different transport instances.

I think uStreamer should be only used between different transports, or there will be a routing problem as @sophokles73 mentioned. Also, it's unnecessary to bridge the same transports with uStreamer while these two transports can connect to another one directly. uStreamer can only forward the messages if these two transports can't talk to each other directly, for example, using different MQTT brokers. We can simplify the implementation logic if we can limit the usage of uStreamer.


An obvious improvement would be to add another parameter to register_listener indicating the type of address to match on: either source or sink.

@sophokles73 Could you elaborate more on why we need source and sink while calling register_listener? Maybe I missed some context, but I don't quite get how adding source and sink makes a difference here. No matter source or sink, user's application can register a corresponding callback based on the UUri.

For example:

register_listener(my_uuri, callback): Bind with the callback how I handle the notification, request, and response messages. register_listener(other_uuri, callback): Bind with the callback how I handle the publish message.

FMPOV, it's good to improve register_listener API, but I would like to add the type of UUri in the arguments (Publish, Notification, Request, Response) Some protocols support different mechanisms for publish, notification, request, and response. (For example, Zenoh supports "query&queryable" for "request&response") It would be good for developers to discriminate the types of UUri easily. (Now I used the way mentioned here)


Yeah, that's another route, to have a dedicated function. For the moment, I asked @evshary to implement it in the way we've talked about a couple of times now: if given a "special" UUri containing only a UAuthority, then register for all messages received on the transport: code here.

As @PLeVasseur mentioned, if the UUri has a non-empty UAuthority but with empty UEntity and UResource, we take it as a special UUri, which wants to listen to all the messages from the UAuthority. It works now, but still open to having a more elegant way to do the same logic.

MelamudMichael commented 9 months ago

Sorry for jumping late into the discussion Is the intention to have different URI`s for the message in the publishing and the receiving side?

for example in the up-cpp implementation the same URI is used both for pub & sub

pub: /demoapp/demomsg sub: /demoapp/demomsg

evshary commented 9 months ago

Is the intention to have different URI`s for the message in the publishing and the receiving side?

No, I don't think so. There are two cases in Publish:

Therefore, both the sender and receiver sides still use the same URI, but the meanings are different. One is sink and one is source.

Maybe I misunderstood your question. Feel free to correct me.

MelamudMichael commented 9 months ago

For the notifications i will still need to use only the SINK uri what is the purpose of the source URI then?

evshary commented 9 months ago

For the notifications i will still need to use only the SINK uri what is the purpose of the source URI then?

Then the receiver can know who sends the notification. Please correct me if I'm wrong @sophokles73 @PLeVasseur

MelamudMichael commented 9 months ago

by notification you mean rpc call right?

sophokles73 commented 9 months ago

@evshary

Also, it's unnecessary to bridge the same transports with uStreamer while these two transports can connect to another one directly.

That would be true for transports that actually do so. For Zenoh it is fair to assume that this is not necessary when both authorities are on the same Zenoh network. However, what about a streamer that connects to two different Zenoh networks?

Standard Some/IP uses peer-to-peer communication and can be extended with Some/IP-SD (Service Discovery) on top to support service discovery and event subscription.

IMHO we need to define whether a transport instance is per-se expected to route message between authorities within the transport instance's scope. Initially, I would say that we cannot expect this kind of behavior. Or, put in another way, the routing of messages seems to be the core responsibility of the uStreamer, thus I do not think that uTransport should be expected to do this as well.

We can simplify the implementation logic if we can limit the usage of uStreamer.

I assume you are talking about the simplification of uStreamer here. On the other hand, this woul complicate the implementation of uTransport ...

sophokles73 commented 9 months ago

Could you elaborate more on why we need source and sink while calling register_listener? Maybe I missed some context, but I don't quite get how adding source and sink makes a difference here. No matter source or sink, user's application can register a corresponding callback based on the UUri.

That depends on what you want to listen to. @PLeVasseur tried to summarize it in a previous comment.

For example if you are interested in publish messages originating from a particular authority, then you need to match the incoming message's source attribute to the topic that the listener has been registered with. On the other hand, if you are interested in request messages targeting a uEntity on a particular authority, then you need to match on the message's sink.

With the current definition of UTransport::register_listener it is unclear what to match on ...

sophokles73 commented 9 months ago

by notification you mean rpc call right?

@MelamudMichael no, a notification is a unicast version of a publish message. A publish message is used to inform interested parties (subscribers) of an event that happened. A notification is used to send some information to one particular receiver only (specified in the sink attribute). The sender of these messages does not expect the receiver to return a response.

PLeVasseur commented 9 months ago

With the current definition of UTransport::register_listener it is unclear what to match on ...

My take has been that you register on what you're expecting. So if you want to get a certain kind of notification intended for you, you should register on that UUri. But maybe that only works because I've been thinking in Zenoh mode. Over in the Zenoh implementation, if it's a Notification-style message they'll use the sink as the Zenoh KeyExpr and if it's a Publish-style message they'll use the source as the Zenoh KeyExpr.

Looking to hear your feedback.

sophokles73 commented 9 months ago

Current implementations infer this information but it should be more explicit in the APIs.

I think the simplest is to merge uTransport and RpcServer so that we do not cause all the implementations to change given RpcServer defined an API to register listeners using sink lookup in lieu of source lookup:

UStatus registerListener(UUri, UListener); // Published/notifications looking at the source
UStatus registerRpcListener(UUri, URpcListener); // Request/response where sink is used for listener lookup

@stevenhartley My understanding is, that up-l1 (and uTransport in particular) currently is not aware of different types of messages. All functionality described/defined by the spec applies to all types of messages equally. We can certainly discuss moving the RPC specifics from up-l2 to up-l1 but I wonder if that is really necessary.

I like the idea of first defining explicitly the semantics of the existing register_listener method (e.g. matching on source only) and then adding additional methods for supporting the other (uStreamer) use cases at hand. Based on that, I could imagine having something like

// Listener for Publish messages, matching on source
UStatus registerListener(UUri, UListener);
// alias for registerListener to remain backwards compatibility
UStatus registerSourceListener(UUri, UListener);

// Listener for Request/Response/Notification messages, matching on sink
UStatus registerSinkListener(UUri, UListener);
stevenhartley commented 9 months ago

Current implementations infer this information but it should be more explicit in the APIs. I think the simplest is to merge uTransport and RpcServer so that we do not cause all the implementations to change given RpcServer defined an API to register listeners using sink lookup in lieu of source lookup:

UStatus registerListener(UUri, UListener); // Published/notifications looking at the source
UStatus registerRpcListener(UUri, URpcListener); // Request/response where sink is used for listener lookup

@stevenhartley My understanding is, that up-l1 (and uTransport in particular) currently is not aware of different types of messages. All functionality described/defined by the spec applies to all types of messages equally. We can certainly discuss moving the RPC specifics from up-l2 to up-l1 but I wonder if that is really necessary.

I like the idea of first defining explicitly the semantics of the existing register_listener method (e.g. matching on source only) and then adding additional methods for supporting the other (uStreamer) use cases at hand. Based on that, I could imagine having something like

// Listener for Publish messages, matching on source
UStatus registerListener(UUri, UListener);
// alias for registerListener to remain backwards compatibility
UStatus registerSourceListener(UUri, UListener);

// Listener for Request/Response/Notification messages, matching on sink
UStatus registerSinkListener(UUri, UListener);

I like this proposal, it means we can build RpcClient/RpcServer cleanly on top of uTransport and we can use only uTransport for streamer.

PLeVasseur commented 9 months ago

I think it's a good proposal. Good way to handle normal uEs.

We'd still need a way to give a "special" UUri to both functions outlined by @sophokles73 that have only UAuthority or have two duplicate methods:

// method which explicitly provides only a UAuthority
UStatus registerSourceUAuthorityListener(UAuthority, UListener);

// method which explicitly provides only a UAuthority
UStatus registerSinkUAuthorityListener(UAuthority, UListener);

IMHO, I kind of like the idea of leaving there be just two methods as outlined by Kai (i.e. don't add the above two methods) and handling with a special UUri that contains only UAuthority for now. Feels like it gives us room to modify / improve the wildcarding over the UUri in the future "under the hood" of the API.

What do y'all think?


Depending on which way we go, we need to keep @evshary in the loop so he can adjust accordingly. Please keep monitoring this thread as you have been :slightly_smiling_face:

stevenhartley commented 9 months ago

@PLeVasseur I think the idea is we just call the registerSourceListener() and registerSinkListener() with UAuthority only for the "special case" for uStreamer but not a different API.

PLeVasseur commented 9 months ago

@PLeVasseur I think the idea is we just call the registerSourceListener() and registerSinkListener() with UAuthority only for the "special case" for uStreamer but not a different API.

Makes sense to me. @evshary -- I think you'll still need to do UUri parsing under the hood I suppose to ensure we have the correct subscriber vs queryable listener, I guess? Because RPC is handled using Queryables. No Zenoh expert, but seems needed.

sophokles73 commented 9 months ago

I think the idea is we just call the registerSourceListener() and registerSinkListener() with UAuthority only for the "special case" for uStreamer but not a different API.

I would be fine with that but we would need to make sure that we define the matching semantics thoroughly and precisely (in the up-spec).

We could also add a method for registering a listener with a custom matcher as proposed by @tamarafischer for extra flexibility ...

evshary commented 9 months ago

With the current definition of UTransport::register_listener it is unclear what to match on ...

Okay, I kind of understood what you mean. UTransport::register_listener works, but it confuses users whether they should register sink or source, so you want to make it clear, right?

Depending on which way we go, we need to keep @evshary in the loop so he can adjust accordingly. Please keep monitoring this thread as you have been

Yes, I keep tracking the thread :-)

I think you'll still need to do UUri parsing under the hood I suppose to ensure we have the correct subscriber vs queryable listener, I guess?

Yes... Since now we want to redesign the register_listener API, I hope it can also have a nicer way to handle it.

Just provide an initial idea. Would it be better to have different APIs based the type?

UStatus registerPublishListener(UUri, UListener);
UStatus registerNotificationListener(UUri, UListener);
UStatus registerRequestListener(UUri, UListener);
UStatus registerResponseListener(UUri, UListener);
// or maybe simpler
UStatus registerListener(UUri, UListener, message_type);

I know it's a little silly, but the up-client can implement different mechanism based on the message type. If we only use sink and source, we still need to discriminate the UUri type under the hood.

However, if this is not the case for other protocol (using different mechanism for different type), I think it's ok to only use sink and source for register_listener.

stevenhartley commented 9 months ago

I think it's a good proposal. Good way to handle normal uEs.

We'd still need a way to give a "special" UUri to both functions outlined by @sophokles73 that have only UAuthority or have two duplicate methods:

// method which explicitly provides only a UAuthority
UStatus registerSourceUAuthorityListener(UAuthority, UListener);

// method which explicitly provides only a UAuthority
UStatus registerSinkUAuthorityListener(UAuthority, UListener);

IMHO, I kind of like the idea of leaving there be just two methods as outlined by Kai (i.e. don't add the above two methods) and handling with a special UUri that contains only UAuthority for now. Feels like it gives us room to modify / improve the wildcarding over the UUri in the future "under the hood" of the API.

What do y'all think?

Depending on which way we go, we need to keep @evshary in the loop so he can adjust accordingly. Please keep monitoring this thread as you have been 🙂

@PLeVasseur I don't want to put streamer specific APIs that are app/service facing in uTransport, they will not know which they need to implement and why. It is better that we document that passing only uAuthority means something different for those implementing the interface (not consuming it).

sophokles73 commented 9 months ago

With the current definition of UTransport::register_listener it is unclear what to match on ...

Okay, I kind of understood what you mean. UTransport::register_listener works, but it confuses users whether they should register sink or source, so you want to make it clear, right?

Users can not register a listener matching on either a sink or a source because the semantics of the topic parameter is (currently) unspecified.

sophokles73 commented 9 months ago

Just provide an initial idea. Would it be better to have different APIs based the type?

UStatus registerPublishListener(UUri, UListener);
UStatus registerNotificationListener(UUri, UListener);
UStatus registerRequestListener(UUri, UListener);
UStatus registerResponseListener(UUri, UListener);
// or maybe simpler
UStatus registerListener(UUri, UListener, message_type);

I know it's a little silly, but the up-client can implement different mechanism based on the message type. If we only use sink and source, we still need to discriminate the UUri type under the hood.

@evshary I guess this would be helpful for up-client-zenoh because you need to do different things for receiving Publish/Notifications than for receiving Request/Responses, right? For the former you register a subscriber, for the latter you need to tap into the Queryable processing, I guess.

In general, I can see the advantage of with having the message type specific registration methods instead of the source/sink specific ones: This would make things more transparent for uEntities that want to implement a service client or provider on top of UTransport. The only drawback I see would be, that uStreamers (potentially) need to invoke more methods to register a listener that simply forwards all types of messages for an authority ...

@PLeVasseur @stevenhartley WDYT?

tamarafischer commented 9 months ago

Hi All, I carefully read all the proposals and here are my thoughts: The problem statement: how to match an incoming message on uTransport to a handler function, aka UListener

The main job of uTransport is to take a message, and quickly find a UListener (maybe even more than one) that handle this message by calling their onReceive(source, payload, attributes) method, and quickly move on to the next message.

The discussion is on the functionality of find UListener.

Solution one Add explicit methods to uTransport, to register listeners by source, register listeners by sink, register listeners by source UAuthority. Example:

// method which explicitly provides only a UAuthority
UStatus registerSourceUAuthorityListener(UAuthority, UListener);
// method which explicitly provides only a UAuthority
UStatus registerSinkUAuthorityListener(UAuthority, UListener);

This solution does not change the current implementation, only adds to it. That said, it is not very flexible and every time we need some other matching logic, another method needs to be added and everyone needs to bump versions. Personally, I am not fond of this solution as it is less flexible and does not leave wiggle room to change.

Solution two Change the register event listener API, an example proposed was to add a type. Example:

UStatus registerListener(UUri, UListener, message_type);

This solution changes existing implementations and is a breaking change. I am less fond of this solution because it breaks existing implementation and is not flexible. It does not leave wiggle room to change.

Solution Three Following Uncle Bob: Do one thing and do it right. Provide wiggle room for changes and enhancements, while not breaking current implementations. I would like to propose a dedicated interface whose job it is to find a UListener, given a source and UAttributes.

/**
 * RegisteredListeners is a container for event listeners.
 * It holds events listeners that an implementation of uTransport can use to find the event listener to apply to an incoming UMessage.
 * This interface is used by uTransport to hold all the registered UListeners, so when an event comes in,
 * an UListener can be selected to execute by finding a match by using the source and the message attributes.
 */
public interface RegisteredListeners {

    /**
     * Given a UMessage that comes in from a source {@link UUri}, with {@link org.eclipse.uprotocol.v1.UAttributes} find the UListener to apply.
     * @param source The {@link UUri} of the UMessage source.
     * @param messageAttributes The {@link UAttributes} of the UMessage.
     * @return Returns the UListener registered, using the source and the message attributes to find a match.
     */
    Optional<UListener> findListener(UUri source, UAttributes messageAttributes);

}

uTransport delegates to this dedicated object to find a UListener. UTransport implementation can get it in the constructor or create it in the constructor or do whatever they want. The main idea is that the job of handling messages on the transport layer is not mixed with the logic of finding a correct UListener to handle a message. UTransport reason to change is changes in the underlying transport technology, like a Zenoh enhancements. Program to the RegisteredListeners interface to find the UListener makes it easier to test the UTransport and super easy to test the matching functionality.

The next part of the proposal is an interface that extends from UListener called UListenerWithMatcher.

/**
 * An interface for implementation of a {@link UListener} that hold specific data. Using the data that they hold,
 * an isMatch method needs to be implemented, so that given a UMessage source {@link org.eclipse.uprotocol.v1.UUri} and {@link org.eclipse.uprotocol.v1.UAttributes}
 * the listener can decide if it is a match and should be executed by the transport or not.
 */
public interface UListenerWithMatcher extends UListener {

    /**
     * Implementing isMatch enables the UListener to use its internal data to indicate to the transport layer if this
     * listener should be executed when a message is received.
     * @param messageSource The {@link UUri} of the message source.
     * @param messageAttributes The {@link UAttributes} of the message.
     * @return returns true if the {@link UListener} should be applied by the transport layer.
     */
    boolean isMatch(UUri messageSource, UAttributes messageAttributes);

}

The UListenerWithMatcher is an interface, implementations can provide all the wiggle room we need for matching by source, by partial source, by sink, by sink and type. The following use cases that I implemented: I am an application in the cloud processing messages from cars.

  1. Published Message - matched by type pub.v1 and does not have a sink, !UAttributes.hasSink()
  2. Notification Message - matched by type pub.v1 and has a sink and the sink is my application uE (meaning this notification is meant for me)
  3. RPC response Message - matched by type response.v1 and has a requestId
  4. Subscription Messages - matched by source core.usubscription with cloud authority - here I handle all subscription lifecycle of remote subscriptions

These use cases are use cases for application developers. I believe that streamers can use this to implement their own matchers and handlers. In addition, each utransport can have different implementations.

My personal implementation of RegisteredListeners is like this:

/**
 * An implementation of {@link RegisteredListeners} container that internally uses a flux stream to search for UListeners.
 */
public class FluxRegisteredListeners implements RegisteredListeners {

    private final Flux<UListenerWithMatcher> registeredListeners;

    /**
     * Create the FluxRegisteredListeners.
     * @param listeners A List of {@link UListenerWithMatcher} that this container will manage.
     */
    public FluxRegisteredListeners(List<UListenerWithMatcher> listeners) {
        this.registeredListeners = Flux.fromIterable(listeners);
    }

    /**
     * Given a UMessage that comes in from a source {@link UUri}, with {@link UAttributes} find the UListener to apply.
     * @param source            The {@link UUri} of the UMessage source.
     * @param messageAttributes The {@link UAttributes} of the UMessage.
     * @return Returns the UListener registered using the source and message attributes to find a match.
     */
    @Override
    public Optional<UListener> findListener(UUri source, UAttributes messageAttributes) {
        // in this specific case we can use block since we are not making any network calls or something that
        // blocks the working thread.
        return Optional.ofNullable(this.registeredListeners
                .filter(listener -> listener.isMatch(source, messageAttributes))
                .next()
                .block());
    }

}

I know it is a list and that find can be O(N) but in my case it is a small fixed list making it O(1). This small list is because I can easily group my listeners and not have one for every UURI. This also enables me to register them upfront.

I personally like the idea of the business logic of the matching and the business logic of the messages handling - onReceive - is in the same class. The onReceive is clear as to what messages he is handling.

I am not sure if the use cases for uStreamer are different but by programming to interfaces we have a ton of wiggle room and each part is doing one thing thus making the API easy to define.

I think a lot of these interfaces can go in the core next to UListener. In addition, we can add pure functions on UListenerWithMatcher for marching by UURI, by partial UURI, by type and more. I have a bunch of these implementations in the code I am experimenting with.

@stevenhartley said to create a PR for review - will figure it out if it makes sense to people.

stevenhartley commented 9 months ago

Thanks for sharing @tamarafischer. The concern I have with your proposal is the matching rules you mentioned are fixed by the protocol meaning there is no value to expose this to applications and ask them to do the listener filtering, it should be inside of the core language libraries (not even in the transport implementations). We know that a publish, notification, request, response requires different filters and the rules are fixed, perhaps application might want additional filters but the base filters should be coded in the library.

sophokles73 commented 9 months ago

Thanks for all the input, @tamarafischer :+1: I can see the value of providing this kind of flexibility and IMHO this can be a valuable addition to a UTransport implementation.

Am I right in assuming that a transport implementation would need to pass every message being transferred on the underlying messaging infrastructure to the findListener method in order to determine, if somebody is interested in processing the message?

An advantage of using one of the methods for registering a listener based on a type and URI would be, that the transport implementation could take advantage of message filtering already provided by the underlying messaging infrastructure, e.g. using wildcards on MQTT topics and/or Zenoh keys. This should allow the transport to inspect/process only those messages for which there actually is an interest.

If the above is correct, then wouldn't it be sufficient to provide an additional method for registering an all messages listener, e.g.

UStatus registerAllMessagesListener(UListener listener);`

The provided listener implementation would then need to do the matching and (if a match is made) processing of the passed in message. I guess the same logic would need to be implemented by the client code as with having separate interfaces for matching and processing, or am I mistaken?

tamarafischer commented 9 months ago

Thanks for sharing @tamarafischer. The concern I have with your proposal is the matching rules you mentioned are fixed by the protocol meaning there is no value to expose this to applications and ask them to do the listener filtering, it should be inside of the core language libraries (not even in the transport implementations). We know that a publish, notification, request, response requires different filters and the rules are fixed, perhaps application might want additional filters but the base filters should be coded in the library.

UTranslport delegates to the external RegisteredListener container - it does not have to be static as in my implementation - it can be dynamic and you can add UListenerWithMatch or remove them. An implementation of the container can be with a List (like mine) or a Map or 2 Maps It is an interface and has one method - it is a functional interface (an interface with one method) and you can implement it any way you like. The most important point here is that the functionality of finding the correct UListener is not done bu uTransport making this finding functionality super easy to test. In my experience, testing uTransport is an integration test, while testing matching business logic is an easy unit test.

MelamudMichael commented 9 months ago

My personal opinion - What i like about uTransport that it has only few APIs , so personally i am not in favor of adding many additional APIs

tamarafischer commented 9 months ago

Thanks for all the input, @tamarafischer 👍 I can see the value of providing this kind of flexibility and IMHO this can be a valuable addition to a UTransport implementation.

Am I right in assuming that a transport implementation would need to pass every message being transferred on the underlying messaging infrastructure to the findListener method in order to determine, if somebody is interested in processing the message?

An advantage of using one of the methods for registering a listener based on a type and URI would be, that the transport implementation could take advantage of message filtering already provided by the underlying messaging infrastructure, e.g. using wildcards on MQTT topics and/or Zenoh keys. This should allow the transport to inspect/process only those messages for which there actually is an interest.

If the above is correct, then wouldn't it be sufficient to provide an additional method for registering an all messages listener, e.g.

UStatus registerAllMessagesListener(UListener listener);`

The provided listener implementation would then need to do the matching and (if a match is made) processing of the passed in message. I guess the same logic would need to be implemented by the client code as with having separate interfaces for matching and processing, or am I mistaken?

@sophokles73 If I understand what you are asking for correctly, then you want a container that can be bootstrapped with a method that the underlying transport layer can handle and another data structure holding developer specific handlers, or we could have a container that only uses the underlying transport method way of matching things.

The proposal has two parts. One is the container that takes the matching logic out of the transport - so register listener now has the matching logic in a function that can be called. Second is an additional interface that lets a developers specify a UListener that matches anything. The container can support custom matching, or not support custom matching.

The part I like is that whatever is implemented, the transport does not have all those if statements like in the case @PLeVasseur shared and this business logic is easy to test.

What bothered me the most is that it was so difficult to test all the functionality in uTransport since it had so many dependencies and all this logic.

I hope this makes sense.

tamarafischer commented 9 months ago

Another thing is that we could easily add a findAll method to the container to find all matching listeners and then call the onReceive.

While we are at it, can we make onReceive return a value, making it so much easier to test. Otherwise I have to pass in a class to the listener that is called in the onReceive. In addition it would make it easier to compose.

I am kind of a fan of composing functions, and functions that are easy to test.

sophokles73 commented 9 months ago

I have created two sequence diagrams below which are supposed to illustrate differences in how UTransport might be used in the cloud vs in-vehicle.

Cloud

The diagram below illustrates how a streamer in the cloud could interact with a UTransport in order to receive all types of messages from all vehicles managed by the cloud instance.

sequenceDiagram
  autonumber
  actor S as UStreamer

  create participant L as UListener
  S ->> L: create
  create participant RL as RegisteredListenersImpl
  S ->> RL: create(listener)
  create participant UT as MqttUTransport
  S ->> UT: create(listeners)
  create participant B as MQTT Broker
  UT ->> B: connect
  UT ->> B: SUB("#35;")
  note over UT,B: subscribe to ALL messages
  B ->> UT: PUBLISH(message)
  note over B,UT: ANY kind of message
  activate UT
  UT ->> RL: findListeners(message.attributes)
  activate RL
  alt found listener matching attributes
  RL -->> UT: [listener]
  UT ->>+ L: onReceive(message)
  else no listener matching attributes
  RL -->> UT: []
  end
  deactivate RL
  deactivate UT

Note how the streamer does not register a streamer matching a URI but instead configures the UTransport with a RegisteredListener instance for looking up listeners. This allows the streamer to implement any kind of matching logic to filter the messages it is interested in across the whole vehicle fleet.

The UTransport implementation during start-up subscribes to all messages on the MQTT broker. Whenever a message is published, the broker forwards the message to the transport which tries to find a listener using the RegisteredListeners. If a listener is found, the transport passes the message into the listener for processing. This approach can easily be scaled out horizontally in a cloud environment and provides lots of flexibility regarding the way messages are matched and processed.

In-Vehicle

The diagram below illustrates how a streamer in a vehicle could interact with an MQTT based UTransport in order to receive RPC request messages destined to uEntities on host 192.168.0.1, and which have been published to the MQTT broker that the UTransport implementation is connected to.

sequenceDiagram
  autonumber
  actor S as UStreamer

  create participant L as UListener
  S ->> L: create
  create participant UT as MqttUTransport
  S ->> UT: create
  create participant B as MQTT Broker
  UT ->> B: connect
  S ->> UT: registerRequestListener(//192.168.0.1///, listener)
  UT ->> B: SUB("192.168.0.1///")
  note over UT,B: subscribe to invocations of methods of uEntities on 192.168.0.1
  B ->> UT: PUBLISH(message{sink:192.168.0.1})
  activate UT
  UT ->>+ L: onReceive(message)
  deactivate UT

Note how the streamer registers a specific listener for request messages that have a sink address matching the given IP address. Without this explicit registration, the MQTT transport would not receive any messages at all from the broker. Based on the URI pattern provided by the streamer, the transport implementation can take advantage of the inherent message filtering capabilities of the MQTT broker, i.e. the broker does not forward every message to the transport but only those matching the topic filter used by the transport when it created its subscription with the broker. This way, the (very limited) CPU resources required for filtering and processing the messages can be reduced significantly.

Given that there is only a limited number of devices within the vehicle, it seems reasonable to assume that the streamer will only need to register a few listeners to fulfill its purpose of routing messages between the uEntities in the vehicle.

FMPOV UTransport should support both use-cases. This could be done by providing the dedicated registerRequest|Response|Publish|NotificationListener methods for allowing in-vehicle streamers to reduce CPU load required for message processing, as well as supporting the configuration of custom message matching logic for finding listeners.

evshary commented 9 months ago

@sophokles73 I love your diagrams, which make things clearer.

To summarize, there will be two modes in the implementation (Correct me if I'm mistaken):

  1. Filtering messages in UTransport implementation
    • Initialize UTransport with a listeners container
    • UTransport receives all the messages and do the filtering
// Create listener with callback
let listener1: UListener;
// Create listeners container
let listeners = RegisteredListener::new();
// Add listener to the container for matching
listeners.add(uattributes, listener1);
// Initialize UTrasnport with listener
let upclient = MqttUTransport::new(listeners);
  1. Filtering the message in protocol
    • Initialize UTransport without a listeners container
    • UTrasnport only receives the message filtered by protocol with the given UUri
// Create listener with callback
let listener1: UListener;
// Initialize UTrasnport
let upclient = MqttUTransport::new();
// Register the listener based on UUri type
upclient.register_request_listener(UUri, listener1);

Here are my questions:

  1. In mode 1, I need to call registerRequest|Response|Publish|NotificationListener for all messages under the hood, right? The message type matching algorithm is in UTransport, so UTransport needs the protocol to receive all kinds of messages.
  2. I'm not sure what @PLeVasseur thinks. Do you think you can implement ustreamer with mode 1? Or maybe we still need to define special UUri in mode 2 to listen to the specific UAuthority.
sophokles73 commented 9 months ago

In mode 1, I need to call registerRequest|Response|Publish|NotificationListener for all messages under the hood, right? The message type matching algorithm is in UTransport, so UTransport needs the protocol to receive all kinds of messages.

I assume that by I need to call you mean the transport implementation needs to call :-)

But no, the transport implementation would not need to invoke any of the registerRequest|Response|Publish|NotificationListener methods. As the first sequence diagrams tries to illustrate in step 5, the transport implementation will subscribe to the wildcard topic # in order to receive all messages from the broker. For each message, the transport implementation then invokes RegisteredListeners.getListeners(message) to determine the set of listeners that are interested in processing the message. The matching logic for this has been passed into the transport during setup (step 3) by the streamer. All that the transport implementation then needs to do, is invoking every listener with the message.

sophokles73 commented 9 months ago

Do you think you can implement ustreamer with mode 1? Or maybe we still need to define special UUri in mode 2 to listen to the specific UAuthority.

I am pretty sure that it can be implemented using mode 1 as well. The question is: do we want to? I tried to point out some advantages of mode 2 over mode 1 when running in-vehicle. Maybe @PLeVasseur, @stevenhartley and/or @Mallets have some opinion on whether this is actually relevant in the Zenoh context?

evshary commented 9 months ago

I assume that by I need to call you mean the transport implementation needs to call :-)

Yes, and it makes sense to me now.

The question is: do we want to? I tried to point out some advantages of mode 2 over mode 1 when running in-vehicle.

If the ustreamer is mainly for in-vehicle scenarios, I agree with you. Zenoh can help filter other uninterested topics to improve performance. Then we should still keep the special UUri for this case.