Azure / azure-cosmosdb-java

Java Async SDK for SQL API of Azure Cosmos DB
MIT License

How to read all documents from a collection #54

Closed (chetanmeh closed this issue 5 years ago)

chetanmeh commented 6 years ago

In some cases we need to read all documents from a collection. For that I tried using AsyncDocumentClient.readDocuments, but it failed with UnsupportedOperationException: PartitionKey value must be supplied for this operation.

Caused by: java.lang.UnsupportedOperationException: PartitionKey value must be supplied for this operation.
    at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl.addPartitionKeyInformation(RxDocumentClientImpl.java:786)
    at com.microsoft.azure.cosmosdb.rx.internal.RxDocumentClientImpl.lambda$addPartitionKeyInformation$20(RxDocumentClientImpl.java:767)
    at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:66)
    at rx.internal.util.ScalarSynchronousSingle$2$1.onSuccess(ScalarSynchronousSingle.java:140)
    at rx.internal.operators.OnSubscribeSingle$1.onCompleted(OnSubscribeSingle.java:55)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:656)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:281)
    at rx.internal.operators.OperatorMapNotification$MapNotificationSubscriber.tryEmit(OperatorMapNotification.java:155)
    at rx.internal.operators.OperatorMapNotification$MapNotificationSubscriber.onCompleted(OperatorMapNotification.java:121)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:75)
    at rx.internal.operators.OperatorMapNotification$MapNotificationSubscriber.setProducer(OperatorMapNotification.java:136)
    at rx.internal.operators.SingleLiftObservableOperator$WrapSubscriberIntoSingle.onSuccess(SingleLiftObservableOperator.java:76)
    at rx.internal.operators.OnSubscribeSingle$1.onCompleted(OnSubscribeSingle.java:55)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:125)
    at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:403)
    at rx.internal.operators.CachedObservable$ReplayProducer.request(CachedObservable.java:304)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:244)
    at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:39)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:27)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.Single.subscribe(Single.java:1979)
    at rx.internal.util.ScalarSynchronousSingle$2.call(ScalarSynchronousSingle.java:144)
    at rx.internal.util.ScalarSynchronousSingle$2.call(ScalarSynchronousSingle.java:124)
    at rx.Single.subscribe(Single.java:1979)
    at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)
    at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:39)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:27)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.Single.subscribe(Single.java:1979)
    at rx.Single$18.call(Single.java:2518)
    at rx.Single$18.call(Single.java:2505)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:39)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:27)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:273)
    at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:73)
    at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:52)
    at rx.internal.operators.OnSubscribeRedo$5.request(OnSubscribeRedo.java:361)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:353)
    at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:47)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:81)
    at rx.internal.operators.OnSubscribeSingle.call(OnSubscribeSingle.java:27)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:39)
    at rx.internal.operators.SingleToObservable.call(SingleToObservable.java:27)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.subscribe(Observable.java:10423)
    at rx.Observable.subscribe(Observable.java:10390)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.subscribeBufferToObservable(AsyncOnSubscribe.java:627)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.onNext(AsyncOnSubscribe.java:591)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.onNext(AsyncOnSubscribe.java:356)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
    at com.microsoft.azure.cosmosdb.rx.internal.query.Paginator$1.next(Paginator.java:92)
    at com.microsoft.azure.cosmosdb.rx.internal.query.Paginator$1.next(Paginator.java:80)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.nextIteration(AsyncOnSubscribe.java:413)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.tryEmit(AsyncOnSubscribe.java:534)
    at rx.observables.AsyncOnSubscribe$AsyncOuterManager.request(AsyncOnSubscribe.java:455)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.observables.AsyncOnSubscribe.call(AsyncOnSubscribe.java:352)
    at rx.observables.AsyncOnSubscribe.call(AsyncOnSubscribe.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10423)
    at rx.Observable.subscribe(Observable.java:10390)
    at rx.internal.reactivestreams.PublisherAdapter.subscribe(PublisherAdapter.java:35)
    at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:117)
    at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:295)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:557)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:679)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:727)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:528)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.create(ActorCell.scala:652)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.microsoft.azure.cosmosdb.DocumentCollection.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118)
    at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:70)
    ... 110 more

Then I switched to a query like SELECT * FROM root r, and it seems to work for the small dataset I have. I have yet to test it on a larger dataset.

So I need to confirm: what is the correct and most efficient way to read all documents?

moderakh commented 6 years ago

@chetanmeh it seems there is a bug in readFeed for multi-partition collections. For now, please use the query API as you suggested yourself, which should work fine. We will fix readFeed.

chetanmeh commented 6 years ago

@moderakh Thanks for confirming the alternative approach. I will use the SQL approach.

Any guidance on how to speed this up? If I understand the flow, a single query results in N parallel calls, one for each of the N partitions, and each call fetches results in batches of 100. Is that understanding correct? Somehow I see only ~200 docs/sec being downloaded.

moderakh commented 6 years ago

@chetanmeh do you mean the execution of the query is slow?

If so, can you please provide sample code showing how you run the query? What are the options, etc.?

chetanmeh commented 6 years ago

The code can be found here. It uses the default settings for AsyncDocumentClient. The FeedOptions have enableCrossPartitionQuery enabled. Further, it uses Akka Streams to treat the query result as a reactive source, and then processes and emits the JSON to a file.

As this is a rarely used operation, I can tweak settings even if they lead to higher resource consumption.

moderakh commented 6 years ago

Hi @chetanmeh, can you please try setting maxDegreeOfParallelism to a positive number and see if that helps?

chetanmeh commented 6 years ago

@moderakh thanks for the pointer. I will give that a try. So far, based on this answer, it appears that the SDK would use that many parallel requests. However, looking at the usages of FeedOptions#getMaxDegreeOfParallelism in the code, I just see the x-ms-documentdb-query-parallelizecrosspartitionquery header being set to true, while the actual value is not used. I am probably missing some flow by just looking at the code ...

I will try changing the code now to see if it gives any benefit.

moderakh commented 6 years ago

@chetanmeh just setting x-ms-documentdb-query-parallelizecrosspartitionquery will give you the query execution boost.

chetanmeh commented 6 years ago

@moderakh After digging into the flow further, it appears that ParallelDocumentQueryExecutionContext currently uses the concat operator to combine multiple observables, which per this doc are consumed sequentially:

It simply concatenates two sequences. Once the first sequence completes, the second sequence is subscribed to and its values are passed on through to the result sequence

https://github.com/Azure/azure-cosmosdb-java/blob/cc314c1cfda23cd3135bf12ae9682b0905b9c715/sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContext.java#L155-L159

Replacing this with merge nearly doubled throughput for my code, from ~160 docs/s to ~300 docs/s, with the allocated RUs (request units) being the bottleneck.

Index: sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContext.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContext.java (revision cc314c1cfda23cd3135bf12ae9682b0905b9c715)
+++ sdk/src/main/java/com/microsoft/azure/cosmosdb/rx/internal/query/ParallelDocumentQueryExecutionContext.java (date 1534175443000)
@@ -155,7 +155,7 @@
     public Observable<FeedResponse<T>> drainAsync(int maxPageSize) {
         List<Observable<FeedResponse<T>>> obs = this.documentProducers.stream()
                 .map(dp -> dp.produceAsync().map(dpp -> dpp.pageResult)).collect(Collectors.toList());
-        return Observable.concat(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), maxPageSize));
+        return Observable.merge(obs).compose(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), maxPageSize));
     }

     @Override

I think that to make full use of parallel queries, the code should switch to the merge operator instead of concat.

Would such a change make sense?
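
To illustrate the difference between the two operators without depending on RxJava, here is a minimal plain-Java sketch. The names DrainStrategies, drainSequentially, drainConcurrently and partition are hypothetical and not part of the SDK; each Supplier stands in for one partition's document producer. It is a simplification: RxJava's merge interleaves whole emitted pages, while this sketch may interleave individual documents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical stand-in for per-partition producers; this is not SDK code. */
final class DrainStrategies {

    /** concat-like: partition i+1 is not read until partition i has completed. */
    static <T> List<T> drainSequentially(List<Supplier<List<T>>> partitions) {
        List<T> out = new ArrayList<>();
        for (Supplier<List<T>> partition : partitions) {
            out.addAll(partition.get());
        }
        return out;
    }

    /** merge-like: all partitions are read at once; results land in arrival order. */
    static <T> List<T> drainConcurrently(List<Supplier<List<T>>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            ConcurrentLinkedQueue<T> out = new ConcurrentLinkedQueue<>();
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (Supplier<List<T>> partition : partitions) {
                pending.add(CompletableFuture.runAsync(() -> out.addAll(partition.get()), pool));
            }
            // Wait until every partition has been drained.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return new ArrayList<>(out);
        } finally {
            pool.shutdown();
        }
    }

    /** Simulates one partition whose read takes roughly delayMillis. */
    static Supplier<List<String>> partition(String name, int docs, long delayMillis) {
        return () -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            List<String> page = new ArrayList<>();
            for (int i = 0; i < docs; i++) {
                page.add(name + "-" + i);
            }
            return page;
        };
    }

    public static void main(String[] args) {
        List<Supplier<List<String>>> parts = Arrays.asList(
                partition("P1", 2, 30), partition("P2", 2, 20), partition("P3", 2, 10));
        // Always partition order: [P1-0, P1-1, P2-0, P2-1, P3-0, P3-1]
        System.out.println(drainSequentially(parts));
        // Same documents, but the order depends on which partition finishes first.
        System.out.println(drainConcurrently(parts));
    }
}
```

The sequential variant takes roughly the sum of the per-partition delays, while the concurrent one takes roughly the longest single delay, which is the kind of gain the patch above is after.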

chetanmeh commented 6 years ago

@moderakh ping

moderakh commented 6 years ago

@chetanmeh sorry for the delay.

There is one caveat in using Observable.merge(.) for this use case: it doesn't preserve order. When no explicit "ORDER BY" is specified in the query, we would like to return the results in the same order as the partitions (we want the results to be returned in a deterministic order).

I think we need something like this: http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concat-io.reactivex.ObservableSource-int-

or this one which has support for backpressure: http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#concat-org.reactivestreams.Publisher-int-

These are in RxJava 2. If you can find a way to make this work for RxJava 1, your PR is welcome.

chetanmeh commented 6 years ago

we want the results to be returned in a deterministic order

@moderakh Is this a documented behaviour which needs to be supported for compatibility? In general, when a query does not specify any ORDER BY, the order of documents is not deterministic. Some systems return results in some "document order", which probably maps to partition order, but that is mostly non-deterministic and applications should not rely on it.

Trying to enforce this order would always require some trade-off. Maybe we could have a FeedOptions setting to opt in to the new behaviour, as the throughput gain from the proposed change is pretty good and for some cases having no inherent order is fine.

chetanmeh commented 6 years ago

@moderakh Any thoughts on the previous comment?

moderakh commented 6 years ago

My apologies for the delayed response @chetanmeh. All our other SDKs go with this predefined order, so when there is no explicit ORDER BY, all SDKs return the results in the order of the partitions. A predefined order would also help with resuming work.

If we were on RxJava 2 this would be fairly easy. For RxJava 1 we would have to build it. A PR doing something similar to what http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#concat-io.reactivex.ObservableSource-int- does is welcome 👍

chetanmeh commented 6 years ago

@moderakh Based on the linked docs, it appears that it would still not perform concurrent reads.

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

From what I understand, enforcing any partition ordering would require that the observable for each partition be consumed completely before moving on. For example, say we have 5 partitions P1-P5 and, upon query, we create 5 observables O1-O5 from those 5 partitions. Then, to ensure that documents are emitted in partition order, O1 first needs to be consumed fully, and only then can reads from O2 be performed.

In the case of an explicit "ORDER BY", reads can be done in parallel from O1-O5 in batches of the page size (100 by default), and the results would then be merged based on the sort criteria.
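
The merge step for the "ORDER BY" case can be sketched as a k-way merge over per-partition results that are each already sorted. This is only an illustration under that assumption: OrderByMerge and mergeSorted are hypothetical names, the partitions are modelled as in-memory lists rather than paged observables, and nothing here is taken from the SDK's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical k-way merge of per-partition results; not SDK code. */
final class OrderByMerge {

    /** Pairs the current head of one partition with the rest of that partition. */
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    /** Merges partitions that are each sorted by 'order' into one sorted list. */
    static <T> List<T> mergeSorted(List<List<T>> sortedPartitions, Comparator<? super T> order) {
        Comparator<Head<T>> byValue = (a, b) -> order.compare(a.value, b.value);
        PriorityQueue<Head<T>> heads = new PriorityQueue<>(byValue);
        // Seed the heap with the first document of every non-empty partition.
        for (List<T> partition : sortedPartitions) {
            Iterator<T> it = partition.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            // Emit the smallest head, then refill from the same partition.
            Head<T> smallest = heads.poll();
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5, 6));
        System.out.println(mergeSorted(partitions, Comparator.naturalOrder()));
        // prints [1, 2, 3, 4, 5, 6, 9, 10]
    }
}
```

Because only the head of each partition has to be available at any time, every partition can be read in parallel, one page ahead, without giving up a deterministic output order.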

So to get maximum throughput for the non "ORDER BY" case, we would need an option to remove this partition-based ordering. Some apps may be fine without a deterministic order and would thus gain from higher throughput.
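
The trade-off can be made concrete with one possible middle ground, sketched in plain Java: start reading every partition concurrently, but emit strictly in partition order. The cost is that results from later partitions have to be buffered until the earlier ones finish. OrderedPrefetch, drainInPartitionOrder and delayed are hypothetical names, each Supplier stands in for fully draining one partition, and this is not a description of what the SDK does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical ordered drain with concurrent reads; not SDK code. */
final class OrderedPrefetch {

    /** Reads all partitions concurrently, emits results in partition order. */
    static <T> List<T> drainInPartitionOrder(List<Supplier<List<T>>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            // Start every partition read immediately...
            List<CompletableFuture<List<T>>> reads = new ArrayList<>();
            for (Supplier<List<T>> partition : partitions) {
                reads.add(CompletableFuture.supplyAsync(partition, pool));
            }
            // ...but append results strictly in partition order. A finished
            // later partition stays buffered in its future until its turn.
            List<T> out = new ArrayList<>();
            for (CompletableFuture<List<T>> read : reads) {
                out.addAll(read.join());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    /** Simulates a partition read that returns 'page' after roughly 'millis'. */
    static <T> Supplier<List<T>> delayed(List<T> page, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return page;
        };
    }

    public static void main(String[] args) {
        Supplier<List<String>> p1 = delayed(Arrays.asList("P1-0", "P1-1"), 30);
        Supplier<List<String>> p2 = delayed(Arrays.asList("P2-0"), 10);
        // P2 finishes first, yet P1's documents are still emitted first.
        System.out.println(drainInPartitionOrder(Arrays.asList(p1, p2)));
        // prints [P1-0, P1-1, P2-0]
    }
}
```

In the worst case this buffers the full contents of all but the first partition, so an unordered option would still be the cheaper choice for apps that do not care about order.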

chetanmeh commented 6 years ago

@moderakh Any thoughts on previous comment?

chetanmeh commented 6 years ago

@moderakh Ping ^^

christopheranderson commented 5 years ago

This is fixed since 2.4.0. https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-async-java#240