oceansv closed this issue 4 years ago
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @anfeldma-ms, @kushagraThapar.
@oceansv - it looks like the channel pool is being closed on your server. Can you please check that your App Service is not running into a port exhaustion issue?
Please take a look at this troubleshooting doc: https://docs.microsoft.com/en-us/azure/cosmos-db/troubleshoot-java-sdk-v4-sql#connection-throttling
@kushagraThapar Just to provide more information, this is how our app works: 1) read a message from Event Hub, 2) try to store the message in container1, 3) if that fails 5 times, store it in container2. Storing in container2 works, but storing in container1 fails for each and every message.
Storing in container1 is a read + upsert, whereas storing in container2 is just createItem.
@kushagraThapar So if throttling were happening, shouldn't at least some calls have succeeded for container1? Also, storing in container2 is succeeding.
@oceansv thanks for the additional information, and yes, that is a good point. If throttling were happening, it should have been visible across both containers. @David-Noble-at-work - do you have any idea about the above error stack trace?
@kushagraThapar Since direct mode is used by default in the latest client, isn't this related to that?
@oceansv - We plan to take a look at this soon - I am putting this in our backlog for current sprint. Related issue : https://github.com/Azure/azure-sdk-for-java/issues/11650
@kushagraThapar I have now tried with the gateway endpoint, and it is working fine.
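For readers following along, a minimal sketch of what switching the client to gateway mode might look like, assuming "gateway endpoint" above means the gateway connection mode of the azure-cosmos v4 CosmosClientBuilder. The class name GatewayClientFactory and the parameter names are hypothetical; the rest of the builder chain mirrors the snippet from the original report.

```java
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;

// Hypothetical helper, not part of the SDK or of the reporter's app.
public class GatewayClientFactory {

    // Builds a client that sends requests through the HTTPS gateway
    // instead of opening direct (RNTBD) connections to the backend.
    public static CosmosClient build(String endpoint, String key) {
        return new CosmosClientBuilder()
            .endpoint(endpoint)
            .key(key)
            .consistencyLevel(ConsistencyLevel.EVENTUAL)
            .gatewayMode() // assumption: this is the setting the reporter switched to
            .buildClient();
    }
}
```

This only works around the closed RntbdServiceEndpoint by avoiding direct mode; it does not address the underlying defect.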
@oceansv - thanks for the update, I will keep this issue open as our backlog item for now.
@oceansv - we have tried to fix this issue in azure-cosmos release 4.3.2-beta.1. Can you please try the new version and test the fix?
@oceansv This has been fixed in v4.3.2-beta.1.
We have an Event Hub listener that picks up a message, fetches the existing document from the DB if there is one, modifies it, and upserts it into Cosmos DB with an _etag check. The setup runs fine locally, where messages are stored in Cosmos DB. But when it is deployed and run in App Service, every call to upsert a document throws the exception shown below.
This is how our app works: 1) read a message from Event Hub, 2) try to store the message in container1, 3) if that fails 5 times, store it in container2. Storing in container2 works, but storing in container1 fails for each and every message.
Storing in container1 is a read + upsert, whereas storing in container2 is just createItem.
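The flow described above (try container1 up to 5 times, then fall back to container2) can be sketched roughly as follows. This is only an illustration using plain Java, not the reporter's code: the class FallbackWriter, the method storeWithFallback, and the use of Consumer callbacks in place of the real Cosmos calls are all assumptions. The stack trace suggests the actual app uses Spring's RetryTemplate for the retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the retry-then-fallback flow; names are illustrative.
public class FallbackWriter {

    // Tries the primary store up to maxAttempts times. If every attempt
    // throws, the message is handed to the fallback store instead.
    // Returns true when the primary store accepted the message.
    public static <T> boolean storeWithFallback(
            T message, int maxAttempts, Consumer<T> primary, Consumer<T> fallback) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                primary.accept(message); // stands in for read + upsert on container1
                return true;
            } catch (RuntimeException e) {
                // Swallow the failure and retry until attempts run out.
            }
        }
        fallback.accept(message); // stands in for createItem on container2
        return false;
    }

    public static void main(String[] args) {
        List<String> container2 = new ArrayList<>();
        // Simulate container1 failing on every attempt, as in the report.
        boolean storedInPrimary = storeWithFallback("shipment-1", 5,
            msg -> { throw new IllegalStateException("endpoint is closed"); },
            container2::add);
        System.out.println(storedInPrimary + " " + container2);
    }
}
```

With this shape, a failure that hits every write to container1 sends every message to container2, which matches the behaviour reported here.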
Exception trace : "java.lang.IllegalStateException: RntbdServiceEndpoint({\"id\":1,\"isClosed\":true,\"concurrentRequests\":0,\"remoteAddress\":\"cdb-ms-prod-westus1-fd51.documents.azure.com:14066\",\"channelPool\":{\"remoteAddress\":\"cdb-ms-prod-westus1-fd51.documents.azure.com:14066\",\"isClosed\":false,\"configuration\":{\"maxChannels\":130,\"maxRequestsPerChannel\":30,\"idleConnectionTimeout\":0,\"readDelayLimit\":65000000000,\"writeDelayLimit\":10000000000},\"state\":{\"channelsAcquired\":0,\"channelsAvailable\":0,\"requestQueueLength\":0}}}) is closed\r\n\tat com.azure.cosmos.implementation.guava25.base.Preconditions.checkState(Preconditions.java:586)\r\n\tat com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint.throwIfClosed(RntbdServiceEndpoint.java:222)\r\n\tat com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint.request(RntbdServiceEndpoint.java:175)\r\n\tat com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.invokeStoreAsync(RntbdTransportClient.java:123)\r\n\tat com.azure.cosmos.implementation.directconnectivity.TransportClient.invokeResourceOperationAsync(TransportClient.java:21)\r\n\tat com.azure.cosmos.implementation.directconnectivity.ConsistencyWriter.lambda$writePrivateAsync$4(ConsistencyWriter.java:164)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)\r\n\tat reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)\r\n\tat reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\r\n\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:140)\r\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)\r\n\tat 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)\r\n\tat reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)\r\n\tat reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\r\n\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:140)\r\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)\r\n\tat reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)\r\n\tat reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213)\r\n\tat reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:101)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.request(FluxMapSignal.java:225)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onSubscribe(FluxMapSignal.java:115)\r\n\tat reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4219)\r\n\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)\r\n\tat reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)\r\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)\r\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\r\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\r\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)\r\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)\r\n\tat reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)\r\n\tat reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)\r\n\tat reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)\r\n\tat reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213)\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\r\n\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:140)\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4204)\r\n\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)\r\n\tat reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)\r\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)\r\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\r\n\tat reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:79)\r\n\tat reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4204)\r\n\tat reactor.core.publisher.Mono.block(Mono.java:1678)\r\n\tat com.azure.cosmos.CosmosContainer.blockItemResponse(CosmosContainer.java:210)\r\n\tat com.azure.cosmos.CosmosContainer.upsertItem(CosmosContainer.java:199)\r\n\tat com.kcc.control_tower.repos.BaseRepository.upsertItemWithOptimisticLockCheck(BaseRepository.java:139)\r\n\tat 
com.kcc.control_tower.repos.BaseRepository.upsertIfLatest(BaseRepository.java:50)\r\n\tat com.kcc.control_tower.repos.ShipmentRepositoryImpl.upsertIfLatest(ShipmentRepositoryImpl.java:28)\r\n\tat com.kcc.control_tower.services.ShipmentServiceImpl.updateShipmentEntitiy(ShipmentServiceImpl.java:22)\r\n\tat com.kcc.control_tower.listener.ShipmentTmsEventHubListener.lambda$new$0(ShipmentTmsEventHubListener.java:123)\r\n\tat org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)\r\n\tat org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)\r\n\tat com.kcc.control_tower.listener.ShipmentTmsEventHubListener.lambda$new$1(ShipmentTmsEventHubListener.java:122)\r\n\tat com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEvent(EventProcessorClientBuilder.java:434)\r\n\tat com.azure.messaging.eventhubs.PartitionPumpManager.processEvent(PartitionPumpManager.java:217)\r\n\tat com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:250)\r\n\tat com.azure.messaging.eventhubs.PartitionPumpManager.lambda$startPartitionPump$1(PartitionPumpManager.java:182)\r\n\tat reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)\r\n\tat reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439)\r\n\tat reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526)\r\n\tat reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)\r\n\tat reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)\r\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\r\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\r\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\r\n\tat 
java.base/java.lang.Thread.run(Thread.java:834)\r\n\tSuppressed: java.lang.Exception: #block terminated with an error\r\n\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)\r\n\t\tat reactor.core.publisher.Mono.block(Mono.java:1679)\r\n\t\t... 24 more\r\n",
Code snippet (Java SDK for Cosmos):
The Cosmos client is created as a singleton; the database and the Cosmos client are also created as singleton beans in the Spring Boot app.
CosmosClient cosmosClient = new CosmosClientBuilder()
    .endpoint(azureCosmosdbUri)
    .key(azureCosmosdbKey)
    .consistencyLevel(ConsistencyLevel.EVENTUAL)
    .buildClient();
What is this exception, and how can it be resolved?