elastic / connectors

Source code for all Elastic connectors, developed by the Search team at Elastic, and home of our Python connector development framework
https://www.elastic.co/guide/en/enterprise-search/master/index.html

[Framework] Loading all ids in mem during a sync can cause OOMs #2632

Open jedrazb opened 2 weeks ago

jedrazb commented 2 weeks ago

Bug Description

Right now, we load all (id, timestamp) pairs into memory when executing a content sync.

This is used in the ES sink logic, to clean the ES index of documents that were not found in the current sync.

In the comment of yield_existing_documents_metadata we note that 300,000 ids take around 50MiB. That's "fine" for small datasets, but as we approach the 100,000,000-doc scale, all of a sudden we need 10+GB of RAM to keep all ids in memory. This doesn't scale well.
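To make the scaling concrete, here is a rough back-of-the-envelope sketch. It assumes memory grows linearly from the 300,000 ids ≈ 50MiB figure quoted above; the helper name is hypothetical and real usage depends on id length and Python object overhead.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def estimate_id_memory_bytes(doc_count, sample_docs=300_000, sample_bytes=50 * MIB):
    """Linearly extrapolate from the '300,000 ids ~ 50 MiB' figure (an assumption)."""
    bytes_per_doc = sample_bytes / sample_docs
    return doc_count * bytes_per_doc


for docs in (300_000, 100_000_000, 1_000_000_000):
    gib = estimate_id_memory_bytes(docs) / GIB
    print(f"{docs:>13,} docs -> {gib:8.2f} GiB")
```

Under that linear assumption, 100 million docs land at roughly 16 GiB, which is consistent with the "10+GB" estimate.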

To Reproduce

Steps to reproduce the behavior:

  1. Set up a source with 100,000,000+ docs
  2. First sync should run successfully
  3. Make sure your connectors service runs with constrained available RAM (to show an example)
  4. The service will fail when trying to load all docs into memory

Expected behavior

The connector service should be able to operate at billion-document scale.

Additional context

One workaround for now would be to run the connector service in a self-managed environment (not Elastic Cloud) and provision the service with enough RAM if needed.

Todo

artem-shelkovnikov commented 2 weeks ago

One old proposal was to have a small local file DB to store ids: https://github.com/elastic/connectors/issues/748

jedrazb commented 2 weeks ago

That looks very promising! Great that we have some handy benchmarks ready: https://github.com/tarekziade/bench-db. I'm checking how RocksDB would perform against them; Flink uses RocksDB to store internal state.

EDIT: python-rocksdb looks like a dead project (gh), last release 5 yrs ago

jedrazb commented 2 weeks ago

From #748

I benched dbm and sqlite3 for 1 Million docs -- dbm wins see https://github.com/tarekziade/bench-db

I forked the bench repo and changed the benchmark scenario from 1Mil docs to something more extreme. I only considered the stdlib DBs that were the winners in the previous benchmark. The results are:

100 Mil docs

sqlite3 (ingest 1m 46sec, 3.6GB disc)

Adding 100 million ids and timestamps
took 106.8999559879303 seconds
Final count 100000000
max id len is 12
Looking for one id
0.004922542022541165 seconds
Size of DB on disk is 3636.70703125 MiB

dbm (ingest 2+ hrs, couple of GBs)

After 10 mins it hadn't completed! It was fine up to a couple of million docs and then it slowed down massively. I used tqdm to track progress, and the projected time to load 100Mil ids is 2+ hrs. We shouldn't use it!

1Bil docs

sqlite3 (ingest time 18min 40s, 37GB disc)

Adding 1_000 million ids and timestamps
took 1120.5809767246246 seconds
Final count 1000000000
max id len is 13
Looking for one id
0.005293749971315265 seconds
Size of DB on disk is 37454.15625 MiB

~2 mins for ingesting 100Mil and ~20 mins for 1Bil docs should be acceptable IMO. This also lets us keep the existing logic in the connector service unchanged. sqlite3 performs well at the scale we are interested in.
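For illustration, a disk-backed lookup along these lines might look like the sketch below. It uses only the stdlib sqlite3 module; the class name `DiskIdStore`, the table layout, and the batch size are assumptions for this example, not the framework's actual design.

```python
import os
import sqlite3
import tempfile


class DiskIdStore:
    """Hypothetical disk-backed stand-in for an in-memory {id: timestamp} dict."""

    def __init__(self, path):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS docs (id TEXT PRIMARY KEY, ts TEXT)"
        )

    def add_many(self, pairs, batch_size=10_000):
        """Insert (id, timestamp) pairs in batches so memory use stays bounded."""
        batch = []
        for pair in pairs:
            batch.append(pair)
            if len(batch) >= batch_size:
                self._flush(batch)
                batch = []
        if batch:
            self._flush(batch)

    def _flush(self, batch):
        with self._conn:  # commits on success, rolls back on error
            self._conn.executemany(
                "INSERT OR REPLACE INTO docs (id, ts) VALUES (?, ?)", batch
            )

    def get_timestamp(self, doc_id):
        """Return the stored timestamp for doc_id, or None if it is unknown."""
        row = self._conn.execute(
            "SELECT ts FROM docs WHERE id = ?", (doc_id,)
        ).fetchone()
        return row[0] if row else None

    def count(self):
        return self._conn.execute("SELECT COUNT(*) FROM docs").fetchone()[0]

    def close(self):
        self._conn.close()


with tempfile.TemporaryDirectory() as tmp:
    store = DiskIdStore(os.path.join(tmp, "ids.sqlite3"))
    store.add_many((f"doc-{i}", f"2024-01-01T00:00:{i % 60:02d}Z") for i in range(25_000))
    print(store.count(), store.get_timestamp("doc-42"))
    store.close()
```

Because the store exposes the same "look up a timestamp by id" operation as a dict, the sink logic that compares existing and incoming documents would not need to change.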

Pros

Cons

jedrazb commented 2 weeks ago

Another option: https://github.com/elastic/search-team/issues/4712

Scaling full sync with delete by query

For a full sync, instead of pre-loading all existing documents, we could add a timestamp field to indicate the index time of each document, and make a Delete by query API call to delete all documents with the timestamp older than the sync start time.
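As a rough sketch of that idea, the request body for the Delete by query call could be built like this. The field name `last_synced_at` is a hypothetical placeholder (the thread does not name the field), and the body is built as a plain dict so the example does not depend on any client library.

```python
from datetime import datetime, timezone


def build_purge_query(sync_started_at, timestamp_field="last_synced_at"):
    """Build a Delete by query body matching docs last indexed before this sync.

    `timestamp_field` is a hypothetical field name used for illustration only.
    """
    if sync_started_at.tzinfo is None:
        raise ValueError("sync_started_at must be timezone-aware")
    cutoff = sync_started_at.astimezone(timezone.utc).isoformat()
    return {"query": {"range": {timestamp_field: {"lt": cutoff}}}}


body = build_purge_query(datetime(2024, 5, 1, 1, 0, tzinfo=timezone.utc))
print(body)
```

Requiring a timezone-aware start time and normalizing to UTC is a deliberate choice here, since a naive local timestamp could silently shift the cutoff.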

Pros

Cons

artem-shelkovnikov commented 1 week ago

Great research, @jedrazb!

It's concerning to see that we need so much disk and time to store the ids for 200M records - but makes sense that it's just RAM we need + some overhead. Main concern is disk cleanup cause we can just drop down a host if we overuse disk and fail to clean up.

jedrazb commented 1 week ago

Main concern is disk cleanup cause we can just drop down a host if we overuse disk and fail to clean up.

Agreed, we would need to establish a strategy for an effective cleanup of temporary db files that store existing ids for any given sync. Especially in case of some unexpected failures.
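One possible shape for such a cleanup strategy, sketched with the stdlib only: tie the temporary db file to a context manager so it is removed even when the sync raises. The function name and directory prefix are assumptions made for this example.

```python
import contextlib
import os
import shutil
import sqlite3
import tempfile


@contextlib.contextmanager
def temporary_id_db(prefix="sync-ids-"):
    """Yield (connection, path) for a sqlite3 db that is deleted on exit."""
    tmp_dir = tempfile.mkdtemp(prefix=prefix)
    db_path = os.path.join(tmp_dir, "ids.sqlite3")
    conn = sqlite3.connect(db_path)
    try:
        yield conn, db_path
    finally:
        # Runs on normal exit and on exceptions raised inside the with-block.
        conn.close()
        shutil.rmtree(tmp_dir, ignore_errors=True)


# Simulate a sync that fails halfway through.
try:
    with temporary_id_db() as (conn, db_path):
        conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, ts TEXT)")
        raise RuntimeError("simulated sync failure")
except RuntimeError:
    pass

print(os.path.exists(db_path))  # False: the file is gone despite the failure
```

This does not cover a hard kill of the process (for example by the OOM killer), where the `finally` block never runs, so a sweep of stale directories at service startup would probably still be needed.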

Also, another concern is how quickly we can retrieve results from the local db

Looking for one id
0.004922542022541165 seconds

The lookup time in the benchmark above seemed VERY slow to me. I checked, and the reason is that the benchmark code calls timeit.timeit(_lookup, number=1000), which reports the total time for 1000 lookups, not the time for a single one.

I timed retrieving 1Mil random docs and it took 10.17s on my local machine so it seems to be fine.
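A small sketch of the timing pitfall: `timeit.timeit` returns the total time for `number` executions, so the figure has to be divided to get a per-lookup time. The helper name is hypothetical.

```python
import timeit


def per_call_seconds(func, number=1000):
    """timeit.timeit returns the total for `number` calls; divide to get one call."""
    total = timeit.timeit(func, number=number)
    return total / number


# Converting the figure reported above, assuming it covered 1000 lookups:
reported_total = 0.004922542022541165
print(f"{reported_total / 1000 * 1e6:.2f} microseconds per lookup")  # 4.92
```

That is in the same ballpark as the separate measurement of 1Mil random lookups in 10.17s, which works out to about 10 microseconds each.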

jedrazb commented 1 week ago

@seanstory, following up on our discussion yesterday in chat. We can skip the document lookup/database altogether if we add a timestamp field to indicate the index time of each document.

It could work as follows:

Naive Incremental Syncs

Assumption: After the last successful naive incremental (full sync with skip_unchanged_docs=True) or full sync, the state of the index is up-to-date with the data source.

Here's the proposed process:

  1. Find the last successful content sync start time for a given connector (from the jobs index). Let's call it last_successful_sync_started_timestamp.
  2. For each document:
    • Download the content only if the last modified timestamp is greater than last_successful_sync_started_timestamp. This means the document needs updating. (We compare against the sync start time intentionally, as we can't really know when the doc was indexed exactly without querying ES)
    • Update the last index (sync) time timestamp in the data index, to indicate that this doc still exists. This is necessary for the purge step.
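The download decision in the steps above can be sketched as a single comparison. The function name `needs_download` is hypothetical; `last_successful_sync_started_timestamp` follows the name used in the proposal.

```python
from datetime import datetime, timezone


def needs_download(doc_last_modified, last_successful_sync_started_timestamp):
    """Decide whether a document's content must be fetched again."""
    if last_successful_sync_started_timestamp is None:
        return True  # no successful sync yet, so everything is new
    return doc_last_modified > last_successful_sync_started_timestamp


last_sync_start = datetime(2024, 5, 1, 1, 0, tzinfo=timezone.utc)
unchanged = datetime(2024, 4, 30, 12, 0, tzinfo=timezone.utc)
changed = datetime(2024, 5, 1, 1, 30, tzinfo=timezone.utc)

print(needs_download(unchanged, last_sync_start))  # False
print(needs_download(changed, last_sync_start))    # True
```

Note that no per-document lookup is involved: the only state carried between syncs is one timestamp.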

Purge Outdated Documents

As suggested in this issue:

Add a timestamp field to indicate the index time of each document. Then, make a Delete by query API call to delete all documents with the timestamp older than the sync start time.

It seems like we could save a lot of mem/disk without querying ES that much. My only concern is how _delete_by_query would work on 100Mil+ index size scale :D


Note, I'm not talking here about "proper" incremental syncs with delta API (only SPO connector ATM), those don't load existing docs and don't purge any, since they are based on delta API only: source

artem-shelkovnikov commented 1 week ago

Find the last successful content sync start time for a given connector (from the jobs index). Let's call it last_successful_sync_started_timestamp.

We can actually use cursors for it, it's already available and at first glance looks fit for our goal (cursors are updated after successful sync).

My only concern is how _delete_by_query would work on 100Mil+ index size scale :D

Oh it's fun, it can crush clusters. Worst case we can emulate on our side by just querying documents manually and deleting them in batches.
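A minimal sketch of the "delete in batches" fallback, assuming the ids to delete have already been collected by a manual query. It only builds newline-delimited Bulk API bodies; sending them, throttling, and error handling are left out, and the function names are hypothetical.

```python
import json
from itertools import islice


def batched(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def bulk_delete_payloads(index, doc_ids, batch_size=1000):
    """Yield one NDJSON Bulk API body per batch of ids to delete."""
    for chunk in batched(doc_ids, batch_size):
        lines = [
            json.dumps({"delete": {"_index": index, "_id": doc_id}})
            for doc_id in chunk
        ]
        yield "\n".join(lines) + "\n"  # bulk bodies must end with a newline


for payload in bulk_delete_payloads("my-index", ["a", "b", "c"], batch_size=2):
    print(payload, end="")
```

Keeping the batch size small gives the caller a natural place to pause between requests if the cluster is under pressure.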

Anyway I'd give _delete_by_query a try first and see how it goes.

jedrazb commented 1 week ago

We can actually use cursors for it, it's already available and at first glance looks fit for our goal (cursors are updated after successful sync).

++ yeah, cursors could be a great fit! Just want to emphasize that we should update the cursor to the sync_starts timestamp instead of timestamp at the end of sync. This covers the case of a long-running sync where a doc is updated in the source while the sync is still running; with an end-of-sync timestamp, the subsequent sync could miss that update.
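A toy timeline showing why the cursor should hold the sync start time. Times are arbitrary integer units, and the scenario assumes the doc was fetched before it changed in the source.

```python
def picked_up_by_next_sync(doc_modified_at, cursor):
    """The next sync re-downloads a doc only if it changed after the cursor."""
    return doc_modified_at > cursor


sync_started, sync_finished = 0, 10  # a long-running sync
doc_modified_at = 5                  # changed mid-sync, after it was already fetched

print(picked_up_by_next_sync(doc_modified_at, cursor=sync_finished))  # False: update lost
print(picked_up_by_next_sync(doc_modified_at, cursor=sync_started))   # True: update caught
```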

artem-shelkovnikov commented 1 week ago

I would personally not rely on timestamps and maybe add a metafield that refers document to a sync id that it was part of?

Then we delete all docs that have different sync id?
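Sketched as a query body, the sync-id variant of the purge could look like this. The field name `sync_id` is a hypothetical placeholder, and the term query assumes the field is mapped as a keyword.

```python
def build_sync_id_purge_query(current_sync_id, sync_id_field="sync_id"):
    """Build a Delete by query body matching docs not written by this sync.

    `sync_id_field` is a hypothetical metafield name used for illustration.
    """
    return {
        "query": {
            "bool": {"must_not": [{"term": {sync_id_field: current_sync_id}}]}
        }
    }


print(build_sync_id_purge_query("job-123"))
```

One consequence worth keeping in mind: a `must_not` term clause also matches documents that have no `sync_id` field at all, so documents indexed before the field was introduced would be purged unless they are re-stamped first.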

jedrazb commented 1 week ago

I would personally not rely on timestamps and maybe add a metafield that refers document to a sync id that it was part of? Then we delete all docs that have different sync id?

Agreed that for the delete phase, it's a solid approach, less susceptible to any potential timezone conversion issues!

Just want to emphasize that we should update the cursor to the sync_starts timestamp instead of timestamp at the end of sync.

My comment was about incremental syncs and skipping unnecessary downloads. In this case, we need to rely on timestamps from the source to decide whether or not downloading a document is necessary. We need to compare timestamps to determine if a document has been modified recently.

artem-shelkovnikov commented 1 week ago

Sorry I'm a bit confused - do you also mean dropping timestamp from individual documents?

jedrazb commented 1 week ago

Sorry I'm a bit confused - do you also mean dropping timestamp from individual documents?

No, those doc timestamps are still in the index. But without local storage of the (id, timestamp) lookup (regardless of whether it's in mem or on disk) we have no way to check this unless we query ES directly. At 100Mil+ scale this becomes a lot of queries (I'm talking about this step).

So using a common single timestamp as a reference point (e.g. start time of last successful sync) is a shortcut to decide whether or not to download the content of a file. With that we don't need to store (id, timestamp) locally and query the index for it.

artem-shelkovnikov commented 1 week ago

Got it.

We'll need to think more about this part though, as surprises can happen. I don't think any connector snapshots 3rd-party system (except maybe SQL ones with cursors that actually snapshot results until cursor is released). So that means that once you started sync, let's say at 01:00, your documents might get their timestamps after 01:00 cause they are changed in 3rd-party. So if we rely on timestamp of sync start always, then we're gonna download more than needed - not sure how much, probably not much but depends on 3rd-party system usage a lot.

There might be more to this, eventual consistency is fun.

seanstory commented 1 week ago

This sounds promising. Thanks, Jedr, for driving this.

One thought on cursors - we'll want to make sure this doesn't conflict with how SharePoint Online already uses cursors. We don't want to break the purge phase capabilities for it, if its incremental syncs set non-timestamp cursors.

Could we not just fetch the timestamp by looking at the connector-sync-jobs (sorted by recency) and use the "created_at" timestamp?

if we rely on timestamp of sync start always, then we're gonna download more than needed - not sure how much, probably not much but depends on 3rd-party system usage a lot.

I think that's ok. Better to have the incremental syncs be slightly less than optimal, rather than risk data loss/gaps.

jedrazb commented 1 week ago

In the team sync we agreed on using disk-based lookup for self-managed connectors (managed in config.yml, same as with the extraction service), as it should be relatively easy to implement.