qdrant / qdrant

Qdrant - High-performance, massive-scale Vector Database for the next generation of AI. Also available in the cloud https://cloud.qdrant.io/
https://qdrant.tech

3-node cluster with replication doesn't handle downtime of single node #5215

Open Michael-JB opened 1 week ago

Michael-JB commented 1 week ago

Current Behavior

Hi! I'm setting up a 3-node Qdrant cluster in k8s using the Qdrant helm chart. My deployment is not behaving as I expect (or as documented).

The deployment

| Config | Value |
| --- | --- |
| Cluster size | 3 |
| Shards per collection | 12 |
| Collection replication factor | 3 |
| Write consistency factor | 2 |
| Query read consistency | 1 |

I've confirmed that these values are set correctly by querying the /cluster and /collections/:collection_name/cluster endpoints (see below). I see 12 shards as expected, and that these shards are evenly distributed amongst the nodes. I also see that the replication factor is set to 3 in the collection config, although I've found no way to confirm that these replicas actually exist via the API.
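
For concreteness, these settings correspond roughly to the following collection setup (a sketch using qdrant_client against a port-forwarded node on localhost:6333; note that in my actual deployment replication_factor and write_consistency_factor come from the global config rather than the create call, as discussed further down):

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(host="localhost", port=6333)

client.create_collection(
    collection_name="my_collection",
    vectors_config=VectorParams(size=128, distance=Distance.COSINE),
    shard_number=12,             # 12 shards per collection
    replication_factor=3,        # one copy of each shard per node in a 3-node cluster
    write_consistency_factor=2,  # writes must be acknowledged by 2 replicas
)

# Searches are issued with read consistency 1, i.e. any single replica may answer.
hits = client.search(
    collection_name="my_collection",
    query_vector=[0.0] * 128,
    limit=5,
    consistency=1,
)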

The problem

When I kill a node in the Qdrant cluster (kill a pod managed by the k8s sts), all search queries sent to the Qdrant service return a 500 until the node recovers. The other Qdrant nodes repeatedly log the following while the node is down:

2024-10-10T14:11:07.426281Z ERROR qdrant::actix::helpers: Error processing request: Service internal error: 1 of 1 read operations failed:  Service internal error: Tonic status error: status: Unavailable, message: "Failed to connect to http://qdrant-0.qdrant-headless:6335/, error: transport error", details: [], metadata: MetadataMap { headers: {} }

It's as if the collection shards are not replicated across the cluster.

Expected Behavior

Per the documentation here, I would expect all requests to work while the node is temporarily down.

As a side note, it would be nice to have an API to locate shard replicas.

Context (Environment)

My goal is an HA deployment such that Qdrant remains available (1) during upgrades and (2) during temporary node failures.

Detailed Description

Qdrant version: v1.12.0

GET /collections/my_collection/cluster:

{
    "result": {
        "peer_id": 5059350541336290,
        "shard_count": 12,
        "local_shards": [
            {
                "shard_id": 2,
                "points_count": 833,
                "state": "Active"
            },
            {
                "shard_id": 5,
                "points_count": 1088,
                "state": "Active"
            },
            {
                "shard_id": 8,
                "points_count": 990,
                "state": "Active"
            },
            {
                "shard_id": 11,
                "points_count": 1279,
                "state": "Active"
            }
        ],
        "remote_shards": [
            {
                "shard_id": 0,
                "peer_id": 906069340222619,
                "state": "Active"
            },
            {
                "shard_id": 1,
                "peer_id": 6606434237776271,
                "state": "Active"
            },
            {
                "shard_id": 3,
                "peer_id": 906069340222619,
                "state": "Active"
            },
            {
                "shard_id": 4,
                "peer_id": 6606434237776271,
                "state": "Active"
            },
            {
                "shard_id": 6,
                "peer_id": 906069340222619,
                "state": "Active"
            },
            {
                "shard_id": 7,
                "peer_id": 6606434237776271,
                "state": "Active"
            },
            {
                "shard_id": 9,
                "peer_id": 906069340222619,
                "state": "Active"
            },
            {
                "shard_id": 10,
                "peer_id": 6606434237776271,
                "state": "Active"
            }
        ],
        "shard_transfers": []
    },
    "status": "ok",
    "time": 0.000054771
}
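
Note that every shard_id above appears exactly once across local_shards and remote_shards, i.e. a single copy per shard. Regarding the side note about locating shard replicas: the placement can at least be approximated from this same endpoint. A rough sketch (hypothetical helper, not an official API; it reuses the collection_cluster_info call that also appears in the repro script below and assumes a port-forward to one node on localhost:6333):

from collections import defaultdict
from qdrant_client import QdrantClient

client = QdrantClient(host="localhost", port=6333)

info = client.http.cluster_api.collection_cluster_info("my_collection").result

# shard_id -> peers holding a copy, as seen from the responding node
placements = defaultdict(list)
for shard in info.local_shards:
    placements[shard.shard_id].append(info.peer_id)
for shard in info.remote_shards:
    placements[shard.shard_id].append(shard.peer_id)

for shard_id, peers in sorted(placements.items()):
    # With replication_factor=3 on 3 nodes, each shard should list 3 peers;
    # in the output above every shard lists exactly 1.
    print(shard_id, peers)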

GET /cluster:

{
    "result": {
        "status": "enabled",
        "peer_id": 5059350541336290,
        "peers": {
            "6606434237776271": {
                "uri": "http://qdrant-2.qdrant-headless:6335/"
            },
            "906069340222619": {
                "uri": "http://qdrant-1.qdrant-headless:6335/"
            },
            "5059350541336290": {
                "uri": "http://qdrant-0.qdrant-headless:6335/"
            }
        },
        "raft_info": {
            "term": 264,
            "commit": 188,
            "pending_operations": 0,
            "leader": 906069340222619,
            "role": "Follower",
            "is_voter": true
        },
        "consensus_thread_status": {
            "consensus_thread_status": "working",
            "last_update": "2024-10-10T08:44:00.261119661Z"
        },
        "message_send_failures": {}
    },
    "status": "ok",
    "time": 0.000029061
}

GET /collections/my_collection:

{
    "status": "green",
    "optimizer_status": "ok",
    "indexed_vectors_count": 0,
    "points_count": 12334,
    "segments_count": 24,
    "config": {
        "params": {
            "vectors": {
                "size": 128,
                "distance": "Cosine"
            },
            "shard_number": 12,
            "replication_factor": 3,
            "write_consistency_factor": 2,
            "on_disk_payload": true,
            "sparse_vectors": {
                "bm25": {
                    "modifier": "idf"
                }
            }
        },
        "hnsw_config": {
            "m": 16,
            "ef_construct": 100,
            "full_scan_threshold": 10000,
            "max_indexing_threads": 0,
            "on_disk": false
        },
        "optimizer_config": {
            "deleted_threshold": 0.2,
            "vacuum_min_vector_number": 1000,
            "default_segment_number": 0,
            "max_segment_size": null,
            "memmap_threshold": null,
            "indexing_threshold": 20000,
            "flush_interval_sec": 5,
            "max_optimization_threads": null
        },
        "wal_config": {
            "wal_capacity_mb": 32,
            "wal_segments_ahead": 0
        },
        "quantization_config": null,
        "strict_mode_config": {
            "enabled": false
        }
    },
    "payload_schema": {
        <my_payload_schema>
    }
}

generall commented 1 week ago

According to your GET /collections/my_collection/cluster, it is not replicated. If you changed the replication factor after the collection was already created, you need to follow this: https://qdrant.tech/documentation/guides/distributed_deployment/#creating-new-shard-replicas
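
For reference, the operation described in that guide is a cluster update on the collection. A rough sketch over the REST API using the requests package (the shard and peer IDs are copied from the outputs above purely as an example; the call would be repeated for each shard that needs an extra replica, and assumes a port-forward to any node on localhost:6333):

import requests

QDRANT_URL = "http://localhost:6333"  # assumption: port-forward to any cluster node

resp = requests.post(
    f"{QDRANT_URL}/collections/my_collection/cluster",
    json={
        "replicate_shard": {
            "shard_id": 0,
            "from_peer_id": 906069340222619,   # peer currently holding shard 0
            "to_peer_id": 5059350541336290,    # peer that should receive a replica
        }
    },
)
resp.raise_for_status()
print(resp.json())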

Michael-JB commented 1 week ago

Hi @generall, thank you for the quick reply.

To clarify, I am not changing the replication factor after creating the collection. This configuration is set on a fresh installation (no existing collections). I then create a collection, add points, and observe these responses. I configure the replication_factor via the helm values if that's of any significance.

If the shards are replicated, would I expect to see all shards under the local_shards key in the response to GET /collections/my_collection/cluster?

generall commented 1 week ago

> If the shards are replicated, would I expect to see all shards under the local_shards key in the response to GET /collections/my_collection/cluster?

yes

> I configure the replication_factor via the helm values if that's of any significance.

I am not sure if the helm chart replicaCount is the same thing as replication factor in the collection.

Michael-JB commented 1 week ago

@generall To be more precise, I set config.storage.collection.replication_factor in the helm chart, which overrides this value in the Qdrant config (via production.yaml). I then rely on this to set replication_factor when creating the collection, rather than explicitly specifying replication_factor in the collection create request. This seems to work as the replication_factor I specify propagates to the config returned by GET /collections/my_collection, but perhaps this value is misleading. Could it be the case that setting replication_factor via config behaves differently to setting it in the create request body?

Michael-JB commented 1 week ago

Hi @generall, I created a test for this in a local sandbox and can confirm that my above suspicions hold -- this looks like a bug. TL;DR:

If you configure replication_factor via the global config, new collections pick up this configuration but do not actually create shard replicas. If you explicitly specify replication_factor in the collection create request, everything works as expected.

Here is a full repro:

  1. Install kind and create new cluster: kind create cluster
  2. helm repo add qdrant https://qdrant.github.io/qdrant-helm/ && helm repo update
  3. Create a values.yaml file containing:
    # Create cluster with 2 Qdrant nodes
    replicaCount: 2
    config:
      cluster:
        enabled: true
      # Set default collection replication factor to 2
      storage:
        collection:
          replication_factor: 2
  4. Install Qdrant in cluster: helm install qdrant qdrant/qdrant -f values.yaml
  5. Port-forward Qdrant svc to localhost:8002: kubectl port-forward service/qdrant 8002:6333
  6. Run the following Python script:

    import numpy as np
    from qdrant_client import QdrantClient
    from qdrant_client.models import VectorParams, Distance, PointStruct

    client = QdrantClient(host="localhost", port=8002)

    collections = ["collection_config_implicit", "collection_config_explicit"]

    print("Creating collections...")
    client.create_collection(
        collection_name=collections[0],  # replication config implicit
        vectors_config=VectorParams(size=128, distance=Distance.COSINE),
        shard_number=2,
    )
    client.create_collection(
        collection_name=collections[1],  # replication config explicit
        vectors_config=VectorParams(size=128, distance=Distance.COSINE),
        shard_number=2,
        replication_factor=2,
    )

    print("Uploading vectors...")
    for collection_name in collections:
        vectors = np.random.rand(100, 128)
        client.upsert(
            collection_name,
            points=[
                PointStruct(id=idx, vector=vector.tolist())
                for idx, vector in enumerate(vectors)
            ],
        )

    print("Running tests...")

    # We expect that both collections have a replication factor of 2.
    # This assertion holds.
    collection_replication_factors = [
        client.get_collection(collection_name).config.params.replication_factor
        for collection_name in collections
    ]
    assert collection_replication_factors == [2, 2]

    # We expect that both collections have two local shards (one primary, one replica).
    # This assertion fails as collection_config_implicit has only one local shard;
    # this demonstrates that replication doesn't happen if configured via global config.
    collection_cluster_infos = [
        client.http.cluster_api.collection_cluster_info(collection_name).result
        for collection_name in collections
    ]
    local_shard_counts = [
        len(info.local_shards) if info else -1
        for info in collection_cluster_infos
    ]
    assert local_shard_counts == [2, 2], f"Expected [2, 2], got {local_shard_counts}"


Things to note:
1. If this is a bug for `replication_factor`, this may extend to other fields configured via global config, e.g., `write_consistency_factor`. I haven't tested this.
2. I used the helm chart in this repo as it's a convenient way to spin up a Qdrant cluster. The Qdrant chart sets `replication_factor` in the `production.yaml` config file. I expect this bug is internal to Qdrant, i.e., you'll still see this if you're not using the helm chart/k8s.
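
A possible way to probe point 1, sketched against the script above (untested; it assumes write_consistency_factor is also set under config.storage.collection in values.yaml):

# Untested: extend the script above to inspect write_consistency_factor as well.
write_consistency_factors = [
    client.get_collection(collection_name).config.params.write_consistency_factor
    for collection_name in collections
]
print(write_consistency_factors)
# Even if this prints the configured value for both collections, that only shows the
# setting propagated to the collection config, not that it takes effect (just as
# replication_factor above reads 2 while only one shard copy actually exists).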