tensorchord / pgvecto.rs

Scalable, Low-latency and Hybrid-enabled Vector Search in Postgres. Revolutionize Vector Search, not Database.
https://docs.pgvecto.rs/getting-started/overview.html
Apache License 2.0
1.75k stars 71 forks source link

research: compatibility with citus #577

Closed VoVAllen closed 1 month ago

VoVAllen commented 2 months ago

Research if we could support citus

gaocegege commented 2 months ago

cc @cho-thinkfree-com

shetao2015 commented 2 months ago

+1

cutecutecat commented 2 months ago

Stream/Physical replication

Current situation

The main problem is at building the index, when index log is replayed inside PG of standby instance:

postgres=# select * from pg_indexes where tablename = 't';
 schemaname | tablename | indexname | tablespace |                                               indexdef                                                
------------+-----------+-----------+------------+-------------------------------------------------------------------------------------------------------
 public     | t         | t_val_idx |            | CREATE INDEX t_val_idx ON public.t USING vectors (val vector_l2_ops) WITH (options='[indexing.hnsw]')
(1 row)

but the backend hook ambuild is not triggered at WAL replay of standby instance. So they are no longer in sync, which lead to this error at any vector query:

postgres=# SELECT idx_status, idx_indexing FROM pg_vector_index_stat;
ERROR:  pgvecto.rs: The index is not existing in the background worker.
ADVICE: Drop or rebuild the index.

Implement

We need to write a custom WAL manager like this PR.

These operations should be logged:

graph LR
A[ambuild<br>main] --> B[XLogBeginInsert<br>main]
B[XLogBeginInsert<br>main] -->|auto| C[Write custom WAL<br>main]
C[Write custom WAL<br>main] -->|publish| D[standby]
E[standby] -->|hook| F[decode & replay<br>standby]
F[decode & replay<br>standby] --> G[rpc.create<br>standby]

ref:

Limitation

cutecutecat commented 2 months ago

Citus

🎉Citus is basically compatible with PGVecto.rs at the verification.

graph LR
1[items] --> 2[items_102016]
1[items] --> 3[items_102017]
1[items] --> 4[items_102018]
1[items] --> 5[items_102019]
2[items_102016] --> 6[items_embedding_idx_102016]
3[items_102017] --> 7[items_embedding_idx_102017]
4[items_102018] --> 8[items_embedding_idx_102018]
5[items_102019] --> 9[items_embedding_idx_102019]
6[items_embedding_idx_102016] --> 10[items_embedding_idx]
7[items_embedding_idx_102017] --> 10[items_embedding_idx]
8[items_embedding_idx_102018] --> 10[items_embedding_idx]
9[items_embedding_idx_102019] --> 10[items_embedding_idx]

Checklist

root@7b2d6e048a03:/# cat /var/lib/postgresql/data/pg_vectors/indexes/0000000000000000000000000000000066d6b9f5b04760260000000500004552/sealed_segments/1/storage/len ; echo 27899

root@7b2d6e048a03:/# cat /var/lib/postgresql/data/pg_vectors/indexes/0000000000000000000000000000000066d6b9f5b04760260000000500004551/sealed_segments/1/storage/len ; echo 22061

- [x] No loss in vector query accuracy
recall = 0.8504 at `glove+cos+hnsw` regardless of citus on/off

- [x] Citus feature: single node
- [x] Citus feature: multi node

```shell
postgres=# EXPLAIN SELECT id FROM items ORDER BY embedding <=> '[1,1,1]' LIMIT 100;
                                                                QUERY PLAN                                                                
------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=3821.93..3822.18 rows=100 width=12)
   ->  Sort  (cost=3821.93..4071.93 rows=100000 width=12)
         Sort Key: remote_scan.worker_column_2
         ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=12)
               Task Count: 4
               Tasks Shown: All
               ->  Task
                     Node: host=172.18.0.1 port=5430 dbname=postgres
                     ->  Limit  (cost=0.00..2.10 rows=100 width=12)
                           ->  Index Scan using items_embedding_idx_102028 on items_102028 items  (cost=0.00..501.98 rows=23918 width=12)
                                 Order By: (embedding <=> '[1, 1, 1]'::vector)
               ->  Task
                     Node: host=172.18.0.1 port=5431 dbname=postgres
                     ->  Limit  (cost=0.00..2.10 rows=100 width=12)
                           ->  Index Scan using items_embedding_idx_102029 on items_102029 items  (cost=0.00..545.11 rows=26009 width=12)
                                 Order By: (embedding <=> '[1, 1, 1]'::vector)
               ->  Task
                     Node: host=172.18.0.1 port=5430 dbname=postgres
                     ->  Limit  (cost=0.00..2.10 rows=100 width=12)
                           ->  Index Scan using items_embedding_idx_102030 on items_102030 items  (cost=0.00..464.84 rows=22147 width=12)
                                 Order By: (embedding <=> '[1, 1, 1]'::vector)
               ->  Task
                     Node: host=172.18.0.1 port=5431 dbname=postgres
                     ->  Limit  (cost=0.00..2.10 rows=100 width=12)
                           ->  Index Scan using items_embedding_idx_102031 on items_102031 items  (cost=0.00..585.08 rows=27926 width=12)
                                 Order By: (embedding <=> '[1, 1, 1]'::vector)
(26 rows)

Limitation

gaocegege commented 2 months ago

Thanks for your research!