Open bgroenks96 opened 9 years ago
I want this too!
:+1: Can we get some input from collaborators....?
The interface looks good. But I am quite unsure about the implications for the existing implementation. I have the feeling that this feature would require a lot of changes in the core. Currently there is no awareness of running iterators. Can you outline the necessary changes that you see fit to support that feature?
It's been a while since I've looked at this... but I just pushed the changes I had made to my remote fork. So you can take a look here:
https://github.com/bgroenks96/mbassador/tree/add-event-bus-pause-resume
Hi, I just took a quick look at the code you used to support the pause/resume feature. I don't have the time to make more specific comments but wouldn't it be easier to just swap in a queue that has no workers attached (pause()) and then switch back the old queue on resume(). You could drain the intermediate queue into the working queue while checking for the isPaused
flag. With your current implementation you will have problems if one thread resumes the processing while another pauses it. I am also seeing other race conditions as you don't use compareAndSwap
, I am not sure though if those represent synchronization bugs.
Are you referring to the 'pauseAsync' stuff or the code in AbstractPubSubPauseSupport?
I haven't fully implemented and/or reviewed how this will work with async. I actually was hoping for your input on that xD
I would suggest you have a look at the mbassador-spring repository. The entire extension is based on a decorator like pattern. Instead of extending the core, the transactional bus wraps the sync-async bus, adds additional methods and delegates existing ones (adding extra code where necessary). I would advice you to take the same approach. In your method delegates to publish / publishAsync you just check for the AtomicBoolean and either route messages to your intermediate queue or to the underlying bus' method. Whenever resume is triggered you drain the queue (until paused again or queue is empty). Depending on your usecase you might need separate queues for sync and async publication. But maybe you can go with one and drain its content just into the async queue of Mbassador.
I hope I could make myself clear.
How does this look?
At first glance it looks fine. Good job! But I would change/add the constructor to consume a bus instance as parameter. This way it would be possible to write adapters that mix in various flavors. E.g. you could have a ResumableAndTransactionalBus(-Adapter) -- welcome to Spring Framework naming patterns :)
I am also not sure about draining the queue using synchronous publish. This would make resume() a blocking call. Maybe it could be extended to have a boolean param to decide which publication method to use.
Regarding unit tests: They should cover concurrent scenarios (use the helpers to spin up concurrent workloads) and then check invariants. One approach would be to have handlers that synchronize on the same monitor (another atomic boolean) that is set atomically together with the pause()/resume() of the bus. The handlers throw an exception in case they receive messages while being in paused state.
Ok, I added another constructor and a new 'resumeAsync' method. How does that look?
Now you just need unit tests?
The implementation of publish() and publishAsync() violates the DRY principle and needs to be refactored. Have a go with unit tests and we will see how the code works, then.
The implementation of publish() and publishAsync() violates the DRY principle and needs to be refactored.
So would you prefer to just have something like resume(boolean async)
?
I think I actually find having a separate resumeAsync
method slightly more clear, but I won't argue the case. I'll change it to whatever you want.
@bennidi I think I have discovered several issues with taking the spring-like wrapper pattern you suggested:
1) It makes it impossible to extend MBassador
and treat the new pause/resume bus polymorphically as such. The reason being that the constructor of AbstractSyncAsyncMessageBus
makes a polymorphic call to the method getRuntime()
which has to be overloaded by the wrapper subclass in order to provide the internal bus runtime to callers (see code here). This causes a NullPointerException
to be thrown since the superclass constructor has not yet finished executing and thus the internal bus reference has not yet been set by the child constructor.
2) This leaves us with the option of creating a standalone class that extends nothing but implements the appropriate interfaces and has an MBassador
instance injected, much like those in your mbassador-spring repository. While I think this is a good idea in theory, it has some pitfalls in practice:
a. It's utterly impractical for present users of the MBassador
library to go through and change every MBassador
reference to SuspendableMBassador
just to use this feature, which they would have to do. Now you might ask, why not wrap their current MBassador
instances in SuspendableMBassador
s whenever they need it? That brings us to the next issues with this approach:
b. The injection creates a deceptive relationship between SuspendableMBassador
and its injected internal message bus. When the user of SuspendableMBassador
calls pause()
or resume()
, they will expect all message publication to halt for Handlers subscribed to that message bus. However, this can only happen for messages published through the SuspendableMBassador
wrapper. i.e. Every other event being published via references to the injected MBassador
(so all of their existing code) will still be delivered, and there is no way for SuspendableMBassador
to do anything about that. This pretty much renders the feature useless.
c. The rest of the API is not designed with this spring-like pattern in mind. It is much more of a top-down, inheritance based pattern heavily based around the MBassador
concrete implementation, and I don't see any good way to mix the two; especially as far as practical API use goes.
In conclusion, I just don't think the wrapper/injection approach is going to work. Feel free to correct me if I have missed something, though.
In the meantime, I am going to return to my original approach of injecting the PubSubPauseSupport
functionality into the existing MBassador
type via an AbstractPubSubPauseSupport
implementation between AbstractPubSubSupport
and AbstractSyncAsyncMessageBus
. This seems to fall the most in line with the existing API design and will allow users to take advantage of the new pause/resume features without making any changes to their code. It also shouldn't incur any performance costs on people not using pause/resume since publish
and publishAsync
will more or less behave the same minus the addition of making a very cheap conditional check for the current pause state.
Let me know what you think @bennidi. I'll keep working on unit tests in the meantime.
@bennidi It's been a while, but I am still interested to know what you think about this and about #131
@bgroenks96 Sorry for this long pause in communication. I had my priorities shifted to other projects. Looking at your commit I believe that you did a good job. Can you rebase that to the current master branch and make a new PR (I made small API changes that will also affect your code). Would be great if you could add a test that uses the different types of flush modes.
I will profile the performance impact of your changes and have a look at the complete code base in my IDE. I won't promise that it makes it to the next release because I am very careful with additions to core but it looks promising and would be a great addition.
Hi, I've been working on a pull request to add a 'pause' and 'resume' feature to the message bus. Here are the methods from the new interface 'PubSubPauseSupport' with Javadoc comments to explain the concept:
This is especially useful in event-based applications that go through "transitions" i.e. a short period of time in which no Handler is able to receive events, yet it is problematic for inbound events to be completely lost. Being able to 'pause' the event publishing, queueing published events until it is resumed, solves this problem. There shouldn't be any performance cost on normal operation either.
The only thing that's tripping me up a bit is how to support
publishAsync
calls? ShouldAbstractSyncAsyncMessageBus
keep another queue separate from the normalBlockingQueue
or should that be the implementation's job?I would appreciate any feedback, and let me know if you want me to go ahead an push the pull request so you can see the rest of the changes.