Closed artem-v closed 6 years ago
The line of interest is:
at reactor.core.scheduler.Schedulers.workerSchedule(Schedulers.java:708) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
When one calls Schedulers.fromExecutorService(executorService), one creates a DelegateServiceScheduler instance, which has the following method:
@Override
public Worker createWorker() {
    return new ExecutorServiceWorker(executor);
}
In turn, ExecutorServiceWorker has the field Composite tasks = Disposables.composite(). In the workerSchedule method we see the following:
WorkerTask sr = new WorkerTask(task, tasks);
if (!tasks.add(sr)) {
    throw Exceptions.failWithRejected();
}
If we look at the .add method, which in turn leads to the .addEntry method, we see the following:
boolean addEntry(Disposable value) {
    final Disposable[] a = disposables;
    final int m = mask;
    int pos = mix(value.hashCode()) & m;
    Disposable curr = a[pos];
    if (curr != null) {
        if (curr.equals(value)) {
            return false;
        }
        for (;;) {
            pos = (pos + 1) & m;
            curr = a[pos];
            if (curr == null) {
                break;
            }
            if (curr.equals(value)) {
                return false;
            }
        }
    }
    a[pos] = value;
    if (++size >= maxSize) {
        rehash();
    }
    return true;
}
So it looks like the return false; branch is only reached when curr.equals(value) is true.
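To make the probing logic above easier to experiment with, here is a minimal standalone sketch of the same open-addressing idea. It is a simplified stand-in, not Reactor's actual class: the class name OpenAddressingSetSketch, the mix() constant, the load factor and the grow() method are all assumptions made for illustration.

```java
import java.util.Objects;

// Simplified, single-threaded model of the addEntry logic quoted above.
// All names here are hypothetical; this is not Reactor code.
final class OpenAddressingSetSketch {
    private Object[] slots = new Object[8]; // capacity stays a power of two
    private int mask = slots.length - 1;
    private int size;

    // Assumed bit-mixing step; the real mix() is not shown in the issue.
    private static int mix(int h) {
        h *= 0x9E3779B9;
        return h ^ (h >>> 16);
    }

    /** Returns false only when an equal element is already stored. */
    boolean add(Object value) {
        Objects.requireNonNull(value, "value");
        int pos = mix(value.hashCode()) & mask;
        // Linear probing: step forward until an empty slot or an equal element is found.
        while (slots[pos] != null) {
            if (slots[pos].equals(value)) {
                return false;
            }
            pos = (pos + 1) & mask;
        }
        slots[pos] = value;
        // Grow at an assumed 75% load so probing always finds an empty slot.
        if (++size * 4 >= slots.length * 3) {
            grow();
        }
        return true;
    }

    // Doubles the table and re-inserts every stored element.
    private void grow() {
        Object[] old = slots;
        slots = new Object[old.length * 2];
        mask = slots.length - 1;
        size = 0;
        for (Object o : old) {
            if (o != null) {
                add(o);
            }
        }
    }

    public static void main(String[] args) {
        OpenAddressingSetSketch set = new OpenAddressingSetSketch();
        Object task = new Object();
        System.out.println(set.add(task));         // true: first insert
        System.out.println(set.add(task));         // false: an equal element is already stored
        System.out.println(set.add(new Object())); // true: a distinct instance
    }
}
```

In this model, add only reports false for an element that equals one already in the table. Whether that is the path actually taken in workerSchedule depends on how the stored task objects implement equals, which the snippets above do not show.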
To conclude, it appears that we can't schedule the same task again and again on a scheduler that was created via reactor.core.scheduler.Schedulers#fromExecutorService.
The proposed fix is to not create an executor service and wrap it in a scheduler, but to use one of the factory methods on the Schedulers class itself, for example newSingle().
Reopening. 3.2.1 did not fix the problem.
E 2018-10-29T13:06:06,274 r.c.p.Operators Operator called default onErrorDropped [sc-io-5-1]
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.failWithRejected(Exceptions.java:249) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:799) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:763) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.onNext(FluxPublishOn.java:691) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.drainFused(FluxOnBackpressureBuffer.java:279) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.drain(FluxOnBackpressureBuffer.java:203) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:168) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.DelegateProcessor.onNext(DelegateProcessor.java:64) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:593) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:151) [reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at io.scalecube.transport.MessageHandler.channelRead(MessageHandler.java:33) [scalecube-transport-2.1.2.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.29.Final.jar:4.1.29.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) [netty-codec-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:808) [netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:410) [netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310) [netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar:4.1.29.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) [netty-common-4.1.29.Final.jar:4.1.29.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.29.Final.jar:4.1.29.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.<clinit>(Exceptions.java:502) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.scheduler.Schedulers.workerSchedule(Schedulers.java:709) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.scheduler.ExecutorServiceWorker.schedule(ExecutorServiceWorker.java:43) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnConditionalSubscriber.trySchedule(FluxPublishOn.java:759) ~[reactor-core-3.2.1.RELEASE.jar:3.2.1.RELEASE]
... 33 more
Just recently we were running a c5.2xlarge perf test with 1 client, 1 gateway and 1 service. The test didn't start and failed with the following in the logs.
On the client:
On the gateway:
On the service: