opensearch-project / opensearch-spark

Spark Accelerator framework ; It enables secondary indices to remote data stores.
Apache License 2.0
22 stars 33 forks source link

Remove checkpoint folder when vacuuming index #621

Closed dai-chen closed 2 months ago

dai-chen commented 2 months ago

Description

This PR adds functionality to delete the checkpoint folder associated with a Flint index during the vacuuming process. The new FlintSparkCheckpoint class wraps Spark's CheckpointFileManager and abstracts checkpoint management API.

Testing

Prepare for vacuum test:

# Create a materialized view and cancel refreshing after a while
CREATE MATERIALIZED VIEW glue.default.vacuum_checkpoint_test_2
AS
SELECT clientip
FROM glue.default.http_logs
WITH (
  auto_refresh = true,
  checkpoint_location = 's3://checkpoint/vacuum-checkpoint-test-2');

# Drop materialized view and check current status
# Index data
GET _cat/indices?v
green  open   flint_glue_default_vacuum_checkpoint_test_2   5   1   2982743   0    258.6mb   121.2mb

# Index metadata log entry
GET .query_execution_request_glue/_search
      {
        "_index": ".query_execution_request_glue",
        "_id": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3ZhY3V1bV9jaGVja3BvaW50X3Rlc3RfMg==",
        "_score": 1,
        "_source": {
          "version": "1.0",
          "latestId": "ZmxpbnRfZ2x1ZV9kZWZhdWx0X3ZhY3V1bV9jaGVja3BvaW50X3Rlc3RfMg==",
          "type": "flintindexstate",
          "state": "deleted",
          "applicationId": "XXXXXX",
          "jobId": "YYYYYY",
          "dataSourceName": "glue",
          "jobStartTime": 1725474075924,
          "lastUpdateTime": 1725474672053,
          "error": ""
        }
      }

$ aws s3 ls s3://checkpoint | grep vacuum-checkpoint-test
                           PRE vacuum-checkpoint-test-2/
2024-09-04 11:21:13          0 vacuum-checkpoint-test-2_$folder$

Verify checkpoint folder removed after vacuum:

VACUUM MATERIALIZED VIEW glue.default.vacuum_checkpoint_test_2;

$ aws s3 ls s3://checkpoint | grep vacuum-checkpoint-test
(empty)

# Spark log
FlintSpark: Starting index operation [Vacuum Flint index flint_glue_default_vacuum_checkpoint_test_2] with forceInit=false
...
FlintOpenSearchClient: Deleting Flint index flint_glue_default_vacuum_checkpoint_test_2
...
FlintSparkCheckpoint: Checkpoint directory s3://checkpoint/vacuum-checkpoint-test-2 deleted
FlintOpenSearchMetadataLog: Purging log entry with id ZmxpbnRfZ2x1ZV9kZWZhdWx0X3ZhY3V1bV9jaGVja3BvaW50X3Rlc3RfMg==
FlintOpenSearchMetadataLog: Purged log entry with result DELETED
FlintSpark: Index operation [Vacuum Flint index flint_glue_default_vacuum_checkpoint_test_2] complete

Issues Resolved

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

dai-chen commented 2 months ago

@seankao-az @noCharger I'm not sure if and where we want to backport. Feel free to add backport label as needed.

noCharger commented 2 months ago

@dai-chen Any permission change we should update on public doc?

dai-chen commented 2 months ago

@dai-chen Any permission change we should update on public doc?

Is s3:DeleteObject" permission sufficient? https://docs.aws.amazon.com/opensearch-service/latest/developerguide/direct-query-s3-creating.html

         "Sid":"ReadAndWriteActionsForS3CheckpointBucket",
         "Effect":"Allow",
         "Action":[
            "s3:ListMultipartUploadParts",
            "s3:DeleteObject",
            "s3:GetObject",
            "s3:PutObject",
            "s3:GetBucketLocation",
            "s3:ListBucket"
         ],