Open pnoltes opened 1 year ago
@pnoltes Do you have a development plan for EventAdmin? If no, I'd like do it.
And I have a question about netstring serialization. If we use netstring to serialize celix_properties, it may lose type information about the property value. I think TLV(type-length-value) serialization may be better.
@pnoltes Do you have a development plan for EventAdmin? If no, I'd like do it.
I had no concrete plans for this yet, so please feel free to pick this up :smile:.
And I have a question about netstring serialization. If we use netstring to serialize celix_properties, it may lose type information about the property value. I think TLV(type-length-value) serialization may be better.
Yes you are correct, if #470 is merged, properties will support property value type. So I agree an other option would be better.
Note that there is a old and experimental implementation of the event admin at: https://github.com/apache/celix/tree/rel/celix-2.3.0/misc/experimental/bundles/event_admin/event_admin But I think this can be ignored and maybe only used for inspiration.
What I remember from the experimental event admin impl is that we struggled with when a Event object could be destroyed and who should destroy it.
An option to solve this (IMO) more elegant is to drop the Event type and just reuse a string and celix properties. So a event admin interface like:
typedef struct celix_event_admin {
void* handle;
/**
* @brief async post event call. Event admin will take ownership of the eventProperties and destroy it when no longer needed (events are handled)
*/
celix_status_t postEvent(void* handle, const char* eventTopic, celix_properties_t* eventProperties);
/**
* @brief sync send event call. Caller keeps ownership of eventProperties and eventProperties can safely be destroyed after the call returns
*/
celix_status_t sendEvent(void* handle, const char* eventTopic, const celix_properties_t* eventProperties);
} celix_event_admin_t;
What I remember from the experimental event admin impl is that we struggled with when a Event object could be destroyed and who should destroy it.
How about define a event object like :
typedef struct celix_event_private {
const char *topic;
celix_properties_t *properties;
struct celix_ref ref;
}celix_event_private_t;
typedef struct celix_event {
/**
* @brief event private data
*/
celix_event_private_t privateData;
celix_status_t (*getTopic)(celix_event_t *event, const char **topic);
celix_status_t (*matches)( celix_event_t *event);
celix_status_t (*getProperty)(celix_event_t *event, char *propertyKey, celix_properties_entry_t **propertyEntry);
...
}celix_event_t;
/**
*@brief Create event
*/
celix_event_t *celix_event_create(const char *topic, celix_properties_t* eventProperties);
/**
*@brief Retains(increases the ref count) the event
*/
void celix_event_retain(celix_event_t *event);
/**
*@brief Release(decreases the ref count) the event
*/
void celix_event_release(celix_event_t *event);
What I remember from the experimental event admin impl is that we struggled with when a Event object could be destroyed and who should destroy it.
How about define a event object like :
typedef struct celix_event_private { const char *topic; celix_properties_t *properties; struct celix_ref ref; }celix_event_private_t; typedef struct celix_event { /** * @brief event private data */ celix_event_private_t privateData; celix_status_t (*getTopic)(celix_event_t *event, const char **topic); celix_status_t (*matches)( celix_event_t *event); celix_status_t (*getProperty)(celix_event_t *event, char *propertyKey, celix_properties_entry_t **propertyEntry); ... }celix_event_t; /** *@brief Create event */ celix_event_t *celix_event_create(const char *topic, celix_properties_t* eventProperties); /** *@brief Retains(increases the ref count) the event */ void celix_event_retain(celix_event_t *event); /** *@brief Release(decreases the ref count) the event */ void celix_event_release(celix_event_t *event);
Yes, that will also work. I think the function pointers are not needed (unless we would like a polymorphism event object) and maybe a the event object should own the topic string.
How about define a event object like :
I agree with @pnoltes that function pointers are not needed unless polymorphism is involved.
Even if it's absolutely necessary, please note that any object passing through a dynamic interface (like Celix service) must be managed some way by the interface provider so that when the interface goes away, the object is not available to the outsider any more.
Of course, the above consideration does not apply if event is used only within EventAdmin.
@pnoltes Do you have any plans to merge type_support_for_properties? The event properties might need to support 'byte' type arrays property.
Additionally, according to the OSGI specification 133.9,event properties should support arrays of primitive data types. Should we necessarily support arrays of all primitive types? In my opinion, 'byte' type arrays can handle most scenarios in our daily work, we can first support only byte type arrays.
There is a back-pressure issue inherent in any asynchronous messaging system: what if we have slow message consumers and fast consumers at the same time? Even if an event is relatively small, permitting messages accumulate indefinitely in a slow consumer's queue is never a good idea. That means we may need some QoS mechanism.
@pnoltes Do you have any plans to merge type_support_for_properties? The event properties might need to support 'byte' type arrays property.
Additionally, according to the OSGI specification 133.9,event properties should support arrays of primitive data types. Should we necessarily support arrays of all primitive types? In my opinion, 'byte' type arrays can handle most scenarios in our daily work, we can first support only byte type arrays.
Because the changes "type support for propeties" PR is breaking, I want to merge this after Apache Celix 2.4.0 is released. So when the master branch is working towards a Apache Celix 3.0.0.
But IMO this should hold back the event admin development. In the spec a Event structure primary contains a topic and properties and this can already be done with the current properties. When the properties are updated for type support, I expect this will have minimal or no impact on the Event impl.
Concerning remote event admin. I think it is wise to start with a local event admin implementation, mostly following the spec (if possible) and only after that start on the remote event admin.
I also expect that we still need to have a few discussion about the event admin. For example:
event.remote=true
property) or maybe with a special interface (RemoveEventAdmin
). And do event handlers always receive local and remote events?sendEvent
block until all event local and remote are processed? Will it have a timeout?postEvent
be configured / plugable?Will the technology used for remote be plug-able and how? Maybe something like a distribution provider (as is used in Apache Aries, see https://aries.apache.org/documentation/modules/rsa.html) to make adding a new transport and serialization more easy.
The technology used to publish remote events should be plug-able. IMO, it might be an event handler proxy and register an event handler service that subscribes to all local events and then pushes events subscribed by remote subscribers to the remote distributor. When the remote distributor receives a remote event, it sends the event to the event admin(use sendEvent/postEvent
), and event admin publish remote event to subscriber.
Will discovery of remote event admins be supported? Or do we only support broker-like solution (MQTT, NATS, etc) (which does not need a discovery only a broker address).
We could consider using a broker-like solution and then reuse the discovery of remote service(discovery_zeroconf,etc.) to announce the broker's information. If we use the solution of discovering remote endpoints, we don't know which publisher service endpoints will be created until the sendEvent/postEvent
interface is called, unless we can get the publisher information in advance by some other mechanism
Do we add a wire protocol (as is currently done in pubsub)? This can be used to identify a protocol version (and thus detect if we are remote compatible), add a sync word (and maybe even detect byte order) and optionally provide metadata and/or message fragmentation.
Could it be based on the remote technology used? If the remote technology is TCP/UDP, we should add a reusable wire protocol. If the remote technology is websocket, since websocket already support message fragmentation, is it possible for the distributor to define its own wire protocol (like pubsub_websocket
)?
Are all events remote? Or is this a opt-in per event (e.g. with something like a event.remote=true property) or maybe with a special interface (RemoteEventAdmin). And do event handlers always receive local and remote events?
I think event handlers should receive all events that they subscribe to, whether remote or local. If there are no remote subscribers, the event admin should not push the event to the remote, otherwise, it should push the event to the remote. Additionally, to distinguish between remote and local events, we can add an event property, such as celix.framework.uuid
. And the property should be set by the event admin.
Will the
sendEvent
block until all event local and remote are processed? Will it have a timeout?
We can start with implementing sendEvent
to wait until all events are processed. At the same time, according to the OSGI specification 113.8.2, we can use a blacklist, blacklisted handlers should not be notified of events.
If a timeout is really necessary, we can add the new interface sendEventWithTimeOut
.
Will the thread or thread used by postEvent be configured / plugable?
we can use threadpool, and the max thread number can be configured by the property of event admin bundle. For event handlers that require a order of events, it should be ensured that its handleEvent
method is executed in a single thread
Will remote events, pushed to event handlers on a single thread or multiple threads and is this configurable/plug-able?
To ensure order of events, a single thread maybe be used for the same remote address, but multiple threads maybe be supported for different remote addresses.
As mentioned by Peng, can event handler provide back pressure? If so is this a separate service interface?
We can use a blacklisting mechanism to limit or disconnect event handlers that take too long in the handleEvent
method. If an event handler has cached events, it should manage the cached events.
Also as Peng mentioned, do we want to support QoS (best effort, reliable, etc)
Considering the different requirements for event reliability in different scenarios, we can first implement two QOS policies: at most once, at least once (like MQTT). And set QOS as a property of event.
To remote event admin, I have the following ideas. First of all, the remote distribution provider should be pluggable, and its interface definition is as follows:
typedef struct celix_event_remote_provider_service {
void *handle;
//It is used to distribute the asynchronous event
celix_status_t (*postEvent)(const char *topic, const celix_properties_t *properties);
//It is used to distribute the synchronous event
celix_status_t (*sendEvent)(const char *topic, const celix_properties_t *properties);
}celix_event_remote_provider_service_t;
The service includes properties: celix.event.subscriber.endpoint.framework.uuids, service.ranking
, and event admin selects the best celix_event_remote_provider_service_t
service to distribute events to remote based on these properties.
The selection strategy is as follows:
celix.event.subscriber.endpoint.framework.uuids
are the same, then select the one with the highest service.ranking
.celix.event.subscriber.endpoint.framework.uuids
and service.ranking
are the same, then select the one that was registered first (its service.id
is smallest).In addition, the event properties retain the property name with the "$" prefix. These properties are used to represent the specific information of the remote distribution provider or control the API behavior. The properties that control the API behavior include:
$celix.event.remote.qos
$celix.event.remote.expiryInterval
: event expiration time, expired events are no longer delivered to subscribers. If this value is not set, it means never expire.$celix.event.remote.enable
: If the event contains the property, the event will be forwarded to the remote distribution provider. If this property is not set, it means that the event is only published locally.Note: The above event properties only take effect for remote events.
In addition, I don't intend to forward framework events to remote currently, because I am not sure about the meaning of forwarding framework events to remote. If it needs to be forwarded to remote, it should add the property "org.osgi.framework.uuid".
I plan to implement the remote distribution provider in two ways, one is based on RSA, and the other is based on MQTT. For the implementation based on MQTT, I plan to use the mosquitto library to implement it.
The implementation based on RSA
For the implementation based on RSA, it will add the remote interface celix_event_handler_remote_listener_service_t
and celix_event_remote_subscriber_service_t
. celix_event_handler_remote_listener_service_t
is used to listen to remote subscription information, and celix_event_remote_subscriber_service_t
is used to forward event to remote subscribers. For example, suppose there are two processes A and B in the system, process B needs to subscribe to event E1, and event E1 is published by process A, then process A needs to provide c
service, and process B needs to provide celix_event_remote_subscriber_service_t
service. Process B forwards its subscription information to A by calling the celix_event_handler_remote_listener_service_t
service of process A, and process A forwards E1 to process B by calling the celix_event_remote_subscriber_service_t
service of process B. Both celix_event_handler_remote_listener_service_t
and celix_event_remote_subscriber_service_t
are called remotely.
Note:I don't consider attaching the subscription information of process B to the celix_event_remote_subscriber_service_t
service properties, because if this is done, when updating the subscription information, process B needs to re-register the celix_event_remote_subscriber_service_t
service, during this period, process B may lose some events published by process A. Therefore, process B forwards its subscription information to A through the celix_event_handler_remote_listener_service_t
of process A.
The definition of celix_event_handler_remote_listener_service_t
and celix_event_remote_subscriber_service_t
is as follows:
typedef struct celix_event_handler_remote_listener_service {
void *handle;
celix_status_t (*handlerAdded)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter, int qos);
celix_status_t (*handlerRemoved)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter);
}celix_event_handler_remote_listener_service_t;
typedef struct celix_event_remote_subscriber_service {
void* handle;
celix_status_t (*receiveEventAsync)(void* handle, const char* topic, const celix_properties_t* properties);
celix_status_t (*receiveEventSync)(void* handle, const char* topic, const celix_properties_t* properties);
}celix_event_remote_subscriber_service_t;
The component relationship diagram is as follows:
Some other key points to consider in the implementation:
celix_properties_t
type will be added to dfi, which is used for the serialization and deserialization of event properties in remote calls.celix_event_remote_subscriber_service_t
offline handling strategy: If the event QOS value is QOS1 or QOS2, it should be delivered to online services, and then wait for offline services to online or events to expire or sessions to expire; QOS0 event are directly discarded.The implementation based on MQTT
For the implementation based on MQTT, I will add the remote interface celix_mqtt_broker_info_service_t
, which is used to obtain the address information of the MQTT broker. The one of remote distribution provider obtains the address information of the MQTT broker by parsing the mosquitto.conf file, and then registers the remote service celix_mqtt_broker_info_service
, other remote distribution provider obtains the address information from celix_mqtt_broker_info_service_t.
The definition of celix_mqtt_broker_info_service
is as follows:
#define CELIX_MQTT_BROKER_INFO_SERVICE_NAME "celix_mqtt_broker_info_service"
//The address of the MQTT broker
#define CELIX_MQTT_BROKER_ADDRESSES "celix.mqtt.broker.addresses"
//The port of the MQTT broker
#define CELIX_MQTT_BROKER_PORT "celix.mqtt.broker.port"
typedef struct celix_mqtt_broker_info_service {
void *handle;
}celix_mqtt_broker_info_service_t;
The component relationship diagram is as follows:
Some other key points to consider in the implementation:
celix/mqtt/subscriptions/<frameworkUUID>
to the broker, and obtains remote subscription information by subscribing to the message celix/mqtt/subscriptions/*
. Then the provider should filter events based on the remote subscription information before publishing the event to the broker.Considering the RSA approach, will event delivery be expensive?
Note that we don't support async remote method call yet.
Extending dfi to support celix_properties_t
will be a very welcome addition. I guess we need #721 and a method to serialize/deserialize celix_properties_t
into JSON, right?
Considering the RSA approach, will event delivery be expensive?
Yes, this is a weakness of RSA, maybe it can be solved by improve RSA, such as supporting long links, supporting asynchronous calls.Its advantage is that the communication method can be expanded with the expansion of RSA. If this approach is controversial, I will consider not implementing it, until the implementation of RSA meets the requirements of event admin.
Nice detailed plan for the Remote Event Admin.
I have several remarks:
1) Does the Event Admin need to be aware of remote events? I was considering the possibility that a Remote Event Admin could register an EventHandler with "event.topics=*". This way, the Remote Event Admin can receive any local events and, perhaps, even filter for events marked for remote communication (something like "event.filter=(celix.event.remote.enable=true)") and send them remotely. The counterpart of the Remote Event Admin can inject messages using the local Event Admin. The downside is that, from a remote perspective, the events are always treated as a postEvent. However, considering that remote calls can lead to long blocks and possible timeouts, this approach might be preferable.
2) I appreciate the addition of qos and expiryInterval. However, I am not sure if I like the $ prefix. As far as I am aware, the OSGi standard does not use a special character prefix to indicate reserved property names but instead relies on the use of more fully qualified names (celix.event.remote.*). Although we do not have to follow the OSGi specification exactly, I prefer the OSGi approach in this case.
3) Originally, the idea with PubSub was to build remote services on top of it (the C++ Remote Service Admin even had an integration test based on this). For me, it also makes more sense to build a Remote Services Admin based on a Remote Event Admin concept (note, this would then be a Remote Service Admin that does not require remote discovery). Starting with a remote events implementation based on a broker backend seems more logical to me. A Remote Events based on RSA, or vice versa, can be addressed in a follow-up.
3) In my opinion, one of the benefits of using a message broker, as opposed to a peer-to-peer concept, is the elimination of the need for remote discovery. Instead, a broker server can be configured. This configuration can be simple, based on config properties, or more elaborate, based on configuration dependencies. Although we do not have a Config Admin, I think the concept of a configuration dependency is already usable. Therefore, perhaps it's an idea to let an MQTT Remote Service Admin depend on something like a "RemoteServiceAdminMqttConfiguration". This could be a marking interface with configuration service properties or a service with some "get" function to retrieve MQTT configuration. An instance of "RemoteServiceAdminMqttConfiguration" could be realized by reading config properties, a system-wide or local (JSON) configuration file (e.g., /etc/mosquitto.conf), or even (in the future) a dynamically discovered remote service. I expect that in most cases, a static configuration will be needed, but this approach keeps the option open for a more dynamic way of providing configuration.
Lastly, please note that these are my thoughts. If there is an immediate need to couple a Remote Event Admin with RSA, we can, of course, more directly pursue that.
The downside is that, from a remote perspective, the events are always treated as a postEvent. However, considering that remote calls can lead to long blocks and possible timeouts, this approach might be preferable.
In some rare cases, where performance is not a concern, we still need sync semantics. For example, a master machine want to make sure all slave machines are ready to handle master shutdown before it actually performs the action.
Originally, the idea with PubSub was to build remote services on top of it (the C++ Remote Service Admin even had an integration test based on this).
Yes, conventionally messaging middle-wire lies below RPC, i.e. RPC can be implemented with messaging middle-wire. Our EventAdmin is still not general enough to be used to implement RPC. For example `celix_properties_t' still lacks serialization mechanism. And our RSA lacks async interface, which makes it impossible to implement EventAdmin efficiently. Event if RSA has such support, whether it is a good idea to go that way is still debatable.
I think the main motivation is to unify the service discovery of RSA/REA(RemoteEventAdmin). Currently an empty marking interface with its meta data can be easily advertised by service discovery of RSA. If the resulting coupling between RSA and REA is not desirable, we could instead try to make service discovery independent of RSA.
Does the Event Admin need to be aware of remote events? I was considering the possibility that a Remote Event Admin could register an EventHandler with "event.topics=*". This way, the Remote Event Admin can receive any local events and, perhaps, even filter for events marked for remote communication (something like "event.filter=(celix.event.remote.enable=true)") and send them remotely. The counterpart of the Remote Event Admin can inject messages using the local Event Admin. The downside is that, from a remote perspective, the events are always treated as a postEvent. However, considering that remote calls can lead to long blocks and possible timeouts, this approach might be preferable.
According to the EventAdmin specification:
The Event Admin service must be registered as a service with the object class org.osgi.service.event.EventAdmin. Multiple Event Admin services can be registered. Publishers should publish their event on the Event Admin service with the highest value for the SERVICE_RANKING service property. This is the service selected by the getServiceReference method.
Maybe we can implement an EventAdminWrapper, which implements the Event Admin Service, and its service ranking is the highest. In this way, EventAdminWrapper can forward events to remote (use EventRemoteProvider) or to local (use EventAdmin). And EventAdmin can be unaware of remote events.
In addition, should we consider multiple EventRemoteProviders existing? If yes, EventAdminWrapper can also be used to implement a strategy to select the perfect EventRemoteProvider to forward remote events.
For the MQTT broker address information, it maybe requires dynamic discovery in my work. Maybe we can implement dynamic discovery in the following ways:
Maybe we can implement an EventAdminWrapper, which implements the Event Admin Service, and its service ranking is the highest. In this way, EventAdminWrapper can forward events to remote (use EventRemoteProvider) or to local (use EventAdmin). And EventAdmin can be unaware of remote events.
In addition, should we consider multiple EventRemoteProviders existing? If yes, EventAdminWrapper can also be used to implement a strategy to select the perfect EventRemoteProvider to forward remote events.
Rethinking this, I think the proposed solution (using a celix_event_remote_provider_service_t
) is a cleaner setup.
For the MQTT broker address information, it maybe requires dynamic discovery in my work.Maybe we can implement dynamic discovery in the following ways
If discovery is needed, RSA indeed could be used for this. Maybe a bit an overkill, but on the other hand the code exists, is tested and can be used for this (marker interface with some configuration properties).
Implement the EventAdmin OSGi specification for Apache Celix.
The initial implementation should be a local useable EventAdmin, but can be prepared to function as a remote EventAdmin.
For a remote EventAdmin additional SPI should be designed to that transport and discovery can be plugged in. Serialization could also be plug-able, but arguable this is not needed and straight forward netstring serialization can be reused.
Additionally if celix_properties is extended to support a "bytes" value type, the event admin could be used to communicate arbitrary message using a "payload" property entry. In this case, serialization for the "bytes" value type should be a low overhead solution.