[Writable Warm] Design and implement Composite Directory and integrate with FileCache #12781

Open rayshrey opened 7 months ago

rayshrey commented 7 months ago

Is your feature request related to a problem? Please describe

Currently we don’t have any directory implementation that can interact with both local and remote repositories. We are proposing a new directory implementation where data is backed by a remote store and not all data needs to be stored locally. This directory will behave as a local directory when complete files are present on disk, but can fall back to on-demand fetch (which can be block level or non-block level) from the remote store when data is not present locally.

Describe the solution you'd like

Looking forward to review comments and discussions on this.
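
As a concrete starting point for that discussion, below is a minimal conceptual sketch of the local-first/remote-fallback behaviour described above, written against the plain Lucene Directory API. It is illustrative only: the class name is made up, and the real design adds block-level fetch and a FileCache on top of this.

import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

// Illustrative only: serve reads locally when the full file exists, otherwise go to remote
public class LocalFirstDirectory extends FilterDirectory {

    private final Directory remote;

    public LocalFirstDirectory(Directory local, Directory remote) {
        super(local); // writes and metadata operations go to the local directory
        this.remote = remote;
    }

    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        if (Arrays.asList(in.listAll()).contains(name)) {
            return in.openInput(name, context); // complete file present locally
        }
        return remote.openInput(name, context); // on-demand fetch from the remote store
    }
}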

Related component

Storage:Remote

Describe alternatives you've considered

No response

Additional context

No response

mgodwan commented 7 months ago

Thanks @rayshrey for putting this proposal out. I like the overall idea of having this abstraction behind the Directory interface.

Few questions:

This directory will behave as a local directory when complete files are present on disk, but can fall back to on-demand fetch (which can be block level or non-block level) from the remote store when data is not present locally.

Calls over the network may have higher latency costs, and hence high thread wait time. Can this cause the write/search threads to be blocked more than we would like and will it make sense/be feasible to offload this to either async I/O or a different pool?

As soon as the file is uploaded to the remote store, we add the file to our FileCache and change the state to CACHE, indicating that the file is present in the cache

Does this signify that the file can be removed from the local store if needed?

Our CompositeDirectory will have a function - afterSyncToRemote (called in RemoteStoreRefreshListener after the segments are uploaded) which will take care of writing the files to cache once the file is uploaded to remote store.

How would this cache look? I assume this is an on-disk cache. Could you elaborate on what this will look like on disk?

CACHE → we read it from the fileCache

Will this not require knowledge of the block being requested? Or does it ensure that files are always present in the cache completely?

rayshrey commented 7 months ago

Thanks @mgodwan for the insightful comments.

Calls over the network may have higher latency costs, and hence high thread wait time. Can this cause the write/search threads to be blocked more than we would like and will it make sense/be feasible to offload this to either async I/O or a different pool?

Good point. I think writes will be async as uploads to the remote store will be taken care of by the remote directory itself. Will check the feasibility for reads as well.

Does this signify that the file can be removed from the local store if needed?

Yes, once uploaded to the remote store, local files can be deleted.

How would this cache look? I assume this is an on-disk cache. Could you elaborate on what this will look like on disk?

There is already an existing FileCache in OpenSearch which is currently being used for Searchable Snapshots. We will be reusing the same. It currently does not support tiering the data at different levels or TTL logic. Will open a separate issue for the FileCache changes that are needed. This issue mainly focuses on how the Composite Directory will be structured and how the FileCache will fit into this structure.

Will this not require knowledge of the block being requested? Or does it ensure that files are always present in the cache completely?

The approach I was thinking of was that both BLOCK and NON-BLOCK file types will be present in the Cache. For NON-BLOCK files, we simply return from the Cache, whereas for BLOCK files we do what we did for the REMOTE FileState - return an instance of OnDemandBlockIndexInput which handles all the abstractions for block-level fetch (including caching the BLOCK files in the FileCache as and when required).

The other approach we can take is to keep only BLOCK level files in the Cache and always return an instance of OnDemandBlockIndexInput.

The first approach sounds more reasonable to me as it gives us the flexibility to choose what we want to fetch according to our requirements - BLOCK or NON-BLOCK files. Your thoughts on this, @ankitkala @mgodwan?
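
For reference, a minimal self-contained sketch of the first approach is below; CacheEntry and CachedReadSketch are stand-in types for discussion, not OpenSearch classes.

import java.io.IOException;
import java.util.Map;
import org.apache.lucene.store.IndexInput;

// Stand-in types for discussion only (not OpenSearch classes)
class CachedReadSketch {

    // A cache entry is either a complete (NON-BLOCK) file or a block-level reader
    interface CacheEntry {
        boolean isCompleteFile();
        IndexInput input() throws IOException;
    }

    private final Map<String, CacheEntry> fileCache; // stand-in for the FileCache

    CachedReadSketch(Map<String, CacheEntry> fileCache) {
        this.fileCache = fileCache;
    }

    IndexInput open(String name, IndexInput onDemandBlockInput) throws IOException {
        CacheEntry entry = fileCache.get(name);
        if (entry != null && entry.isCompleteFile()) {
            return entry.input();   // NON-BLOCK file: serve directly from the cache
        }
        return onDemandBlockInput;  // BLOCK case: delegate to the block-level IndexInput
    }
}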

rayshrey commented 7 months ago

@andrross @sohami @neetikasinghal Can you please review this

andrross commented 7 months ago

[Triage - attendees 1 2 3 4 5 6] @rayshrey Thanks for filing. Looking forward to seeing progress.

sohami commented 7 months ago

@rayshrey Thanks for creating this issue. Couple of questions:

  1. Just to clarify, is this composite directory currently going to be used only to represent warm indices, or will it also represent remote-backed hot indices?
  2. For hot indices, will the file state be DISK only, or can it be DISK and CACHE? I think we still need to decide on sharing the FileCache for hot/warm data, so assuming it will be used for warm indices only for now, my understanding is that for hot indices all the file states will be DISK?
  3. Based on file state changes, wondering for warm data, when a new file gets created, why can't we move it directly to the FileCache vs doing it only after upload to the remote store? I am trying to see if we can keep a separation between the state of hot vs warm data files.
  4. What is the difference between the FileState DISK and CACHE? Both file types will be stored on local disk only, so do we really need these 2 separate states, or can we combine them into a single state such as DISK? In the read path, depending on the index type i.e. hot/warm, the data files will be served either from the FileCache or from the local directory, which the directory should be able to decide in the base case.
  5. I think the tricky part is the non-dedicated search node setup: when an index is moving to warm, some data could still be local (outside the FileCache) and some in the FileCache. For that, the directory probably needs to first look into the local directory and, if the file is not found, fall back to the FileCache to provide it (it may already be in the FileCache, or be downloaded using block-level semantics). Note: the FileCache can still have both blocks and full files, as for recent indices migrating to warm we may want to keep the whole file in cache for some time. Any thoughts on these?
  6. Also it seems to me that the FileTracker then becomes important only to handle the case described above in 2. Other than that, if all files of hot indices will be served from local and warm indices from the FileCache, then we don't really need the file tracker, right?

mch2 commented 7 months ago

Thanks for writing this up. Some thoughts/questions.

  1. How would a user configure the store type on the local directory used inside of the composite dir? I would think even with warm we don't want to restrict the type for local writes?
  2. The RemoteStoreFileTrackerAdapter deviates a bit from the RFC where the composite dir only contains local & remote directory implementations. Can we not inject a remote directory here directly? I am also thinking we do not need the extra file tracking as @sohami called out because TransferManager can already handle if file should be fetched from local dir, cache or remote.
  3. To that last thought we could subclass RemoteSegmentStoreDirectory with a block based impl and inject that into CompositeDirectory instead of the adapter? This would build a component similar to RemoteSnapshotDirectory where we return OnDemandBlockIndexInput directly. I think with a little refactoring to the FileInfo metadata passed in we could even reuse OnDemandBlockSnapshotIndexInput as these flows are very similar to your initial OnDemandCompositeBlockIndexInput.

rayshrey commented 7 months ago

Thanks @sohami and @mch2 for your insights. Please find answers to your questions below.

Just to clarify, is this composite directory currently going to be used only to represent warm indices, or will it also represent remote-backed hot indices?

Yes, the composite directory will be used for remote-backed hot indices as well. Will add support for that incrementally in another PR once the base implementation is finalized in this one.

For hot indices, will the file state be DISK only, or can it be DISK and CACHE? I think we still need to decide on sharing the FileCache for hot/warm data, so assuming it will be used for warm indices only for now, my understanding is that for hot indices all the file states will be DISK?

For hot data we won't be caching it into the FileCache, so all the data will be present locally for hot indices.

Based on file state changes, wondering for warm data, when a new file gets created, why can't we move it directly to the FileCache vs doing it only after upload to the remote store? I am trying to see if we can keep a separation between the state of hot vs warm data files.

For data present locally we are not putting it in the FileCache; we are simply fetching it directly from the localDirectory to keep things simple (adding local files to the FileCache and then fetching them from there wouldn't have any added benefits). For separation of hot/warm data files we will need to have some sort of migration logic in the Directory itself once we start adding support for remote-backed hot indices in the Composite Directory.

What is the difference between the FileState DISK and CACHE? Both file types will be stored on local disk only, so do we really need these 2 separate states, or can we combine them into a single state such as DISK? In the read path, depending on the index type i.e. hot/warm, the data files will be served either from the FileCache or from the local directory, which the directory should be able to decide in the base case.

Have modified the logic to check from the local and remote directories whether the files are present locally or in remote. Will add the updated class diagrams and flows in some time. Till then this PR would help in understanding the updated design - https://github.com/opensearch-project/OpenSearch/pull/12782

I think the tricky part is the non-dedicated search node setup: when an index is moving to warm, some data could still be local (outside the FileCache) and some in the FileCache. For that, the directory probably needs to first look into the local directory and, if the file is not found, fall back to the FileCache to provide it (it may already be in the FileCache, or be downloaded using block-level semantics). Note: the FileCache can still have both blocks and full files, as for recent indices migrating to warm we may want to keep the whole file in cache for some time. Any thoughts on these?

As of now we are not putting the entire file in the Cache, so the FileCache will only have block files. For hot-to-warm migrations, we will delete all the local files once they are uploaded to remote. For warm-to-hot, we will download all the files present in remote to local and clear all the files in the FileCache. Will raise a separate issue for handling the migration where we can discuss this in more detail.

Also it seems to me that the FileTracker then becomes important only to handle the case described above in 2. Other than that, if all files of hot indices will be served from local and warm indices from the FileCache, then we don't really need the file tracker, right?

Yes, it doesn't really seem necessary. As stated earlier, the design has been updated and can be checked in this PR.

How would a user configure the store type on the local directory used inside of the composite dir? I would think even with warm we don't want to restrict the type for local writes?

Yes, this was a con in the previous approach, and hence we have decided to move away from that to a new setting which indicates whether full data is cached locally (hot index) or partial data is cached locally (warm index). This will allow users to have their own store type for the local directory.

The RemoteStoreFileTrackerAdapter deviates a bit from the RFC where the composite dir only contains local & remote directory implementations. Can we not inject a remote directory here directly? I am also thinking we do not need the extra file tracking as @sohami called out because TransferManager can already handle if file should be fetched from local dir, cache or remote.

Have updated the design to remove the FileTracker and RemoteStoreFileTrackerAdapter overheads and am just injecting a remote directory now. Can refer to the PR until I add the updated design in the description.

To that last thought we could subclass RemoteSegmentStoreDirectory with a block based impl and inject that into CompositeDirectory instead of the adapter? This would build a component similar to RemoteSnapshotDirectory where we return OnDemandBlockIndexInput directly. I think with a little refactoring to the FileInfo metadata passed in we could even reuse OnDemandBlockSnapshotIndexInput as these flows are very similar to your initial OnDemandCompositeBlockIndexInput.

The problem with having an implementation similar to RemoteSnapshotDirectory is that the TransferManager uses a BlobContainer for fetching the file. To get the BlobContainer of the RemoteDirectory we would need to expose a method in RemoteSegmentStoreDirectory that returns the BlobContainer of its remoteDataDirectory, which does not seem right as it would leak the abstractions of RemoteSegmentStoreDirectory. Hence we have exposed a new method in RemoteSegmentStoreDirectory which fetches the required file from remote to local, and this is called in the fetchBlock method of OnDemandCompositeBlockIndexInput.
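
For illustration, the shape of such a block-fetch hook could look roughly like the interface below; the name and signature are assumptions for discussion, not the actual method added to RemoteSegmentStoreDirectory.

import java.io.IOException;
import java.nio.file.Path;

// Hypothetical narrow hook: copy a byte range of a remote segment file to a local path,
// without exposing the underlying BlobContainer
public interface RemoteBlockFetcher {
    void fetchBlock(String fileName, long position, long length, Path localBlockPath) throws IOException;
}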

sohami commented 7 months ago

Based on file state changes, wondering for warm data, when a new file gets created, why can't we move it directly to the FileCache vs doing it only after upload to the remote store? I am trying to see if we can keep a separation between the state of hot vs warm data files.

For data present locally we are not putting it in the FileCache; we are simply fetching it directly from the localDirectory to keep things simple (adding local files to the FileCache and then fetching them from there wouldn't have any added benefits).

The benefit is that we keep the separation between hot and warm data, which are referenced by the local directory vs the FileCache. That would mean all the local space occupied by a warm index is always accounted for via the FileCache, and that can be used by any accounting/disk-monitoring mechanism for the warm tier. Otherwise it will be difficult to explain when an index's data is managed with or without the FileCache.

neetikasinghal commented 7 months ago

@rayshrey thanks for posting this. Some thoughts/questions on the migration flows:

rayshrey commented 4 months ago

Most of the design discussions on this shifted to the POC implementation (which later turned into a concrete PR after the reviews). Listing down the current design which was implemented, along with some other basic design decisions which were taken.


WARM Index Setting

Introduced a new index setting index.store.data_locality which can be either:

full - the complete index data is stored locally (the existing behaviour, used for hot indices)
partial - only part of the data is kept locally (in the FileCache) and the rest is fetched on demand from the remote store (used for warm indices)

Example

PUT my-index-warm
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "store" : {
          "data_locality" : "partial"
      }
    }
  }
}

Class Diagram

[class diagram image]

Composite Directory Read and Write Flows

Write (createOutput)

The file is written to the local directory, and a full-file entry for it is added to the FileCache with a reference held so that it is not evicted before the upload to remote completes.

File Uploaded to Remote (afterSyncToRemote)

Whenever a file is uploaded to remote, we will already have a full file entry for it in the file cache (from the above write flow). So we will decrease the ref count on that entry by one so that it can now be evicted from the FileCache (since we already have it in remote, it is safe to remove from local, provided it is not being actively referenced elsewhere).
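
A minimal sketch of that step, assuming the existing FileCache ref-counting API (decRef); the class and shard-path wiring below are illustrative only.

import java.nio.file.Path;
import org.opensearch.index.store.remote.filecache.FileCache;

// Hedged sketch of the afterSyncToRemote step described above
class AfterSyncSketch {
    private final FileCache fileCache;
    private final Path localShardPath; // directory holding this shard's segment files (assumption)

    AfterSyncSketch(FileCache fileCache, Path localShardPath) {
        this.fileCache = fileCache;
        this.localShardPath = localShardPath;
    }

    void afterSyncToRemote(String fileName) {
        // the write flow took a reference on the full-file entry; release it now that the
        // file is safely in the remote store, making the entry eligible for eviction
        fileCache.decRef(localShardPath.resolve(fileName));
    }
}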

Read (openInput)

If the complete file is present locally, it is served from the local directory/FileCache. Otherwise an on-demand block-level IndexInput is returned, which fetches only the required blocks from the remote store and caches them in the FileCache.

Changes in FileCache Initialization

FileCache was initially introduced only for the Searchable Snapshot use case and was initialized only on nodes configured with the Search role. Since we will be using the FileCache for Writable Warm as well, for now we will initialize the FileCache based on our feature flag and reserve 80% of the node capacity for the FileCache.
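
As a rough illustration of that sizing rule, here is a minimal sketch using plain JDK APIs; the class and method names are not from OpenSearch.

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hedged sketch: reserve 80% of the data path's total capacity for the FileCache
final class FileCacheSizing {

    static long warmFileCacheCapacity(Path dataPath) throws IOException {
        FileStore fileStore = Files.getFileStore(dataPath);
        return (long) (fileStore.getTotalSpace() * 0.8); // 80% of node capacity, per the note above
    }
}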

TODO - Explore if we can have different node roles based on which we will be initializing the FileCache, such as WARM role (similar to the SEARCH role used earlier for Searchable Snapshots)

RemoteDirectory Changes

Currently RemoteDirectory only supports reading a full file via the openInput method. Our use case is such that we need to read only a certain part/block of the file. So we overload the IOContext parameter passed to the openInput function to add details of the part to read (such as offset position and length), and in the openInput function we check the IOContext to know whether it is a block read request or a full file read request.

// IOContext variant that carries the offset and size of the block to be read from remote
class BlockIOContext extends IOContext {
    long blockStart;
    long blockSize;

    BlockIOContext(IOContext ctx, long blockStart, long blockSize) {
        super(ctx);
        this.blockStart = blockStart;
        this.blockSize = blockSize;
    }
}

public IndexInput openInput(String name, IOContext context) throws IOException {
    if (context instanceof BlockIOContext) {
        // extract offset and length from BlockIOContext
        // and fetch only that specific part from remote
    } else {
        // fetch the full file from remote
    }
}
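
For example, an illustrative block read of the first 8 MB of a segment file (mirroring the TransferManager wiring further below) would look like:

IndexInput firstBlock = remoteDirectory.openInput("_0.cfs", new BlockIOContext(IOContext.DEFAULT, 0, 8 * 1024 * 1024));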

Changes in TransferManager

Currently TransferManager is configured to read only from a BlobContainer, since in its original use case (Searchable Snapshot) the BlobContainer was already exposed. But for the Composite Directory, the BlobContainer is abstracted out and we will need to be able to read directly from the Remote Directory as well.

Hence we need to change the BlobContainer parameter to the more generic StreamReader shown below.

@FunctionalInterface
public interface StreamReader {
    /** Reads {@code length} bytes of the named source starting at {@code position}. */
    InputStream read(String name, long position, long length) throws IOException;
}

This is how we will initialize the TransferManager for Searchable Snapshots and for the Composite Directory:

// Searchable Snapshot
TransferManager transferManager = new TransferManager(blobContainer::readBlob, fileCache);

// Composite Directory
transferManager = new TransferManager(
    (name, position, length) -> new InputStreamIndexInput(
        remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)),
        length
    ),
    fileCache
);