magicprinc opened 5 months ago
I don't really understand what you mean here. All bulkhead implementations are already based on semaphores, and the async bulkheads already support the waiting queue mechanism you describe.
What do you want to achieve by adding a timeout to acquiring the semaphores?
Here is an example:
@Test
void _bulkhead () {
// the other side has set a contractual limit to 2 concurrent connections
var ft = FaultTolerance.<Integer>create().withDescription("_bulkhead")
.withBulkhead().limit(2).done()
.build();
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
// we load tasks (the queue length is controlled) and run them in Virtual Thread
for (int i=0; i < 20; i++){
Runnable task = ()->{
try {
total.incrementAndGet();
ft.call(()->{
concurrent.incrementAndGet();
try { Thread.sleep(20); } catch (Throwable ignore){}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
return null;
});
} catch (Exception e) {
System.out.printf("FT.call failed: total=%s%n", total);
}
};
JThread.VT.execute(task);// run in VT using VT executor
}
}
Here is the result:
FT.call failed: total=16
FT.call failed: total=18
FT.call failed: total=19
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
FT.call failed: total=20
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=1
You want something like:
var ft = FaultTolerance.<Integer>createAsync()
.withDescription("_bulkhead")
.withBulkhead().limit(2).queueSize(100).done()
.build();
Then your task needs to return CompletionStage, which shouldn't be an issue: you'd just replace return null; with return CompletableFuture.completedStage(null);
If all tasks in the application are submitted to the same executor, you could even add withThreadOffload(true) to the FaultTolerance builder and set that executor using StandaloneFaultTolerance.configure(). That way, you wouldn't even have to create the Runnable and submit it to the executor manually; SmallRye Fault Tolerance would do that for you. (Actually, this is quite a limitation. At least in the programmatic API, we should be able to specify the executor for each FaultTolerance object separately. Let me file an issue for that.)
We live in a VT world now: you don't need callbacks/async/reactive (including CompletableFuture) anymore 🥳
It is time for an amazing simplification. No more reactive frameworks with their ugliness and complexity. Die WebFlux! RIP Mutiny.
~ "Bulkhead with timeout"
@Test @SneakyThrows
void _bulkhead () {
// the other side has set a contractual limit to 2 concurrent connections
var ft = new Semaphore(2);
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
var endTest = new CountDownLatch(20);
// we load tasks (the queue length is controlled) and run them in Virtual Thread
for (int i=0; i < 20; i++){
Runnable task = ()->{
try {
total.incrementAndGet();
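if (!ft.tryAcquire(20, TimeUnit.SECONDS)) {// wait up to 20 s for one of the 2 permits
throw new TimeoutException("no permit within 20 s");// never release a permit we didn't get
}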
try {
concurrent.incrementAndGet();
Thread.sleep(20);
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
} finally {
ft.release();
}
} catch (Exception e) {
System.out.printf("FT.call failed: total=%s%n", total);
} finally {
endTest.countDown();
}
};
JThread.VT.execute(task);// run in VT using VT executor
}
endTest.await();
}
Result:
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=1
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=1
where JThread.VT.execute(task) works like Thread.ofVirtual().start(task), but with Micrometer metrics.
The code of your class could look like this (important snippets only):
public class SemaphoreBulkhead<V> extends BulkheadBase<V> {
private final Semaphore semaphore;
private int timeout;// semaphore acquire timeout millis
private boolean acquire () {
if (timeout <= 0) {
return semaphore.tryAcquire();// as before / default behavior
} else {
try {
return semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
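private V doApply(InvocationContext<V> ctx) throws Exception {
if (acquire()) {// was: semaphore.tryAcquire()
LOG.trace("Semaphore acquired, accepting task into bulkhead");
ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
// ... running the task and releasing the permit: unchanged
}
// ... rejection branch: unchanged
}
// ... rest of the class unchanged
}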
It emulates limit + queue for plain blocking FT. The queue is outside of FT.
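For readers who want the proposed "limit + bounded wait" semantics in one place, here is a minimal JDK-only sketch. TimedBulkhead and TimedBulkheadDemo are hypothetical names, not SmallRye Fault Tolerance API. Plain platform threads stand in for virtual threads so it runs on any JDK; on JDK 21 they could be started with Thread.ofVirtual() instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical blocking bulkhead (not SmallRye API): at most `limit` actions run at once,
// and a caller waits up to `timeoutMillis` for a permit instead of failing immediately.
final class TimedBulkhead {
    private final Semaphore permits;
    private final long timeoutMillis; // <= 0 keeps the "fail immediately" behavior

    TimedBulkhead(int limit, long timeoutMillis) {
        this.permits = new Semaphore(limit, true); // fair: timed waiters are served in FIFO order
        this.timeoutMillis = timeoutMillis;
    }

    <V> V call(Callable<V> action) throws Exception {
        boolean acquired = timeoutMillis <= 0
                ? permits.tryAcquire()
                : permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!acquired) {
            throw new TimeoutException("no bulkhead permit within " + timeoutMillis + " ms");
        }
        try {
            return action.call();
        } finally {
            permits.release(); // released only because it was actually acquired
        }
    }
}

final class TimedBulkheadDemo {
    // Starts `tasks` threads against a bulkhead of size `limit`; returns {completed, maxConcurrent}.
    static int[] run(int tasks, int limit) {
        TimedBulkhead bulkhead = new TimedBulkhead(limit, 5_000);
        AtomicInteger concurrent = new AtomicInteger();
        AtomicInteger maxConcurrent = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                try {
                    bulkhead.call(() -> {
                        maxConcurrent.accumulateAndGet(concurrent.incrementAndGet(), Math::max);
                        try {
                            Thread.sleep(20); // simulated call to the rate-limited partner
                        } finally {
                            concurrent.decrementAndGet();
                        }
                        return null;
                    });
                    completed.incrementAndGet();
                } catch (Exception e) {
                    System.out.println("rejected: " + e.getMessage());
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {completed.get(), maxConcurrent.get()};
    }

    public static void main(String[] args) {
        int[] result = run(20, 2);
        System.out.printf("completed=%d, maxConcurrent=%d%n", result[0], result[1]);
    }
}
```

With a 20 ms action and 20 callers, the total wait stays far below the 5 s timeout. Every call should therefore complete, and at most two should run at any time.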
Don't let anyone tell you that virtual threads are a panacea of some sort; they are still just threads. The only thing that might possibly change in the future is that we will no longer need to deal with non-blocking concurrency (aka event loops or "reactive"); blocking concurrency (aka multi-threading) will be enough. But we're certainly not in that world yet; it will take years for the ecosystem to adapt.
Some people equate the word "asynchronous" with one or the other form of concurrency I mentioned above. In SmallRye Fault Tolerance, I use the word "asynchronous" in a more abstract manner, close to the dictionary definition of "not happening at the same time".
If an operation is asynchronous, it means its result is not available right here, right now; the idea of reifying that concept into a future is decades old. As it turns out, if we restrict ourselves to the JDK, the best type for representing futures is currently CompletionStage / CompletableFuture, because it supports both styles of concurrency. So that's what SmallRye Fault Tolerance uses. Maybe at some point this won't be necessary, but as I mentioned above, we're years away from that.
Now, here's a piece of code that uses bulkhead with a queue (which requires the operation to be asynchronous, otherwise there's no way to implement queueing):
@Test
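public void test() throws Exception {
var executor = Executors.newCachedThreadPool();// not shown in the original test; any Executor works for the manual offload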
var ft = FaultTolerance.<Integer>createAsync()
.withDescription("_bulkhead")
.withBulkhead().limit(2).queueSize(100).done()
.build();
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
var endTest = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
Runnable task = () -> {
try {
total.incrementAndGet();
ft.call(() -> {
concurrent.incrementAndGet();
try {
Thread.sleep(20);
} catch (Throwable ignore) {
}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
return null;
});
} catch (Exception e) {
System.out.printf("FT.call failed: total=%s%n", total);
} finally {
endTest.countDown();
}
};
executor.execute(task);
}
endTest.await();
}
Apparently, returning null works in this particular case, but I wouldn't rely on it for anything real. I really would return CompletableFuture.completedStage(null);
If I rely on SmallRye Fault Tolerance to submit the tasks to an executor on its own, instead of doing it manually, that code becomes:
@Test
public void test() throws Exception {
var ft = FaultTolerance.<Integer>createAsync()
.withDescription("_bulkhead")
.withBulkhead().limit(2).queueSize(100).done()
.withThreadOffload(true)
.build();
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
var endTest = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
ft.call(() -> {
total.incrementAndGet();
concurrent.incrementAndGet();
try {
Thread.sleep(20);
} catch (Throwable ignore) {
}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
endTest.countDown();
return CompletableFuture.completedStage(null);
});
}
endTest.await();
}
This works perfectly well.
Ah OK, so you want the queued tasks to have a timeout? And if the task is in the queue for too long, it should fail immediately? That seems like a reasonable request.
Virtual threads are "async", but the asynchrony is hidden deep inside the runtime. They are good enough for me. Oracle's Helidon Nima has completely removed Netty and uses plain old sockets + Virtual Threads.
otherwise there's no way to implement queueing
I am sorry 🙏, but there is: limit + queue ≈ tryAcquire with timeout
We started to remove "manual async/reactive" code and usually the new/blocking/VT code is 2-4 times shorter (and much, much simpler).
See my example with Semaphore ("Bulkhead with timeout"). I have to use something like this now, and I really want to use only one FT for this.
Just FYI: https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html
Oracle has a new mantra: Virtual Threads are the new BlockingQueue. VTs are actually Runnables in hidden ForkJoinPool queue(s).
Ah I see. If you have 20 threads competing for a semaphore that only hands out 2 permits, you indeed have an implicit form of queueing, but at the cost of having 20 threads. That may be fine with virtual threads, but it definitely is not fine with platform threads.
Now, when it comes to "limit + queue ≈ tryAcquire with timeout", that's not true. You don't need the timeout there, that doesn't really change anything when it comes to the queueing semantics. It is an extra cherry on top.
In SmallRye Fault Tolerance, bulkheads implement an explicit form of queueing. Tasks are put into an actual queue and are not submitted to an executor until they obtain a permit. (Aside: in the annotation-based API, this is only true for CompletionStage-returning tasks and not for Future-returning tasks, but the programmatic API doesn't support Future-returning tasks at all.)
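To make "explicit queueing" concrete, here is a rough JDK-only sketch. A task that cannot get a permit is parked as a plain Runnable in a bounded queue and is only handed to the executor when a running task finishes, so no thread is occupied while it waits. QueueingBulkhead and QueueingBulkheadDemo are hypothetical names. This illustrates the idea only; it is not SmallRye's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical explicit-queue bulkhead (illustration only, not SmallRye's implementation).
final class QueueingBulkhead {
    private final int limit;
    private final int queueSize;
    private final Executor executor;
    private final Queue<Runnable> queue = new ArrayDeque<>(); // guarded by `this`
    private int running;                                      // guarded by `this`

    QueueingBulkhead(int limit, int queueSize, Executor executor) {
        this.limit = limit;
        this.queueSize = queueSize;
        this.executor = executor;
    }

    <V> CompletionStage<V> submit(Callable<V> action) {
        CompletableFuture<V> result = new CompletableFuture<>();
        Runnable work = () -> {
            try {
                result.complete(action.call());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            } finally {
                onFinished();
            }
        };
        synchronized (this) {
            if (running >= limit) {
                if (queue.size() < queueSize) {
                    queue.add(work); // waits without occupying any thread
                } else {
                    result.completeExceptionally(new RejectedExecutionException("bulkhead queue full"));
                }
                return result;
            }
            running++;
        }
        executor.execute(work); // submitted to the executor only once a permit is held
        return result;
    }

    private void onFinished() {
        Runnable next;
        synchronized (this) {
            next = queue.poll();
            if (next == null) {
                running--; // nobody is waiting: give the permit back
                return;
            }
        }
        executor.execute(next); // the permit passes straight to the oldest queued task
    }
}

final class QueueingBulkheadDemo {
    // Submits `tasks` 20 ms actions; returns {completed, rejected, maxConcurrent}.
    static int[] run(int tasks, int limit, int queueSize) {
        ExecutorService pool = Executors.newFixedThreadPool(limit); // `limit` threads are enough
        QueueingBulkhead bulkhead = new QueueingBulkhead(limit, queueSize, pool);
        AtomicInteger concurrent = new AtomicInteger();
        AtomicInteger maxConcurrent = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        List<CompletableFuture<?>> all = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            CompletionStage<Object> stage = bulkhead.submit(() -> {
                maxConcurrent.accumulateAndGet(concurrent.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);
                } finally {
                    concurrent.decrementAndGet();
                }
                return null;
            });
            all.add(stage.toCompletableFuture().handle((value, failure) -> {
                (failure == null ? completed : rejected).incrementAndGet();
                return null;
            }));
        }
        CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return new int[] {completed.get(), rejected.get(), maxConcurrent.get()};
    }
}
```

With limit 2 and a queue of 100, all 20 tasks eventually run while only two threads are used. With a much smaller queueSize, surplus submissions typically fail fast with RejectedExecutionException instead.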
You don't need the timeout there, that doesn't really change anything when it comes to the queueing semantics
Without timeout, FT Bulkhead simply fails the task. See the results of example 1 (current implementation) and example 2 ("Bulkhead with timeout").
You can definitely see a "queue" in example 2.
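The difference between the two experiments comes down to which Semaphore method is called. Below is a minimal JDK-only sketch; the class name TryAcquireDemo is made up for illustration. With both permits busy, tryAcquire() returns false at once. tryAcquire(timeout, unit), by contrast, waits and picks up the permit released by a finishing task.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TryAcquireDemo {
    // Two permits, both busy (two tasks already running). Returns {tryAcquire(), timed tryAcquire}.
    static boolean[] compare() {
        Semaphore semaphore = new Semaphore(2);
        semaphore.acquireUninterruptibly(2);

        // Current bulkhead behavior: no waiting, the third task is rejected at once.
        boolean immediate = semaphore.tryAcquire();

        // A running task finishes after ~50 ms and returns its permit.
        Thread finisher = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // release anyway
            }
            semaphore.release();
        });
        finisher.start();

        // Proposed behavior: wait up to 2 s; the freed permit is picked up.
        boolean timed;
        try {
            timed = semaphore.tryAcquire(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        return new boolean[] {immediate, timed};
    }

    public static void main(String[] args) {
        boolean[] result = compare();
        // prints: tryAcquire() = false, tryAcquire(2, SECONDS) = true
        System.out.println("tryAcquire() = " + result[0] + ", tryAcquire(2, SECONDS) = " + result[1]);
    }
}
```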
In SmallRye Fault Tolerance, bulkheads implement an explicit form of queueing
New times, new rules maybe?
With VT you can delegate queuing to them. It is an official position of JDK architects.
Before: producer → explicit BlockingQueue → threads. Now: producer → VT (the queue is inside).
Actually, one more thing; I probably didn't express it clearly above. The usage of CompletionStage in SmallRye Fault Tolerance doesn't mean you're required to use non-blocking concurrency. If you do thread offloads (either manually, or by asking SmallRye Fault Tolerance to do it for you), you can use blocking concurrency just fine. In that case, you will only create already complete CompletionStages (CompletableFuture.completedStage() and failedStage()). If you want to block on the CompletionStage returned by SmallRye Fault Tolerance, you can call toCompletableFuture(), and that's guaranteed to work. This indeed entails a certain amount of code overhead, but not a very big one, I believe.
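Here is a JDK-only sketch of that pattern. It assumes the thread offload is done manually with CompletableFuture.supplyAsync; the class and method names are made up, and the SmallRye call itself is deliberately left out. The guarded action is ordinary blocking code that only ever produces already-complete stages, and the caller blocks through toCompletableFuture().join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingStageDemo {
    // Ordinary blocking code; the sleep stands in for a remote call.
    static int blockingWork(int input) throws InterruptedException {
        Thread.sleep(20);
        if (input < 0) {
            throw new IllegalArgumentException("negative input: " + input);
        }
        return input * 2;
    }

    // Runs on an already offloaded thread, so it only creates already-complete stages.
    static CompletionStage<Integer> guardedAction(int input) {
        try {
            return CompletableFuture.completedStage(blockingWork(input));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedStage(e);
        } catch (RuntimeException e) {
            return CompletableFuture.failedStage(e);
        }
    }

    // Manual thread offload, then blocking on the resulting stage via toCompletableFuture().
    static int callAndBlock(ExecutorService executor, int input) {
        CompletionStage<Integer> stage = CompletableFuture
                .supplyAsync(() -> guardedAction(input), executor)
                .thenCompose(inner -> inner);
        return stage.toCompletableFuture().join(); // failures surface as CompletionException
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(callAndBlock(executor, 21)); // 42
            try {
                callAndBlock(executor, -1);
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

completedStage() and failedStage() require JDK 9 or later.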
Without timeout, FT Bulkhead simply fails the task.
Ah ah ah, you're right, my bad. I didn't look properly at how the Semaphore methods differ.
New times, new rules maybe? With VT you can delegate queuing to them. It is an official position of JDK architects.
SmallRye Fault Tolerance has to work in the actual real world, which will take years to fully adopt virtual threads. Sorry, but this is just the sad state of reality.
Now I see your point. It should definitely help!
Could you nevertheless add a timeout to Bulkhead to 1) limit the time spent in the queue, 2) prepare for a great code simplification in the future (complete removal of the async part), and 3) simplify the code for those who already use VT (no need for CF and the async part; VT does this)?
It should NOT break anything. Old code doesn't use the new builder.timeout() and works as before (see my proposed changes to your SemaphoreBulkhead class).
Looking at Semaphore again, you actually don't need a timeout to achieve queueing. You just need Semaphore.acquire() instead of tryAcquire(). The timeout is useful, but not strictly necessary when it comes to achieving the queueing semantics.
Now, supporting timeout for the time spent in queue is a good idea, but doesn't feel like a super high priority right now. It would be relatively complex to implement in the bulkheads that have an explicit queue, actually.
Semaphore.acquire() doesn't have a timeout, and I am afraid the wait could be too long...
I don't usually like blocking operations without a timeout.
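The timed overload provides exactly this bounded wait. Below is a minimal sketch (the class name BoundedWaitDemo is hypothetical) of a caller that gives up instead of hanging when the permit holder never finishes. In this situation, Semaphore.acquire() would block forever.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class BoundedWaitDemo {
    // The only permit is held elsewhere and never released. acquire() would block forever here;
    // tryAcquire(timeout, unit) gives up and returns false once the timeout has elapsed.
    // Returns the time waited in milliseconds, or -1 if a permit was obtained.
    static long waitForBusyPermit(long timeoutMillis) {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly(); // simulate a holder that never finishes
        long start = System.nanoTime();
        try {
            if (semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                semaphore.release();
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("gave up after ~" + waitForBusyPermit(100) + " ms");
    }
}
```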
That is indeed reasonable.
OK, if we call it a time-in-the-queue timeout, it makes things complicated.
So it is not :-) ⇒ It is a workSemaphore acquire timeout. It makes sense even for the CF version: if you have spikes in traffic, it is better to wait some time to acquire the permit than to fail immediately.
It helps to smooth out the peaks, keep the order of messages, prevent loss of packets, etc.
OK, so in #989, I added withThreadOffloadExecutor() to specify an executor explicitly, in case the default one (provided by StandaloneFaultTolerance.configure()) is not the right one.
I also realized that FaultTolerance.call() is not the only method you can use. If you want to guard a Supplier, you can call get(), and if you want to guard a Runnable like in this example, you can call run(). With a Runnable, you don't need to produce a meaningless return value.
Combining both of the above, you can write:
@Test
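public void test() throws Exception {
var executor = Executors.newCachedThreadPool();// not shown in the original test; the executor used for thread offloads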
var ft = FaultTolerance.<Integer>createAsync()
.withDescription("_bulkhead")
.withBulkhead().limit(2).queueSize(100).done()
.withThreadOffload(true)
.withThreadOffloadExecutor(executor)
.build();
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
var endTest = new CountDownLatch(20);
for (int i = 0; i < 20; i++) {
ft.run(() -> {
total.incrementAndGet();
concurrent.incrementAndGet();
try {
Thread.sleep(20);
} catch (Throwable ignore) {
}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
endTest.countDown();
});
}
endTest.await();
}
Great job! 🥳
Is it still possible to add acquireTimeout to SemaphoreBulkhead and FutureThreadPoolBulkhead (is it still used?)? 🙏😅
Then the code would become:
@Test
public void test() throws Exception {
var ft = FaultTolerance.<Integer>create()
.withDescription("_bulkhead")
.withBulkhead().limit(2).acquireTimeout(5, MINUTES)/* currently fails in case of CompletionStageThreadPoolBulkhead */.done()
.build();
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
for (int i = 0; i < 20; i++) {
ft.run(() -> {
total.incrementAndGet();
concurrent.incrementAndGet();
try {
Thread.sleep(20);
} catch (Throwable ignore) {
}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
});
}
}
FutureThreadPoolBulkhead is not used in the programmatic API, only in the annotation-based API.
I'm not going to add a timeout to bulkheads at the moment. It feels like a useful feature, but it also seems there's a big potential for scope creep. I'm going to recommend that if you want a concurrency limiter with a queue, you should treat the guarded actions as asynchronous.
Forget about the queue for a moment. Let's say I don't want the sync Bulkhead to fail immediately: I am ready to wait.
I am ready to wait with Retry (and I do: there is a Thread.sleep there), and I am ready to wait with Bulkhead.
I don't want to use async infrastructure in sync/blocking code (!) just to block (!)
That's exactly the scope creep I want to avoid. Bulkhead (aka concurrency limiter) already has a concept of a queue; the concept of a timeout somewhat conflicts with that. It actually feels like what you need is to allow queueSize tasks to sleep and to start rejecting only after that (and that's actually what FutureThreadPoolBulkhead already implements, using 2 semaphores). There might be a way to allow that.
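Here is a rough JDK-only sketch of the two-semaphore idea as described here, applied to blocking callers such as virtual threads. One semaphore bounds running plus waiting callers (limit + queueSize) and rejects immediately beyond that. The other bounds running callers (limit) and makes the rest block. TwoSemaphoreBulkhead and TwoSemaphoreDemo are hypothetical names; this is not the actual FutureThreadPoolBulkhead code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical blocking bulkhead whose "queue" consists of the waiting callers themselves.
final class TwoSemaphoreBulkhead {
    private final Semaphore capacity; // running + waiting callers: limit + queueSize
    private final Semaphore work;     // running callers: limit

    TwoSemaphoreBulkhead(int limit, int queueSize) {
        this.capacity = new Semaphore(limit + queueSize);
        this.work = new Semaphore(limit, true); // fair: waiting callers proceed in FIFO order
    }

    <V> V call(Callable<V> action) throws Exception {
        if (!capacity.tryAcquire()) {
            throw new RejectedExecutionException("bulkhead and queue are full");
        }
        try {
            work.acquire(); // the blocked caller itself is the queue entry
            try {
                return action.call();
            } finally {
                work.release();
            }
        } finally {
            capacity.release();
        }
    }

    int availableCapacity() {
        return capacity.availablePermits();
    }
}

final class TwoSemaphoreDemo {
    // limit 1 + queueSize 2: three blocked callers fill the bulkhead, and a fourth is rejected.
    // Returns {fourthRejected, allThreeCompleted}.
    static boolean[] fillThenOverflow() {
        TwoSemaphoreBulkhead bulkhead = new TwoSemaphoreBulkhead(1, 2);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Thread[] callers = new Thread[3];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> {
                try {
                    bulkhead.call(() -> {
                        gate.await(); // hold the bulkhead until the main thread opens the gate
                        return null;
                    });
                    completed.incrementAndGet();
                } catch (Exception e) {
                    System.out.println("unexpected: " + e);
                }
            });
            callers[i].start();
        }
        boolean rejected = false;
        try {
            while (bulkhead.availableCapacity() > 0) {
                Thread.sleep(5); // wait until 1 caller runs and 2 wait
            }
            try {
                bulkhead.call(() -> null);
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            gate.countDown();
            for (Thread t : callers) {
                t.join();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] {rejected, completed.get() == 3};
    }
}
```

Note that a rejected caller never touches the work semaphore and releases nothing, so rejections cannot leak permits.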
The idea with 2 semaphores in sync blocking code makes sense! I feel it can be implemented and do exactly what VT users need.
It would do even more for me than I originally asked! 🙏
FutureThreadPoolBulkhead works as expected → desired (except, of course, the return type :-)
@Test @SneakyThrows
void bulkheadWithQueue () {
var total = new AtomicInteger();
var concurrent = new AtomicInteger();
// the other side has set a contractual limit to 2 concurrent connections
var ft = new FutureThreadPoolBulkhead<Integer>(InvocationContext::call, "bhq", 2, 20){
@Override @SneakyThrows
public Future<Integer> apply (InvocationContext<Future<Integer>> ctx) {
total.incrementAndGet();
return super.apply(ctx);
}
};
var futures = ConcurrentHashMap.<Future<Integer>>newKeySet();
// we load tasks: the queue length and concurrency is controlled by Bulkhead and run them in Virtual Thread
for (int i=0; i < 20; i++){
int index = i;
Callable<Future<Integer>> task = ()->{
try {
concurrent.incrementAndGet();
try { Thread.sleep(20); } catch (Throwable ignore){}
System.out.printf("Data sent: total=%s, concurrent=%s%n", total, concurrent);
concurrent.decrementAndGet();
} catch (Exception e) {
System.out.printf("FT.call failed: total=%s%n", total);
}
return Futures.immediateFuture(index);
};
var ctx = new InvocationContext<Future<Integer>>(task);
JThread.VT.execute(()->{
var f = ft.apply(ctx);
futures.add(f);
});// run in VT using VT executor
}
while (futures.size() < 20) {
Thread.sleep(10);
}
for (var f : futures){
System.out.println(f.get());
}
}
Result:
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=1
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=2
Data sent: total=20, concurrent=1
2
9
15
13
1
...
Delete "Future", add acquireTimeout and the best Bulkhead implementation is ready 🥳
I have made a PR for active JDK 21+ VT users (the PR doesn't require JDK 21, but is very useful for JDK 21 users).
Sync/Blocking Bulkhead with Queue and Limit.
Could you please review/rewrite and accept this PR?
This is the right Bulkhead behavior and I/We/Everybody really need it 🙏🤲🤝
As you know, Virtual Threads are actually Runnables in a hidden ForkJoinPool Queue. You can submit millions of them. And such Bulkhead could help immensely!
You can control concurrency/parallelism level with limit and the size of such "virtual" Queue with your tasks with queueSize.
Blocking a virtual thread is very cheap, so a Semaphore is exactly the right choice.
@Ladicek Please! 🙏😥
As you can see, no timeouts 🤝 (I have managed to emulate them).
But I need two semaphores very badly…
https://github.com/smallrye/smallrye-fault-tolerance/pull/1015 → this is your own io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead, but without Future, and used in smallrye-fault-tolerance-standalone.
It is optional, for those who know; everything works as before by default.
Now the PR has unit tests and an example use case showing how FT can be used in a VT environment to limit the virtual queue and concurrency.
🙏
With JDK 21 we have Virtual Threads: they are cheap, and you can block them without a second thought.
Could you add an option to the Semaphore-based Bulkhead strategies (e.g. timeout: long millis) which, when enabled, changes the behavior of Semaphore from tryAcquire() to tryAcquire(long timeout, TimeUnit unit/*MILLISECONDS*/)?
So, if one runs code with Bulkhead and sets the limit to 10, then only 10 threads would run concurrently and the others (millions) would be put into a "waiting queue".
The application code itself would be much simpler.
Now I have to use a plain Semaphore in addition to FT :-(