apache / incubator-devlake

Apache DevLake is an open-source dev data platform to ingest, analyze, and visualize the fragmented data from DevOps tools, extracting insights for engineering excellence, developer experience, and community growth.
https://devlake.apache.org/
Apache License 2.0

[Bug][Pipeline] The pipeline gets stuck if the tool has no space left on the disk #8098

Open msarzijr opened 1 month ago

msarzijr commented 1 month ago

Search before asking

What happened

When too many pipelines run at the same time, the git extractor fills up the disk. The pipeline then gets stuck and does not even respond to the stop command.


What do you expect to happen

I would expect the cancel pipeline command to stop the pipeline and clean up the /tmp folder, so that the pipelines can be rescheduled to different times.

How to reproduce

Run enough pipelines in parallel that the git repos they collect are, combined, larger than the available disk space.
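
For example, one way to trigger this is to fire many gitextractor pipelines at once against the REST API. This is a rough sketch: the endpoint and port depend on your deployment, the plan payload is an assumption about the POST /pipelines body, and the repo URLs are placeholders for real large repositories.

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Assumes DevLake's API is reachable at localhost:8080 (adjust for your deployment).
    // Each pipeline runs a gitextractor task; with enough large repos in flight,
    // the clone directories exhaust the disk.
    for i := 0; i < 20; i++ {
        body := fmt.Sprintf(`{
            "name": "repro-%d",
            "plan": [[{
                "plugin": "gitextractor",
                "options": {
                    "url": "https://example.com/some/large-repo-%d.git",
                    "repoId": "repro:repo:%d"
                }
            }]]
        }`, i, i, i)
        resp, err := http.Post("http://localhost:8080/pipelines", "application/json", bytes.NewBufferString(body))
        if err != nil {
            fmt.Println("request failed:", err)
            continue
        }
        resp.Body.Close()
    }
}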

Anything else

Thanks for reading my post.

Version

v1.0.1-beta9@e061ef2

Are you willing to submit PR?

Code of Conduct

dosubot[bot] commented 1 month ago

Hey there, @msarzijr! I'm here to help you with any bugs, questions, or contribution guidance you might need. Let's tackle this together!

I found a similar closed issue: [Feature][framework] Throttling total number of pipelines running in parallel, which was resolved by a pull request that implemented running pipelines in an independent process [1].

To address the pipeline getting stuck when the disk is full, and to have the cancel pipeline command stop the pipeline and clean the /tmp folder, you can implement the following steps:

  1. Monitor Disk Space: Implement a function to monitor disk space and pause the pipeline if the disk space is below a certain threshold.
  2. Clean /tmp Folder on Cancel: Modify the CancelPipeline function to clean the /tmp folder when a pipeline is canceled.

Here is an example implementation:

Monitor Disk Space

Add a function to check disk space and pause the pipeline if necessary:

import (
    "log"
    "syscall"
    "time"
)

func checkDiskSpace(path string, threshold uint64) bool {
    var stat syscall.Statfs_t
    err := syscall.Statfs(path, &stat)
    if err != nil {
        log.Printf("Failed to get disk space: %v", err)
        return false
    }
    // Available blocks * size per block = available space in bytes
    available := stat.Bavail * uint64(stat.Bsize)
    return available > threshold
}

func monitorDiskSpace() {
    const threshold = 10 * 1024 * 1024 * 1024 // 10 GB
    for {
        if !checkDiskSpace("/", threshold) {
            log.Println("Disk space is below threshold, pausing pipeline")
            // Pause the pipeline
            // Implement the logic to pause the pipeline
        }
        time.Sleep(1 * time.Minute) // Check every minute
    }
}
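
The "pause the pipeline" placeholder above could be backed by a package-level flag that RunPipelineInQueue consults before dequeuing the next pending pipeline. Below is a minimal sketch; diskSpaceOK and waitForDiskSpace are hypothetical names, not existing DevLake code, and monitorDiskSpace would set the flag on every iteration with diskSpaceOK.Store(checkDiskSpace("/", threshold)).

import (
    "sync/atomic"
    "time"
)

// diskSpaceOK is a hypothetical package-level gate:
// true means the monitor last reported enough free space.
var diskSpaceOK atomic.Bool

// waitForDiskSpace blocks until the monitor reports free space again.
// RunPipelineInQueue could call this before dequeuing the next pending pipeline.
func waitForDiskSpace() {
    for !diskSpaceOK.Load() {
        time.Sleep(10 * time.Second)
    }
}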

Clean /tmp Folder on Cancel

Modify the CancelPipeline function to clean the /tmp folder:

import (
    "os"
    "path/filepath"
)

func cleanTmpFolder() error {
    tmpDir := "/tmp"
    entries, err := os.ReadDir(tmpDir)
    if err != nil {
        return err
    }
    // Remove only the contents of /tmp, not the directory itself,
    // so its ownership and sticky-bit permissions are preserved
    for _, entry := range entries {
        if err := os.RemoveAll(filepath.Join(tmpDir, entry.Name())); err != nil {
            return err
        }
    }
    return nil
}

func CancelPipeline(pipelineId uint64) errors.Error {
    // prevent RunPipelineInQueue from consuming pending pipelines
    pipeline := &models.Pipeline{}
    err := db.First(pipeline, dal.Where("id = ?", pipelineId))
    if err != nil {
        return errors.BadInput.New("pipeline not found")
    }
    if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
        pipeline.Status = models.TASK_CANCELLED
        err = db.Update(pipeline)
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline")
        }
        // now, with RunPipelineInQueue blocked and the target pipeline updated,
        // we should update the related tasks as well
        err = db.UpdateColumn(
            &models.Task{},
            "status", models.TASK_CANCELLED,
            dal.Where("pipeline_id = ?", pipelineId),
        )
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline tasks")
        }
        // Clean the /tmp folder
        if err := cleanTmpFolder(); err != nil {
            return errors.Default.Wrap(err, "failed to clean /tmp folder")
        }
        // the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
        return nil
    }
    pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
    if err != nil {
        return errors.Convert(err)
    }
    if count == 0 {
        return nil
    }
    for _, pendingTask := range pendingTasks {
        _ = CancelTask(pendingTask.ID)
    }
    // Clean the /tmp folder
    if err := cleanTmpFolder(); err != nil {
        return errors.Default.Wrap(err, "failed to clean /tmp folder")
    }
    return errors.Convert(err)
}
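
One caveat with wiping /tmp wholesale: other pipelines may still be running and keeping in-progress git clones there. A hedged sketch of a guard that could precede the cleanTmpFolder calls, reusing the db/dal helpers already in scope (the exact clauses may need adjusting):

running, err := db.Count(
    dal.From(&models.Pipeline{}),
    dal.Where("status = ? AND id != ?", models.TASK_RUNNING, pipelineId),
)
if err == nil && running == 0 {
    // no other pipeline is active, so the scratch space is safe to wipe
    if err := cleanTmpFolder(); err != nil {
        return errors.Default.Wrap(err, "failed to clean /tmp folder")
    }
}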

Run Disk Space Monitor

Start the disk space monitor in the pipelineServiceInit function:

func pipelineServiceInit() {
    // initialize plugin
    plugin.InitPlugins(basicRes)

    // notification
    var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
    var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
    if strings.TrimSpace(notificationEndpoint) != "" {
        defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
    }

    // standalone mode: reset pipeline status
    if cfg.GetBool("RESUME_PIPELINES") {
        markInterruptedPipelineAs(models.TASK_RESUME)
    } else {
        markInterruptedPipelineAs(models.TASK_FAILED)
    }

    // load cronjobs for blueprints
    errors.Must(ReloadBlueprints())

    var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
    if pipelineMaxParallel < 0 {
        panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
    }
    if pipelineMaxParallel == 0 {
        globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
        pipelineMaxParallel = 10000
    }
    // run pipeline with independent goroutine
    go RunPipelineInQueue(pipelineMaxParallel)

    // Start disk space monitor
    go monitorDiskSpace()
}
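
The hard-coded 10 GB threshold could also be read from configuration via the same cfg accessor used above. A small sketch, assuming a hypothetical DISK_SPACE_THRESHOLD_GB setting (not an existing DevLake option):

func diskSpaceThreshold() uint64 {
    // DISK_SPACE_THRESHOLD_GB is a hypothetical setting, not an existing DevLake option
    gb := cfg.GetInt64("DISK_SPACE_THRESHOLD_GB")
    if gb <= 0 {
        gb = 10 // fall back to the 10 GB default used above
    }
    return uint64(gb) * 1024 * 1024 * 1024
}

monitorDiskSpace would then call checkDiskSpace("/", diskSpaceThreshold()) instead of using the constant.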

This implementation ensures that the pipeline is paused when disk space is low and that the /tmp folder is cleaned when a pipeline is canceled [2].

To continue talking to Dosu, mention @dosu.

msarzijr commented 1 month ago

Hi @dosu, thanks for the pointers. They are useful, but as a user I believe this behavior should be embedded in the tool. If I click to stop the pipeline, it needs to stop and clean the disk by itself.

That would let the user keep going without technical intervention; right now the pipelines just get stuck.

dosubot[bot] commented 1 month ago

Yes, the tool can be modified to automatically stop the pipeline and clean the disk when the user clicks to stop the pipeline. The CancelPipeline function already handles stopping the pipeline. You can extend this function to include disk cleanup.

Here is an example of how you might modify the CancelPipeline function to include disk cleanup:

func CancelPipeline(pipelineId uint64) errors.Error {
    // prevent RunPipelineInQueue from consuming pending pipelines
    pipeline := &models.Pipeline{}
    err := db.First(pipeline, dal.Where("id = ?", pipelineId))
    if err != nil {
        return errors.BadInput.New("pipeline not found")
    }
    if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
        pipeline.Status = models.TASK_CANCELLED
        err = db.Update(pipeline)
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline")
        }
        // now, with RunPipelineInQueue blocked and the target pipeline updated,
        // we should update the related tasks as well
        err = db.UpdateColumn(
            &models.Task{},
            "status", models.TASK_CANCELLED,
            dal.Where("pipeline_id = ?", pipelineId),
        )
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline tasks")
        }
        // the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
        return nil
    }
    pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
    if err != nil {
        return errors.Convert(err)
    }
    if count == 0 {
        return nil
    }
    for _, pendingTask := range pendingTasks {
        _ = CancelTask(pendingTask.ID)
    }

    // Clean up disk
    logsPath, err := getPipelineLogsPath(pipeline)
    if err != nil {
        return errors.Default.Wrap(err, "failed to get logs path for cleanup")
    }
    if err := os.RemoveAll(logsPath); err != nil {
        return errors.Default.Wrap(err, "failed to clean up disk")
    }

    return errors.Convert(err)
}
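
Note that the pipeline logs directory is usually small; the space in this report is consumed by gitextractor's temporary clones, so a similar cleanup could target that scratch directory as well. A sketch, assuming the clones land under a per-pipeline subdirectory of the system temp dir; the pipelineTmpDir helper and that layout are hypothetical, and the real location depends on how gitextractor is configured in your deployment:

import (
    "fmt"
    "os"
    "path/filepath"
)

// pipelineTmpDir is a hypothetical helper returning where this pipeline's
// git clones would live; adjust to your deployment's actual layout
func pipelineTmpDir(pipelineId uint64) string {
    return filepath.Join(os.TempDir(), fmt.Sprintf("pipeline-%d", pipelineId))
}

In CancelPipeline, after cancelling the pending tasks, the directory could then be removed:

    if err := os.RemoveAll(pipelineTmpDir(pipelineId)); err != nil {
        return errors.Default.Wrap(err, "failed to clean pipeline temp dir")
    }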

Additionally, you can modify the UI to handle the cancel operation and trigger the disk cleanup. In the PipelineInfo component, the handleCancel function can be used to call the API that cancels the pipeline and performs the cleanup:

const handleCancel = async () => {
    const [success] = await operator(() => API.pipeline.remove(id), {
      setOperating,
    });

    if (success) {
      // setVersion((v) => v + 1);
    }
};

This modification ensures that when a pipeline is canceled, the associated disk space is also cleaned up [1][2].

To continue talking to Dosu, mention @dosu.