elastic / connectors

Source code for all Elastic connectors, developed by the Search team at Elastic, and home of our Python connector development framework
https://www.elastic.co/guide/en/enterprise-search/master/index.html

Return possibility for connectors to have optimised ES ingestion #1917

Closed artem-shelkovnikov closed 1 week ago

artem-shelkovnikov commented 8 months ago

Problem Description

We've recently merged a PR to Remove timestamp optimization for full syncs. This PR slows down some connectors quite a lot.

About optimisation:

Before the PR was merged, the connector service did not ingest documents into ES that had not changed. A bit of pseudocode to demonstrate:

# Pseudocode: map each existing _id to its stored _timestamp, e.g. {"123": "2023-01-01 12:00:00Z"}
existing_timestamps = {r["_id"]: r["_timestamp"] for r in elasticsearch.search(fields=["_id", "_timestamp"])}

for document in connector.get_docs():
  existing_timestamp = existing_timestamps.get(document["_id"])  # None if the document did not exist before this sync
  if existing_timestamp is None or existing_timestamp < document["_timestamp"]:  # new document, or changed on remote
    sink.ingest(document)

This optimisation was good for the following reasons:

  1. Bandwidth savings - we did not send docs that needed no update, so syncs used less bandwidth and were potentially faster
  2. Elasticsearch load reduction - we additionally reduced load on Elasticsearch (although it also does its own optimisations for unchanged documents)

As a result, some subsequent syncs finished up to 10 times faster, or even more. Now we don't have it.

Why was optimisation removed?

A few issues:

  1. Some content sources do not correctly report timestamps for changes - for example, if permissions change for an object, its timestamp does not change, so outdated permissions stay saved for the object
  2. Some content sources have different criteria for an object being changed, not just the timestamp. We need to be able to make this optimisation work not only on timestamp fields, but on an arbitrary combination of fields somehow
  3. When ingest pipelines were used, failures inside the pipeline were handled incorrectly: a failing ingest pipeline could cause a document to never be correctly re-ingested, because the timestamp field was set for the object even though the pipeline had failed.
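
The second issue above (detecting changes on an arbitrary combination of fields) could be approached by comparing a fingerprint of the selected fields instead of a single timestamp. This is only a sketch: `get_path`, `fingerprint`, and the dotted field paths are hypothetical names and are not part of the connector framework.

```python
import hashlib
import json


def get_path(document, path):
    """Resolve a dotted path such as "user.permissions"; None if absent."""
    value = document
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def fingerprint(document, fields):
    """Hash the selected fields into a stable hex digest (hypothetical helper)."""
    selected = {field: get_path(document, field) for field in fields}
    # sort_keys makes the serialisation independent of dict ordering
    payload = json.dumps(selected, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


FIELDS = ["_timestamp", "user.permissions"]

old = {"_id": "1", "_timestamp": "2023-01-01T12:00:00Z", "user": {"permissions": ["alice"]}}
# Same timestamp, but permissions changed on the remote system
new = {"_id": "1", "_timestamp": "2023-01-01T12:00:00Z", "user": {"permissions": ["alice", "bob"]}}

changed = fingerprint(old, FIELDS) != fingerprint(new, FIELDS)
print(changed)
```

With this approach the previous fingerprint would have to be stored somewhere, for example next to the document in the index, which is a design decision this sketch leaves open.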

Proposed Solution

There are multiple ways to bring the optimisation back; I'll call out a few:

Feature flags

We can make the optimisation available to all connectors behind a feature flag: if the connector definition declares "supports_timestamp_optimisation", the connector checks whether its feature flag is enabled (connector.features.timestamp.optimisation == True) and applies the optimisation only in that case.
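
A rough sketch of how the two conditions might combine. The classes `ConnectorDefinition` and `ConnectorFeatures` and the function `should_optimise` are made up for illustration; only the names "supports_timestamp_optimisation" and the "timestamp.optimisation" flag path come from the proposal above.

```python
from dataclasses import dataclass, field


@dataclass
class ConnectorDefinition:
    # Hypothetical static capability declared by the connector author
    supports_timestamp_optimisation: bool = False


@dataclass
class ConnectorFeatures:
    # Hypothetical per-connector flags, e.g. {"timestamp": {"optimisation": True}}
    flags: dict = field(default_factory=dict)

    def enabled(self, *path):
        """Walk the nested flags; anything missing counts as disabled."""
        node = self.flags
        for key in path:
            if not isinstance(node, dict) or key not in node:
                return False
            node = node[key]
        return node is True


def should_optimise(definition, features):
    """Optimise only if the connector supports it AND the flag is switched on."""
    return definition.supports_timestamp_optimisation and features.enabled("timestamp", "optimisation")
```

Treating a missing flag as disabled keeps the current behaviour (ingest everything) as the default, so a connector has to opt in explicitly.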

New sync job type

As suggested by @timgrein, we can separate this logic into a "shallow sync" job type, make it available for each connector, and give control of it to the customer.
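
To make the idea concrete, here is a minimal sketch of a job type deciding which documents get sent. `SyncJobType` and `docs_to_ingest` are hypothetical names, and the sketch assumes timestamps are ISO 8601 strings in one consistent format so that string comparison matches chronological order.

```python
from enum import Enum


class SyncJobType(Enum):
    FULL = "full"        # re-ingest every document
    SHALLOW = "shallow"  # hypothetical: skip documents that look unchanged


def docs_to_ingest(job_type, docs, existing_timestamps):
    """Yield the documents a job of the given type would send to Elasticsearch."""
    for doc in docs:
        if job_type is SyncJobType.SHALLOW:
            known = existing_timestamps.get(doc["_id"])
            if known is not None and known >= doc["_timestamp"]:
                continue  # unchanged since the last sync
        yield doc


docs = [
    {"_id": "1", "_timestamp": "2023-01-01T12:00:00Z"},
    {"_id": "2", "_timestamp": "2023-02-01T12:00:00Z"},
]
existing = {"1": "2023-01-01T12:00:00Z", "2": "2023-01-15T00:00:00Z"}

full_ids = [d["_id"] for d in docs_to_ingest(SyncJobType.FULL, docs, existing)]
shallow_ids = [d["_id"] for d in docs_to_ingest(SyncJobType.SHALLOW, docs, existing)]
```

A customer who cannot trust the source's timestamps would keep scheduling full syncs, while others could choose the shallow type for speed.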

Re-thinking the hierarchy of connectors and allowing the optimisation to be pluggable

We can change the framework to make such traits be a "plugin" rather than "metadata" of the connector.

For example (really really raw thoughts in my head):

class SomeCoolConnector(BaseConnector):
  # ...

  def plugins(self):
    optimisation = IngestionOptimisationPlugin(
      fields=["_timestamp", "user.permissions", "group.permissions"]
    )
    advanced_filtering_rules = AdvancedFilteringRulesPlugin(
      validation=CoolConnectorFilteringRulesValidation(self),
      execution=CoolConnectorFilteringRulesExecution(self),
    )

    return (optimisation, advanced_filtering_rules)

This idea is very raw and I'm still thinking about it, but I feel that extension via composition - building a connector from small blocks - rather than inheritance will bring us forward.
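
A self-contained sketch of how the framework side of this composition might look. Everything here is hypothetical: `find_plugin`, `has_changed`, and `should_ingest` are invented for illustration, and the field comparison uses flat field names rather than dotted paths to stay short.

```python
class IngestionOptimisationPlugin:
    """Hypothetical plugin: decides whether a document needs re-ingesting."""

    def __init__(self, fields):
        self.fields = fields

    def has_changed(self, existing, incoming):
        if existing is None:
            return True  # never ingested before
        return any(existing.get(f) != incoming.get(f) for f in self.fields)


class BaseConnector:
    def plugins(self):
        return ()  # connectors opt in by overriding this

    def find_plugin(self, plugin_type):
        """Return the first plugin of the given type, or None."""
        return next((p for p in self.plugins() if isinstance(p, plugin_type)), None)


class SomeCoolConnector(BaseConnector):
    def plugins(self):
        return (IngestionOptimisationPlugin(fields=["_timestamp", "permissions"]),)


def should_ingest(connector, existing, incoming):
    optimisation = connector.find_plugin(IngestionOptimisationPlugin)
    # Without the plugin the framework falls back to ingesting everything
    return True if optimisation is None else optimisation.has_changed(existing, incoming)
```

The framework only asks "does this connector carry a plugin of this type?", so a connector that omits the plugin keeps today's behaviour without any flag or subclass check.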

Alternatives

Do nothing :)

artem-shelkovnikov commented 8 months ago

Additional point: we need to check how to work around the ingest pipeline failures and mark documents as "dirty".

@seanstory told me about problems that occur when the connector framework is used on a small cluster with ELSER and the ingest pipeline is starving: we ingest documents, but they are actually "incomplete".

We need to find out what to do with these "incomplete" documents and how we can re-ingest them effectively.
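
One possible starting point for the "dirty" marking, sketched under assumptions: it collects the ids of items that the Elasticsearch Bulk API response reports as failed and forces those documents through on the next sync. `failed_ids`, `should_ingest`, and the idea of persisting `dirty_ids` between syncs are hypothetical. It only covers failures that surface in the bulk response; a pipeline that catches errors with on_failure handlers and still indexes the document would need a different signal.

```python
def failed_ids(bulk_response):
    """Collect _ids of items the Bulk API reported as failed."""
    dirty = set()
    for item in bulk_response.get("items", []):
        # Each item has a single key naming the action: index, create, update or delete
        for result in item.values():
            if "error" in result:
                dirty.add(result["_id"])
    return dirty


def should_ingest(doc, existing_timestamps, dirty_ids):
    if doc["_id"] in dirty_ids:
        return True  # last attempt failed, so the stored timestamp cannot be trusted
    known = existing_timestamps.get(doc["_id"])
    return known is None or known < doc["_timestamp"]


response = {
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201}},
        {"index": {"_id": "2", "status": 400,
                   "error": {"type": "mapper_parsing_exception", "reason": "failed"}}},
    ],
}
dirty_ids = failed_ids(response)
```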

artem-shelkovnikov commented 1 week ago

Stale