facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/
Apache License 2.0

`Task::next()` Deadlocks with Pipeline Dependencies in Single-threaded Execution #11442

Open yjshen opened 4 hours ago

yjshen commented 4 hours ago

Bug description

Current Behavior

In Velox's single-threaded execution mode, Task::next() uses folly::collectAll to wait for all blocked drivers, even when some drivers are waiting for others in the same task.

Specific example with hash join (see the sketch after this list):

  1. The build-side driver temporarily blocks waiting for input from a custom operator.
  2. The probe-side driver blocks waiting for the build side to complete.
  3. Task::next() waits for both drivers via collectAll.
  4. Execution deadlocks: the probe side cannot unblock until the build side runs to completion, and the build side cannot resume until the collectAll future fires, which in turn requires the probe side to be unblocked.
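
To make the failure mode concrete, here is a minimal, hypothetical sketch of the single-threaded driving loop (modeled on what a caller such as Gluten's WholeStageResultIterator does; consume() and the loop structure are illustrative, not actual Gluten or Velox code):

// Hypothetical single-threaded driving loop; Task::next(ContinueFuture*) and
// ContinueFuture are real Velox APIs, consume() and the loop are illustrative.
ContinueFuture future = ContinueFuture::makeEmpty();
for (;;) {
  RowVectorPtr batch = task->next(&future);
  if (batch != nullptr) {
    consume(batch);  // hand the output batch to the caller
    continue;
  }
  if (!future.valid()) {
    break;  // task finished; no more output
  }
  // All drivers are blocked. Because Task::next() builds this future with
  // collectAll, it only fires once *every* blocked driver can resume, so
  // fulfilling the build side's inputProcessingTask_ alone never wakes us up.
  std::move(future).wait();
  future = ContinueFuture::makeEmpty();
}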

Expected Behavior

The implementation should:

  1. Allow drivers to unblock independently when their blocking conditions are met
  2. Not require all blocked drivers to become unblocked simultaneously
  3. Respect pipeline data dependencies

Technical Details

The bug occurs in Task::next():

// In Task::next()
if (runnableDrivers == 0) {
  if (blockedDrivers > 0) {
    if (!future) {
      VELOX_FAIL(
          "Cannot make progress as all remaining drivers are blocked and user are not expected to wait.");
    } else {
      std::vector<ContinueFuture> notReadyFutures;
      for (auto& continueFuture : futures) {
        if (!continueFuture.isReady()) {
          notReadyFutures.emplace_back(std::move(continueFuture));
        }
      }
      *future = folly::collectAll(std::move(notReadyFutures)).unit();
    }
  }
  return nullptr;
}

Proposed Solution

Replace collectAll with collectAny to allow drivers to make progress independently.
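
A minimal sketch of what that change might look like in the Task::next() snippet above (assuming folly::collectAny accepts the same vector of futures that collectAll does; this only illustrates the proposal and is not a vetted patch):

// Hypothetical variant of the Task::next() snippet above: the returned future
// fires as soon as the first blocked driver can resume, instead of waiting
// for all of them.
*future = folly::collectAny(std::move(notReadyFutures)).unit();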

System information

I'm using an old version of Velox, but I checked the code for `Task::next()`, and it is unchanged.

Velox System Info v0.0.2
Commit: 5d315fbf05d56370ace659cd6c000a1ca15d98f6
CMake Version: 3.28.3
System: Linux-6.8.0-1017-gcp
Arch: x86_64
C++ Compiler: /usr/bin/c++
C++ Compiler Version: 11.4.0
C Compiler: /usr/bin/cc
C Compiler Version: 11.4.0
CMake Prefix Path: /usr/local;/usr;/;/usr/local/lib/python3.10/dist-packages/cmake/data;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

Yuhta commented 3 hours ago

Probably not a problem with collectAll. What is blocking the build side from finishing? It seems to be blocked normally on the custom operator, not deadlocked.

yjshen commented 2 hours ago

In our scenario, we use Gluten + Velox with custom operators preceding the join build. The caller of Task::next() starts waiting the first time the build pipeline blocks, yet it needs both the build and probe pipelines to become kNotBlocked before it can advance. The custom operator looks roughly like this:

class CustomOperator : public Operator {
 protected:
  // States for async input processing
  bool noMoreInput_{false};
  mutable std::mutex mutex_;
  folly::Future<folly::Unit> inputProcessingTask_;  // Key field for blocking
  folly::Executor* executor_{nullptr};

 public:
  // Called when operator needs to process input asynchronously
  void startAsyncInputProcessing(RowVectorPtr input) {
    // Create a promise/future pair to track async processing
    folly::Promise<folly::Unit> promise;
    inputProcessingTask_ = promise.getFuture();  // Future becomes valid here

    // Schedule async task
    auto task = [this, input = std::move(input), promise = std::move(promise)]() {
        // Process input asynchronously...

        // When done, fulfill the promise
        promise.setValue();
    };
    executor_->add(std::move(task));
  }

  // Check if operator is blocked
  BlockingReason isBlockedDefault(ContinueFuture* future) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (inputProcessingTask_.valid()) {
      // We're blocked waiting for async processing
      if (future) {
        *future = std::move(inputProcessingTask_);
      }
      return BlockingReason::kWaitForConnector;
    }
    return BlockingReason::kNotBlocked;
  }

  // Add new input
  void addInputDefault(RowVectorPtr input) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (inputProcessingTask_.valid()) {
      VELOX_FAIL("Cannot add input while processing previous input");
    }
    startAsyncInputProcessing(std::move(input));
  }
};

Yuhta commented 2 hours ago

Probe is waiting on build, and build is waiting on inputProcessingTask_. The whole task cannot proceed until inputProcessingTask_ is fulfilled, which is correct.

yjshen commented 2 hours ago

Yes, I think the issue is that when inputProcessingTask_ is fulfilled later, the whole task still cannot proceed, because the caller (WholeStageResultIterator) is already waiting on the future from collectAll, which won't be notified until the probe side is also runnable. Am I missing something crucial here?
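
For reference, a tiny standalone folly sketch (not Velox or Gluten code; exact folly overloads may differ across versions) of the semantics in question: the future produced by collectAll only becomes ready once every input future is fulfilled, so fulfilling the build-side future alone does not wake the waiter.

#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

#include <iostream>
#include <vector>

int main() {
  // buildInput stands in for the custom operator's inputProcessingTask_;
  // probeWait stands in for the probe side waiting on the join build.
  folly::Promise<folly::Unit> buildInput;
  folly::Promise<folly::Unit> probeWait;

  std::vector<folly::Future<folly::Unit>> futures;
  futures.push_back(buildInput.getFuture());
  futures.push_back(probeWait.getFuture());

  auto all = folly::collectAll(std::move(futures));

  buildInput.setValue();                    // async input processing finishes...
  std::cout << all.isReady() << std::endl;  // ...but collectAll is not ready yet: prints 0

  probeWait.setValue();                     // only when the probe-side future is also fulfilled...
  std::cout << all.isReady() << std::endl;  // ...does the collectAll future become ready: prints 1
  return 0;
}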

Yuhta commented 1 hour ago

I see, we should only wait on futures for external blocking; the probe is blocked internally and should not be part of the list.

@xiaoxmeng Any idea how we can differentiate external blocking vs internal blocking futures in this case?