apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0

[DSIP-45] Polish the Storage SPI #16140

Closed ruanwenjun closed 2 weeks ago

ruanwenjun commented 2 weeks ago

Motivation

Right now, the storage API is very complex, and there are many storage-related issues, e.g. CVEs and incorrect file paths. Most of these stem from how storage paths are used: we pass both absolute and relative paths to the storage, but the API doesn't state which one each method expects.

In addition, the storage interface is complex and relies on business logic, e.g. tenant and default path, so introducing a new storage is difficult.

This DSIP aims to refactor the storage API to make it easier to maintain.
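
To illustrate why an explicit absolute-path contract matters, here is a minimal sketch of a check that a resource path is absolute and stays under the storage base directory. The class `AbsolutePathGuardSketch` and its method are hypothetical and not part of this proposal; the sketch only handles local POSIX-style paths, not URIs such as `file:///...` or object-store keys.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the proposed SPI.
final class AbsolutePathGuardSketch {

    private AbsolutePathGuardSketch() {
    }

    /**
     * Returns true only if both paths are absolute and the resource path,
     * after resolving "." and ".." segments, is still under the base directory.
     */
    static boolean isUnderBaseDirectory(String baseDirectory, String resourceAbsolutePath) {
        Path base = Paths.get(baseDirectory).normalize();
        Path resource = Paths.get(resourceAbsolutePath);
        if (!base.isAbsolute() || !resource.isAbsolute()) {
            // Relative paths are rejected instead of being silently resolved.
            return false;
        }
        // Path.startsWith compares whole name elements, so
        // "/tmp/dolphinscheduler2" is not treated as being under "/tmp/dolphinscheduler".
        return resource.normalize().startsWith(base);
    }
}
```

With such a guard, a relative path or a path that escapes the base directory through `..` segments would be rejected before it reaches the storage implementation.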

Design Detail

The new storage SPI will focus on filesystem operations.

public interface StorageOperator {

    String FILE_FOLDER_NAME = "resources";
    String UDF_FOLDER_NAME = "udfs";

    String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");

    ResourceMetadata getResourceMetaData(String resourceAbsolutePath);

    /**
     * Get the absolute path of base directory.
     *
     * @return the base directory. e.g. file:///tmp/dolphinscheduler/, /tmp/dolphinscheduler/
     */
    String getStorageBaseDirectory();

    /**
     * Get the absolute path of the directory used by the given tenant. The tenant directory is under the base directory.
     *
     * @param tenantCode the tenant code, cannot be empty
     * @return the tenant directory. e.g. file:///tmp/dolphinscheduler/default/
     */
    String getStorageBaseDirectory(String tenantCode);

    /**
     * Get the absolute path of the directory used by the given tenant and resource type. The resource directory is under the tenant directory.
     * <p> If the resource type is FILE, this will be 'file:///tmp/dolphinscheduler/default/resources/'.
     * <p> If the resource type is UDF, this will be 'file:///tmp/dolphinscheduler/default/udfs/'.
     * <p> If the resource type is ALL, this will be 'file:///tmp/dolphinscheduler/default/'.
     *
     * @param tenantCode   the tenant code, cannot be empty
     * @param resourceType the resource type, cannot be null
     * @return the resource directory. e.g. file:///tmp/dolphinscheduler/default/resources/
     */
    String getStorageBaseDirectory(String tenantCode, ResourceType resourceType);

    /**
     * Get the absolute path of the file in the storage. The file will be under the file resource directory.
     *
     * @param tenantCode the tenant code, cannot be empty
     * @param fileName   the file name, cannot be empty
     * @return the file absolute path. e.g. file:///tmp/dolphinscheduler/default/resources/test.sh
     */
    String getStorageFileAbsolutePath(String tenantCode, String fileName);

    /**
     * Create a directory. The behavior when the directory already exists depends on the storage implementation.
     * <p> If the directory does not exist, it will be created.
     * <p> If the parent directory does not exist, it will be created as well.
     * <p> If the directory already exists, {@link FileAlreadyExistsException} will be thrown.
     *
     * @param directoryAbsolutePath the directory absolute path
     */
    void createStorageDir(String directoryAbsolutePath);

    /**
     * Check if the resource exists.
     *
     * @param resourceAbsolutePath the resource absolute path
     * @return true if the resource exists, otherwise false
     */
    boolean exists(String resourceAbsolutePath);

    /**
     * Delete the resource. If the resourceAbsolutePath does not exist, this does nothing.
     *
     * @param resourceAbsolutePath the resource absolute path
     * @param recursive            whether to delete all the sub file/directory under the given resource
     */
    void delete(String resourceAbsolutePath, boolean recursive);

    /**
     * Copy the resource from the source path to the destination path.
     *
     * @param srcAbsolutePath the source path
     * @param dstAbsolutePath the destination path
     * @param deleteSource    whether to delete the source path after copying
     * @param overwrite       whether to overwrite the destination path if it exists
     */
    void copy(String srcAbsolutePath, String dstAbsolutePath, boolean deleteSource, boolean overwrite);

    /**
     * Upload the local file to the destination path in the storage.
     *
     * @param srcLocalFileAbsolutePath the source local file
     * @param dstAbsolutePath          the destination path
     * @param deleteSource             whether to delete the source local file after uploading
     * @param overwrite                whether to overwrite the destination path if it exists
     */
    void upload(String srcLocalFileAbsolutePath, String dstAbsolutePath, boolean deleteSource, boolean overwrite);

    /**
     * Download the resource from the source path to the destination path.
     *
     * @param srcFileAbsolutePath the source path
     * @param dstAbsoluteFile     the destination file
     * @param overwrite           whether to overwrite the destination file if it exists
     * @throws IOException
     */
    void download(String srcFileAbsolutePath, String dstAbsoluteFile, boolean overwrite) throws IOException;

    /**
     * Fetch the content of the file.
     *
     * @param fileAbsolutePath the file path
     * @param skipLineNums     the number of lines to skip
     * @param limit            the number of lines to read
     * @return the content of the file
     */
    List<String> fetchFileContent(String fileAbsolutePath, int skipLineNums, int limit);

    /**
     * Return the {@link StorageEntity} list under the given path.
     * <p>If the path is a file, return the file status.
     * <p>If the path is a directory, return the files/directories directly under the directory.
     * <p>If the path does not exist, return an empty list.
     *
     * @param resourceAbsolutePath the resource absolute path, cannot be empty
     */
    List<StorageEntity> listStorageEntity(String resourceAbsolutePath);

    /**
     * Return the {@link StorageEntity} of every file under the given path, searching sub directories recursively.
     *
     * @param resourceAbsolutePath the resource absolute path, cannot be empty
     */
    List<StorageEntity> listFileStorageEntityRecursively(String resourceAbsolutePath);

    /**
     * Return the {@link StorageEntity} of the given path.
     *
     * @param resourceAbsolutePath the resource absolute path, cannot be empty
     */
    StorageEntity getStorageEntity(String resourceAbsolutePath);

}
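
As an illustration of how the path-related methods above could compose, here is a minimal sketch that reproduces the examples given in the Javadoc. `PathLayoutSketch` and `ResourceTypeSketch` are hypothetical names used only for this example; a real implementation would implement `StorageOperator` and use the project's own `ResourceType` enum, and may build paths differently.

```java
import java.util.Objects;

// Hypothetical stand-in for the project's ResourceType enum;
// only the three values named in the Javadoc are modelled.
enum ResourceTypeSketch { FILE, UDF, ALL }

// Hypothetical sketch of the directory layout, not an actual StorageOperator implementation.
final class PathLayoutSketch {

    static final String FILE_FOLDER_NAME = "resources";
    static final String UDF_FOLDER_NAME = "udfs";

    // Always stored with a trailing "/", e.g. "file:///tmp/dolphinscheduler/".
    private final String baseDirectory;

    PathLayoutSketch(String baseDirectory) {
        Objects.requireNonNull(baseDirectory, "baseDirectory");
        this.baseDirectory = baseDirectory.endsWith("/") ? baseDirectory : baseDirectory + "/";
    }

    String getStorageBaseDirectory() {
        return baseDirectory;
    }

    String getStorageBaseDirectory(String tenantCode) {
        if (tenantCode == null || tenantCode.isEmpty()) {
            throw new IllegalArgumentException("tenantCode cannot be empty");
        }
        return baseDirectory + tenantCode + "/";
    }

    String getStorageBaseDirectory(String tenantCode, ResourceTypeSketch resourceType) {
        Objects.requireNonNull(resourceType, "resourceType");
        String tenantDirectory = getStorageBaseDirectory(tenantCode);
        switch (resourceType) {
            case FILE:
                return tenantDirectory + FILE_FOLDER_NAME + "/";
            case UDF:
                return tenantDirectory + UDF_FOLDER_NAME + "/";
            case ALL:
                return tenantDirectory;
            default:
                throw new IllegalArgumentException("Unknown resource type: " + resourceType);
        }
    }

    String getStorageFileAbsolutePath(String tenantCode, String fileName) {
        if (fileName == null || fileName.isEmpty()) {
            throw new IllegalArgumentException("fileName cannot be empty");
        }
        // Files live under the FILE resource directory of the tenant.
        return getStorageBaseDirectory(tenantCode, ResourceTypeSketch.FILE) + fileName;
    }
}
```

Each method returns an absolute path built from the one before it, so the tenant and resource-type layout is defined in one place and the remaining operations only ever receive absolute paths.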

Compatibility, Deprecation, and Migration Plan

The change is compatible with the current version.

Test Plan

Add integration tests (IT) for HDFS (local mode) and S3.
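
As a rough idea of what such tests might assert, here is a minimal sketch of a contract check for `createStorageDir`, `exists` and `delete`, run against the local filesystem with `java.nio.file`. The class `LocalStorageContractSketch` is hypothetical, and wrapping `java.nio.file.FileAlreadyExistsException` in `UncheckedIOException` is an assumption made here because the interface methods declare no checked exceptions; the real implementations and tests may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem sketch of three operations from the interface.
final class LocalStorageContractSketch {

    void createStorageDir(String directoryAbsolutePath) {
        Path directory = Paths.get(directoryAbsolutePath);
        try {
            if (Files.exists(directory)) {
                throw new FileAlreadyExistsException(directoryAbsolutePath);
            }
            // Creates missing parent directories as well.
            Files.createDirectories(directory);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean exists(String resourceAbsolutePath) {
        return Files.exists(Paths.get(resourceAbsolutePath));
    }

    void delete(String resourceAbsolutePath, boolean recursive) {
        Path resource = Paths.get(resourceAbsolutePath);
        if (!Files.exists(resource)) {
            return; // Deleting a missing resource does nothing.
        }
        try {
            if (!recursive) {
                Files.delete(resource);
                return;
            }
            List<Path> deepestFirst;
            try (Stream<Path> paths = Files.walk(resource)) {
                // Reverse order so that children are deleted before their parents.
                deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Walks through the documented behavior; throws IllegalStateException on a violation.
    static void verifyDirectoryContract(String workDirectory) {
        LocalStorageContractSketch storage = new LocalStorageContractSketch();
        String nested = workDirectory + "/parent/child";

        storage.createStorageDir(nested);
        require(storage.exists(nested), "directory and its parents should be created");

        boolean rejected = false;
        try {
            storage.createStorageDir(nested);
        } catch (UncheckedIOException e) {
            rejected = e.getCause() instanceof FileAlreadyExistsException;
        }
        require(rejected, "creating an existing directory should fail");

        storage.delete(workDirectory, true);
        require(!storage.exists(workDirectory), "recursive delete should remove everything");
        storage.delete(workDirectory, true); // Second delete is a no-op.
    }

    private static void require(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```

The same sequence of checks could be reused for each storage backend, so that HDFS and S3 are held to the behavior documented in the interface.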