dawnbreaks opened 4 years ago
How about implementing a custom ScheduledExecutorService that combines a ThreadPoolExecutor and a ScheduledExecutorService? You could delegate non-scheduled tasks to the ThreadPoolExecutor while delegating scheduled tasks to the ScheduledExecutorService. If this sounds good, we could add one to com.linecorp.armeria.common.util.
/cc @renaudb, who mentioned this issue when we switched to ScheduledExecutorService.
We need a better ScheduledExecutorService implementation that has a bounded queue and a maximum thread pool size. You could delegate non-scheduled tasks to the ThreadPoolExecutor while delegating scheduled tasks to the ScheduledExecutorService.
Such a custom ScheduledExecutorService implementation would have two queues, one bounded and the other unbounded? And two separate thread pools? It would work, but it seems a little strange.
Yeah, a more proper implementation would be desirable. It's interesting that nobody has built one yet.
The solution we ended up with is a bit of a hack. We don't need to schedule things in the future, so we just created our own implementation.
```java
class MyBlockingTaskExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public MyBlockingTaskExecutor(...) {
        super(..., new SynchronousQueue<>(), ...);
    }

    ...

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException("Scheduling tasks not supported.");
    }

    ...
}
```
And we inject it on Server creation.
```java
ServerBuilder builder =
    Server.builder()
          ...
          .blockingTaskExecutor(new MyBlockingTaskExecutor(...), false)
          ...
```
And we make all the schedule* methods throw UnsupportedOperationException. This also allows us to set a rejection handler that increments some counter we can add alerts on, to let us know when the queue is full.
A real solution might be to come up with some kind of DelayQueue with a fixed size, but we haven't looked into this since we haven't had the need for it.
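One way to read the "fixed-size DelayQueue" idea is to bound the number of pending scheduled tasks. This is not Armeria's API and not what the thread's authors built; it is a minimal JDK-only sketch (class name and structure are my own) that wraps a ScheduledThreadPoolExecutor with a Semaphore and rejects new scheduled tasks once the bound is hit:

```java
import java.util.concurrent.*;

// Hypothetical sketch: bound pending scheduled work with a Semaphore.
// A permit is taken per scheduled task and released after the task runs.
public class BoundedScheduler {
    private final ScheduledThreadPoolExecutor delegate;
    private final Semaphore slots;

    public BoundedScheduler(int threads, int maxPending) {
        this.delegate = new ScheduledThreadPoolExecutor(threads);
        this.slots = new Semaphore(maxPending);
    }

    public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        if (!slots.tryAcquire()) {
            throw new RejectedExecutionException("too many pending scheduled tasks");
        }
        return delegate.schedule(() -> {
            try {
                task.run();
            } finally {
                slots.release(); // free the slot once the task has run
            }
        }, delay, unit);
    }

    public void shutdown() {
        delegate.shutdown();
    }

    public static void main(String[] args) {
        BoundedScheduler s = new BoundedScheduler(1, 1);
        s.schedule(() -> {}, 50, TimeUnit.MILLISECONDS); // takes the only slot
        try {
            s.schedule(() -> {}, 50, TimeUnit.MILLISECONDS); // exceeds the bound
            System.out.println("accepted");
        } catch (RejectedExecutionException e) {
            System.out.println("rejected");
        }
        s.shutdown();
    }
}
```

Note the sketch leaks a permit if a caller cancels a ScheduledFuture before it runs, so a real implementation would need a cancellation hook as well.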
@renaudb Referred to this issue from slack channel discussion, I am interested to know with your hack workaround of using ThreadPoolExecutor with bounded size queue, what will be the default response (say http response) status code if the bounded queue is full? Is there a way to customize the response, e.g. return 429 TOO MANY REQUESTS status code? The only mechanism I know to customize this is through ThrottlingRejectHandler, but it is only invoked when the {@link ThrottlingStrategy} rejects the specified {@link Request} while here I do not know if a ThrottlingStrategy can be properly implemented (if one just uses ThrottlingStrategy.always(), ThrottlingRejectHandler will never be invoked I suppose?)
@chenqi0805 I don't have access to this code anymore, so I'm going by memory. When your work queue is full, by default ThreadPoolExecutor will throw a RejectedExecutionException. You can catch those near your server entry point and return 429 TOO MANY REQUESTS, or RESOURCE_EXHAUSTED if using gRPC. You could potentially create a service decorator to encapsulate this logic and simply decorate existing services to return the proper error when catching a RejectedExecutionException.
You can also pass your own RejectedExecutionHandler as an argument to the ThreadPoolExecutor constructor. One use case for this could be to add some logging or counters that keep track of rejected requests, to build some dashboarding around overwhelmed services, or you could throw your own exception to make it easier to catch and return the right HTTP or gRPC code.
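As a rough illustration of that suggestion (the class name is mine, not from the thread), a handler can count rejections and then rethrow so the caller still sees the failure:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a RejectedExecutionHandler that counts rejections for metrics/alerts
// before rethrowing, so callers can still map the failure to 429 / RESOURCE_EXHAUSTED.
public class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicLong rejected = new AtomicLong();

    public long rejectedCount() {
        return rejected.get();
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
        throw new RejectedExecutionException("work queue full");
    }

    public static void main(String[] args) throws Exception {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        // Single thread, SynchronousQueue (no capacity), as in MyBlockingTaskExecutor above.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch block = new CountDownLatch(1);
        pool.execute(() -> {
            try { block.await(); } catch (InterruptedException ignored) {}
        });
        try {
            pool.execute(() -> {}); // the only thread is busy, so this is rejected
        } catch (RejectedExecutionException expected) {
            System.out.println("rejections=" + handler.rejectedCount());
        }
        block.countDown();
        pool.shutdown();
    }
}
```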
@renaudb I am using HTTP. Capturing RejectedExecutionException at the decorating service level is not obvious from the documentation. I might need to do more deep diving.
I haven't tried running this code, but it shouldn't be much more complex than this.
```java
public class BoundedService extends SimpleDecoratingHttpService {
    public BoundedService(HttpService delegate) {
        super(delegate);
    }

    @Override
    public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception {
        HttpService delegate = unwrap();
        return delegate.serve(ctx, req).recover(cause -> {
            if (cause instanceof RejectedExecutionException) {
                return HttpResponse.of(HttpStatus.TOO_MANY_REQUESTS);
            } else {
                return HttpResponse.ofFailure(cause);
            }
        });
    }
}
```
And then attaching the decorator to your endpoint.
```java
ServerBuilder sb = Server.builder();
sb.serviceUnder("/web", service.decorate(delegate -> new BoundedService(delegate)));
```
@renaudb I played around a little with your MyBlockingTaskExecutor (ThreadPoolExecutor) above and a RejectedExecutionHandler, by injecting the work queue as well as an AtomicInteger counter and massaging their values into the HttpService response. I set the thread count and work queue capacity to 1 (the minimum). It looks like the RejectedExecutionHandler never gets invoked and the work queue size remains 0 under a bunch of curl requests (I tried the same load test with com.sun.net.httpserver.HttpServer as a reference point, and there I can see RejectedExecutionHandler::rejectedExecution being invoked). But it still processes requests and returns responses. So maybe ThreadPoolExecutor does not work the same way here as in the simple HTTP server, I suppose.
By default, Armeria services do not use the blockingTaskExecutor to process requests. Can you make sure you are actually making use of it? See:
https://armeria.dev/docs/server-grpc/#blocking-service-implementation
@renaudb Thanks for pointing that out! Really helps. It looks like your BoundedService solution is mostly correct, except that the cause when the work queue is full turns out to be RequestTimeoutException instead of RejectedExecutionException for some reason, which is probably not ideal, as it mixes the request-processing-timeout case with the rejected-request case. On the other hand, the RejectedExecutionHandler does get invoked.
Hi, recently Armeria changed the type of blockingTaskExecutor from ExecutorService to ScheduledExecutorService. As you know, in the JDK the default implementation of ScheduledExecutorService has an unbounded queue and a fixed thread count, which is unacceptable in some use cases; we want to control the queue size and the core/max thread size. Is it possible to revert this change? What's your opinion?
Thanks!
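For reference, the unbounded-queue behavior being complained about is easy to demonstrate with plain JDK code (demo class name is mine): ScheduledThreadPoolExecutor uses an internal unbounded DelayedWorkQueue, so it never rejects tasks and maximumPoolSize has no effect.

```java
import java.util.concurrent.*;

// Demo: ScheduledThreadPoolExecutor accepts unlimited pending tasks.
// Its internal DelayedWorkQueue is unbounded, and because the queue never
// reports itself as full, maximumPoolSize is effectively ignored too.
public class UnboundedQueueDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        pool.setMaximumPoolSize(2); // no effect: extra threads only start when the queue is full
        for (int i = 0; i < 10_000; i++) {
            pool.schedule(() -> {}, 1, TimeUnit.HOURS); // all accepted, none rejected
        }
        System.out.println("queued=" + pool.getQueue().size());
        System.out.println("poolSize=" + pool.getPoolSize());
        pool.shutdownNow(); // cancel the pending tasks
    }
}
```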