prestodb / presto

Introduce NonBlockingWork<T> #9470

Closed sopel39 closed 6 years ago

sopel39 commented 6 years ago

Extends the concept from https://github.com/prestodb/presto/issues/8697 and Work. One long-term goal is to be able to use storage-backed lazy pages outside of SFP (ScanFilterProject).

The idea in https://github.com/prestodb/presto/issues/8697 was to extend Iterator so that it could be used in a non-blocking way. The observation was that various Presto components follow a pattern of building a pipeline from smaller processing components (e.g. PageProcessor, MergeHashSort, the new distributed merge sort, etc.). However, each component implements the concept a little differently. It would be great to have an abstraction over such pipelines that allows for:

Therefore I propose introducing NonBlockingWork, with an interface similar to:

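interface NonBlockingWork<T> extends Work<T> {
  // true while the work cannot make progress
  boolean isBlocked();
  // future to wait on while the work is blocked
  Future<Void> getBlockedFuture();
  // does a unit of work; called repeatedly until it returns true
  boolean process();
  // checked after process() returns true
  boolean isFinished();
  // result to fetch when the work is not finished
  T getResult();
}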

A NonBlockingWork instance would be used in the following steps:

  1. Wait until the work instance is not blocked (isBlocked/getBlockedFuture methods).
  2. Do work using the process method until it returns true.
  3. Check if the work instance is finished (isFinished method). If it is not finished, obtain the result using the getResult method.

Additionally, there would be a utility class, WorkUtils, that would simplify creating Work/NonBlockingWork pipelines. It would contain methods/interfaces like:
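The three steps above can be sketched as a small driver loop. This is only an illustration under assumptions: `Work<T>` is folded into the interface so the snippet compiles on its own, `CountingWork` and `WorkDriver` are hypothetical names, and `process()` returning `false` is read as "yielded or became blocked, try again".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Mirror of the proposed interface (Work<T> is folded in to keep the sketch self-contained).
interface NonBlockingWork<T> {
    boolean isBlocked();
    Future<Void> getBlockedFuture();
    boolean process();
    boolean isFinished();
    T getResult();
}

// Hypothetical work that emits 0..limit-1 and yields once before each element.
final class CountingWork implements NonBlockingWork<Integer> {
    private final int limit;
    private int next;
    private boolean yielded;
    private Integer result;
    private boolean finished;

    CountingWork(int limit) { this.limit = limit; }

    public boolean isBlocked() { return false; }
    public Future<Void> getBlockedFuture() { return CompletableFuture.completedFuture(null); }

    public boolean process() {
        if (next >= limit) { finished = true; return true; }
        if (!yielded) { yielded = true; return false; } // simulate a yield
        yielded = false;
        result = next++;
        return true;
    }

    public boolean isFinished() { return finished; }
    public Integer getResult() { return result; }
}

// Hypothetical driver that follows the three steps literally.
final class WorkDriver {
    static <T> List<T> drain(NonBlockingWork<T> work) {
        List<T> results = new ArrayList<>();
        while (true) {
            // Step 1: wait until the work is not blocked.
            if (work.isBlocked()) {
                try {
                    work.getBlockedFuture().get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            // Step 2: call process until it returns true; the blocked state is
            // re-checked before every retry.
            if (!work.process()) {
                continue;
            }
            // Step 3: stop when finished, otherwise collect the result.
            if (work.isFinished()) {
                return results;
            }
            results.add(work.getResult());
        }
    }
}
```

A real driver would probably hand the blocked future back to the scheduler instead of calling `get()` on it; the blocking wait is used here only to keep the sketch short.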

Iterator<T> <T> WorkUtils#toBlockingIterator(NonBlockingWork<T> work);

/*
 * Transforms `work` instance that returns T to `NonBlockingWork` that returns R.
 * `transformation` mapping function would be called when new element of type
 * T is pulled from `work`. `transformation` might either return transformed element
 * of type R or `Optional.absent` if it yields. Yielded `transformation` would be
 * called again with the same argument.
 */
NonBlockingWork<R> <T, R> WorkUtils#transform(
    NonBlockingWork<T> work,
    Function<T, Optional<R>> transformation);
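One possible shape of this `transform` is sketched below. It is not the implementation from any PR. Assumptions: `Work<T>` is folded into the interface, `java.util.Optional.empty()` stands in for `Optional.absent`, `ListWork` is a hypothetical source used only for illustration, and the output finishes when the upstream work finishes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;

// Mirror of the proposed interface (Work<T> is folded in to keep the sketch self-contained).
interface NonBlockingWork<T> {
    boolean isBlocked();
    Future<Void> getBlockedFuture();
    boolean process();
    boolean isFinished();
    T getResult();
}

// Hypothetical source that emits the elements of a list and is never blocked.
final class ListWork<T> implements NonBlockingWork<T> {
    private final Iterator<T> iterator;
    private T result;
    private boolean finished;

    ListWork(List<T> elements) { this.iterator = elements.iterator(); }

    public boolean isBlocked() { return false; }
    public Future<Void> getBlockedFuture() { return CompletableFuture.completedFuture(null); }
    public boolean process() {
        if (iterator.hasNext()) { result = iterator.next(); } else { finished = true; }
        return true;
    }
    public boolean isFinished() { return finished; }
    public T getResult() { return result; }
}

final class WorkUtils {
    static <T, R> NonBlockingWork<R> transform(
            NonBlockingWork<T> work,
            Function<T, Optional<R>> transformation) {
        return new NonBlockingWork<R>() {
            private T pending;          // element pulled from upstream, not yet transformed
            private boolean hasPending;
            private R result;
            private boolean finished;

            public boolean isBlocked() { return !hasPending && work.isBlocked(); }
            public Future<Void> getBlockedFuture() { return work.getBlockedFuture(); }

            public boolean process() {
                if (finished) { return true; }
                if (!hasPending) {
                    // Pull from upstream only when no element is waiting to be transformed.
                    if (work.isBlocked() || !work.process()) { return false; }
                    if (work.isFinished()) { finished = true; return true; }
                    pending = work.getResult();
                    hasPending = true;
                }
                Optional<R> transformed = transformation.apply(pending);
                if (!transformed.isPresent()) {
                    return false; // yielded: the same element is passed again next time
                }
                result = transformed.get();
                hasPending = false;
                pending = null;
                return true;
            }

            public boolean isFinished() { return finished; }
            public R getResult() { return result; }
        };
    }
}
```

Because upstream is only asked for a new element once the pending one has been transformed, a yielding transformation never causes elements to pile up between pipeline stages.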

Then there would be more complex transformation functions that would also support a blocking future:

// "case" class with possible results
class Result<T> {
  static <T> Result<T> needsMoreData() {...}
  static <T> Result<T> blocked(Future<Void> blocked) {...}
  static <T> Result<T> yield() {...}
  static <T> Result<T> ofElement(T result) {...}
}

/**
 * Similar to previous `WorkUtils#transform` method, but now `transformation`
 * can return more states than just `result` or `yielded`. `transformation` might return:
 * - needsMoreData - it will be called again with next element pulled from `work`
 * - yield - it will be called again with the same element from `work`
 * - blocked - it will be called again with `Optional.absent` when future is done
 * - ofElement - self explanatory. It will be called again with next element pulled
 *               from `work`
 */
NonBlockingWork<R> <T, R> WorkUtils#transform(
   NonBlockingWork<T> work,
   Function<Optional<T>, Result<R>> transformation);
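A sketch of how the four result states could drive this second `transform` follows. The state handling mirrors the comment above, but several details are assumptions: the `Type` enum and fields of `Result`, the hypothetical `ListWork` source, `Optional.empty()` standing in for `Optional.absent`, treating `needsMoreData` as a `false` return from `process()`, and finishing as soon as the upstream work finishes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;

// Mirror of the proposed interface (Work<T> is folded in to keep the sketch self-contained).
interface NonBlockingWork<T> {
    boolean isBlocked();
    Future<Void> getBlockedFuture();
    boolean process();
    boolean isFinished();
    T getResult();
}

// The "case" class from the proposal; the Type enum and the fields are assumptions.
final class Result<T> {
    enum Type { NEEDS_MORE_DATA, BLOCKED, YIELD, ELEMENT }

    final Type type;
    final T element;
    final Future<Void> blocked;

    private Result(Type type, T element, Future<Void> blocked) {
        this.type = type;
        this.element = element;
        this.blocked = blocked;
    }

    static <T> Result<T> needsMoreData() { return new Result<>(Type.NEEDS_MORE_DATA, null, null); }
    static <T> Result<T> blocked(Future<Void> blocked) { return new Result<>(Type.BLOCKED, null, blocked); }
    static <T> Result<T> yield() { return new Result<>(Type.YIELD, null, null); }
    static <T> Result<T> ofElement(T element) { return new Result<>(Type.ELEMENT, element, null); }
}

// Hypothetical source that emits the elements of a list and is never blocked.
final class ListWork<T> implements NonBlockingWork<T> {
    private final Iterator<T> iterator;
    private T result;
    private boolean finished;

    ListWork(List<T> elements) { this.iterator = elements.iterator(); }

    public boolean isBlocked() { return false; }
    public Future<Void> getBlockedFuture() { return CompletableFuture.completedFuture(null); }
    public boolean process() {
        if (iterator.hasNext()) { result = iterator.next(); } else { finished = true; }
        return true;
    }
    public boolean isFinished() { return finished; }
    public T getResult() { return result; }
}

final class WorkUtils {
    static <T, R> NonBlockingWork<R> transform(
            NonBlockingWork<T> work,
            Function<Optional<T>, Result<R>> transformation) {
        return new NonBlockingWork<R>() {
            private Optional<T> input;  // null means: pull the next element from upstream
            private Future<Void> blocked = CompletableFuture.<Void>completedFuture(null);
            private R result;
            private boolean finished;

            public boolean isBlocked() {
                return !blocked.isDone() || (input == null && work.isBlocked());
            }

            public Future<Void> getBlockedFuture() {
                return !blocked.isDone() ? blocked : work.getBlockedFuture();
            }

            public boolean process() {
                if (finished) { return true; }
                if (isBlocked()) { return false; }
                if (input == null) {
                    if (!work.process()) { return false; }
                    if (work.isFinished()) { finished = true; return true; }
                    input = Optional.of(work.getResult());
                }
                Result<R> r = transformation.apply(input);
                switch (r.type) {
                    case NEEDS_MORE_DATA: input = null; return false;   // next call pulls a new element
                    case YIELD: return false;                           // next call reuses the same element
                    case BLOCKED: blocked = r.blocked; input = Optional.empty(); return false;
                    default: result = r.element; input = null; return true;
                }
            }

            public boolean isFinished() { return finished; }
            public R getResult() { return result; }
        };
    }
}
```

For example, a transformation that returns `needsMoreData` for every odd input and `ofElement(previous + current)` for every even one turns the source `1, 2, 3, 4` into `3, 7`. The caller only ever sees `process`, `isBlocked`, and `getResult`.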

With such a model, it should be really simple to build complex pipelines like:

A great benefit is that, in the long term, we could use storage-backed lazy pages outside of SFP. Pipeline elements decide when they want to delegate a call to the upstream work, so the reader won't be asked for the next page until the previous lazy page is consumed. Additionally, state transitions are hidden from pipeline elements by the utility classes. Pipeline elements are pull-based only, which also simplifies the computation model.

Initially we could start using such a model within a single operator (e.g. ScanFilterProject, distributed sort). Later on we might merge some operators together (e.g. SFP, TopN, MarkDistinct, DynamicFilter, JoinProbeSide) under some "umbrella wholestage" operator. I would call it a kind of mild "wholestage compilation".

FYI: @martint @dain @haozhun @findepi @kbajda

sopel39 commented 6 years ago

Commit that introduces ContinuousWork<T>, per the concept described in this issue: https://github.com/prestodb/presto/pull/9854/commits/5d395edccb8c77b3a7f268063299e30690d46f97

martint commented 6 years ago

How does ContinuousWork introduced in the other PR relate to "NonBlockingWork"? Isn't it just the same concept?

sopel39 commented 6 years ago

Yes, this is the same concept, albeit with a different name. I think the name used in the PR, ContinuousWork, is more adequate.

mbasmanova commented 6 years ago

@sopel39 Karol, is this what became com.facebook.presto.operator.WorkProcessor? Is there anything else missing or can this issue be closed now?