Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

Cosmos DB - query_items_change_feed only processes first partition then stops #34922

Open kovarikthomas opened 4 months ago

kovarikthomas commented 4 months ago

Describe the bug: The current implementation of the query_items_change_feed method returns all documents from the first partition and then stops processing. It does not automatically advance to the next partition.

To Reproduce

  1. Provide account URL and key
  2. Run the snippet below. It fails on the last assertion statement.
import asyncio
import uuid
from azure.cosmos.aio import CosmosClient
from azure.cosmos.partition_key import PartitionKey
from azure.cosmos import ThroughputProperties

async def main(account_url, account_key):

    client = CosmosClient(account_url, account_key)
    async with client:

        # Create random db and container with 2 partitions. Names can be fixed for subsequent tests.
        db = await client.create_database_if_not_exists(id=str(uuid.uuid4())) 
        coll = await db.create_container_if_not_exists(
            id=str(uuid.uuid4()),
            partition_key=PartitionKey(path='/pk', kind='Hash'), 
            offer_throughput=ThroughputProperties(auto_scale_max_throughput=20000, auto_scale_increment_percent=0)
        )

        # Ensure we have 2 partitions (physical partition key ranges).
        # Note: _ReadPartitionKeyRanges is a private SDK method.
        pks = []
        async for pk in coll.client_connection._ReadPartitionKeyRanges(coll.container_link):
            pks.append(pk)

        assert len(pks) == 2, f'Expected 2 partitions, found: {len(pks)}.'

        # Define docs to insert. 2 distinct logical partition keys.
        item1 = {'pk': 'pk1', 'id': 'id1'}
        item2 = {'pk': 'pk1', 'id': 'id2'}
        item3 = {'pk': 'pk1', 'id': 'id3'}
        item4 = {'pk': 'pk2', 'id': 'id1'}

        # Insert docs to db.
        await asyncio.gather(
            coll.create_item(item1), 
            coll.create_item(item2), 
            coll.create_item(item3),
            coll.create_item(item4),
        )

        # We've created 4 items with 2 distinct partition keys, let's confirm they landed on different physical partitions.
        items_partition0, items_partition1 = [], []
        async for item in coll.query_items(query='SELECT * FROM c', partition_key_range_id=pks[0]['id']):
            items_partition0.append(item)
        async for item in coll.query_items(query='SELECT * FROM c', partition_key_range_id=pks[1]['id']):
            items_partition1.append(item)
        assert len(items_partition0) == 3 and len(items_partition1) == 1, (
            f'Expected 3 documents on partition 0 and 1 document on partition 1, '
            f'found: {len(items_partition0)} documents on partition 0 '
            f'and {len(items_partition1)} on partition 1.'
        )

        # Now that we've confirmed the data layout, let's read change feed for both partitions together and for each separately.
        items_cf_partition0, items_cf_partition1, items_cf_all = [], [], []
        async for item in coll.query_items_change_feed(is_start_from_beginning=True, partition_key_range_id=pks[0]['id']):
            items_cf_partition0.append(item)
        async for item in coll.query_items_change_feed(is_start_from_beginning=True, partition_key_range_id=pks[1]['id']):
            items_cf_partition1.append(item)
        async for item in coll.query_items_change_feed(is_start_from_beginning=True):
            items_cf_all.append(item)

        # Report the change feed counts (not the query counts) if this check fails.
        assert len(items_cf_partition0) == 3 and len(items_cf_partition1) == 1, (
            f'Expected 3 documents on partition 0 and 1 document on partition 1, '
            f'found: {len(items_cf_partition0)} documents on partition 0 '
            f'and {len(items_cf_partition1)} on partition 1.'
        )

        assert len(items_cf_all) == 4, f'Expected 4 documents, found: {len(items_cf_all)}.'

if __name__ == '__main__':

    # Fill in your Cosmos DB account endpoint and key before running.
    account_url = ''
    account_key = ''

    asyncio.run(main(account_url, account_key))

Expected behavior: I would expect it to automatically pick up the next partition and continue enumerating documents until the last partition has been processed. If it cannot advance to the next available partition automatically, please provide guidance on how to manually "nudge" it to keep processing.
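One possible manual workaround follows from the repro itself: the per-range change feed reads return the expected 3 and 1 documents (only the final, cross-partition assertion fails), so enumerating the partition key ranges and draining each range separately should cover the whole container. This is a minimal sketch, assuming the same azure.cosmos.aio ContainerProxy members the repro uses (client_connection._ReadPartitionKeyRanges, a private method that may change, and query_items_change_feed with partition_key_range_id). The helper names read_change_feed_per_range and _drain_range are hypothetical, not SDK APIs.

```python
import asyncio
from typing import Any, Dict, List


async def _drain_range(container, range_id: str) -> List[Dict[str, Any]]:
    # Hypothetical helper: read one partition key range's change feed
    # from the beginning until it is exhausted.
    return [
        item
        async for item in container.query_items_change_feed(
            is_start_from_beginning=True, partition_key_range_id=range_id
        )
    ]


async def read_change_feed_per_range(container) -> List[Dict[str, Any]]:
    # Hypothetical helper, not an SDK API. `container` is assumed to be an
    # azure.cosmos.aio ContainerProxy. _ReadPartitionKeyRanges is a private
    # SDK method (used the same way in the repro) and may change between releases.
    range_ids = [
        pk_range['id']
        async for pk_range in container.client_connection._ReadPartitionKeyRanges(
            container.container_link
        )
    ]
    # Drain every range concurrently; gather returns results in range order.
    per_range = await asyncio.gather(*(_drain_range(container, rid) for rid in range_ids))
    # Flatten into a single list covering all partitions.
    return [item for batch in per_range for item in batch]
```

Inside main, `items = await read_change_feed_per_range(coll)` should then return all 4 documents. The sketch only reads from the beginning, does not track continuation tokens, and lists the ranges once, so a partition split during the read is not handled.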

Screenshots: N/A

Additional context: Discussed with @simorenoh offline.

github-actions[bot] commented 4 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @AbhinavTrips @bambriz @pjohari-ms @simorenoh.

kushagraThapar commented 1 month ago

@kovarikthomas FYI, this is currently being worked on.