uwescience / raco

Compilation and rule-based optimization framework for relational algebra. Raco is the language, optimization, and query translation layer for the Myria project.

Pipeline Parallel Ingest (CSVFileScanFragment operator) #536

Open jortiz16 opened 7 years ago

jortiz16 commented 7 years ago

Currently, parallel ingest is only accessible through a REST call to the coordinator. The coordinator builds a query plan and sends a CSVFileScanFragment with the appropriate byte ranges to each worker.

We need to introduce parallel ingest to MyriaL somehow. The tricky part will be figuring out how to pipeline this operator. Should Raco be the one that figures out how to split the byte ranges? I think we discussed this before and preferred not to introduce any AWS/S3 API into Raco.
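For concreteness, here is a minimal sketch of the kind of even byte-range split the coordinator does today. All names are illustrative, not the actual MyriaX code, and a real scan also has to deal with rows that straddle a range boundary.

```java
// Illustrative even split of an object over numWorkers workers; the last
// worker absorbs the remainder. Returns the half-open range [start, end).
final class ByteRangeSplit {
  static long[] rangeForWorker(long fileSize, int numWorkers, int workerIndex) {
    long chunk = fileSize / numWorkers;
    long start = chunk * workerIndex;
    long end = (workerIndex == numWorkers - 1) ? fileSize : start + chunk;
    return new long[] {start, end};
  }
}
```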

jortiz16 commented 7 years ago

tagging @senderista

senderista commented 7 years ago

I thought an alternative we discussed in the past was to have Raco encode a pseudo-operator, run at the coordinator, which would be initialized only with the S3 URL and would dynamically create and dispatch the local operators (i.e., CSVFileScanFragment instances initialized with the appropriate ranges). Are there other examples of operators that create an operator tree at runtime?
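A purely hypothetical sketch of that pseudo-operator idea, reusing the range split from the sketch above; none of these names are actual MyriaX APIs:

```java
import java.util.List;

// Hypothetical: coordinator-side code that knows only the S3 URL and, at
// runtime, creates and dispatches one local scan operator per worker.
final class PseudoIngestOperator {
  private final String s3Url;

  PseudoIngestOperator(String s3Url) {
    this.s3Url = s3Url;
  }

  void dispatch(List<Integer> workerIds, long objectSize) {
    int n = workerIds.size();
    for (int i = 0; i < n; i++) {
      long[] range = ByteRangeSplit.rangeForWorker(objectSize, n, i);
      // A real implementation would instantiate and ship the local operator,
      // e.g. new CSVFileScanFragment(s3Url, range[0], range[1]),
      // to workerIds.get(i).
      System.out.printf("worker %d scans bytes [%d, %d)%n",
          workerIds.get(i), range[0], range[1]);
    }
  }
}
```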

adding @jingjingwang

jortiz16 commented 7 years ago

I vaguely remember talking about this. I do remember that we agreed to give Raco only the S3 URL, but I'm not familiar with how it can create the plan from there or whether there is another existing operator that does something like this.

jortiz16 commented 7 years ago

So would initializing the operator at runtime within MyriaX be the way to go? I ran into a situation where I need to use this feature, so I'm trying to tackle it at the moment.

So here was my thinking...

  1. implement a fake operator on the Raco side
  2. on the MyriaX side, implement the corresponding encoding for the fake operator
  3. when initializing the operator, we need to make sure it is initialized with the correct byte ranges. This almost seems like it needs to be worker-dependent. The problem is that when we initialize the operator, it does not know anything about the worker at that point.

We could make it so the fake operator encoding has information about the worker id (Raco could probably take care of this?). That way, when we initialize the operator, we have the worker id from the encoding. We can then add a new CSVFileScanFragment constructor that takes the worker id as a parameter and initializes the operator with the correct byte ranges.

Not sure if this would be the best approach.
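A hedged sketch of what this might look like. CSVFileScanFragmentEncoding and the construct() hook exist in MyriaX; the workerId/totalWorkers fields and the new constructor are hypothetical additions for illustration.

```java
// Sketch of the "fake operator" encoding idea; the two int fields and the
// three-argument constructor are hypothetical, not existing MyriaX code.
public class CSVFileScanFragmentEncoding
    extends LeafOperatorEncoding<CSVFileScanFragment> {
  public String s3Uri;
  public int workerId;      // hypothetically filled in by Raco per destination worker
  public int totalWorkers;  // hypothetical

  @Override
  public CSVFileScanFragment construct(ConstructArgs args) {
    // Hypothetical constructor: the operator derives its own [start, end)
    // byte range from (workerId, totalWorkers, object size).
    return new CSVFileScanFragment(s3Uri, workerId, totalWorkers);
  }
}
```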

senderista commented 7 years ago

FWIW I'm doing something vaguely similar to this right now in this PR: https://github.com/uwescience/myria/pull/858. One question is whether we can assume that all worker IDs are contiguous. If so, then each worker operator instance could initialize its own byte range (taking the minimum partition size into account) after it determines the object size, but we probably can't assume this if we want to be fault-tolerant.
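Under the contiguous-ID assumption, the per-worker computation could look roughly like this sketch (illustrative names; minPartitionSize stands in for the minimum partition size mentioned above):

```java
// Sketch of a worker computing its own byte range without coordination,
// assuming contiguous worker indices 0..numWorkers-1.
final class SelfAssignedRange {
  static long[] myRange(long objectSize, int numWorkers, int myIndex,
                        long minPartitionSize) {
    long chunk = Math.max(objectSize / numWorkers, minPartitionSize);
    long start = chunk * myIndex;
    if (start >= objectSize) {
      return new long[] {0, 0}; // nothing left for this worker
    }
    boolean last = (myIndex == numWorkers - 1) || (start + chunk >= objectSize);
    long end = last ? objectSize : start + chunk;
    return new long[] {start, end}; // [start, end)
  }
}
```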

I think we can dispense with the assumption of contiguous worker IDs as long as we ensure that all workers share the same live-workers view. We should be able to do this in OperatorEncoding.construct() by querying the server and initializing a private serializable field (see the above PR for an example). With that information, the minimum partition size, and the object's size in bytes, the workers should be able to determine which byte range they're responsible for without coordination.

(FYI, an Operator can get its worker ID with ((WorkerSubQuery) getLocalSubQuery()).getWorker().getID().)

jortiz16 commented 7 years ago

This sounds good, thanks! I'll try it.

jortiz16 commented 7 years ago

Sorry, I think I'm still missing something. I'm adding this logic under the CSVFileScanFragmentEncoding.construct() method, but I don't see how we can call getLocalSubQuery, since that requires the operator object. Basically, is there a way to get the operator object itself from the operator encoding?

senderista commented 7 years ago

What I meant was to pass the live-workers view to the Operator constructor by calling args.getServer().getAliveWorkers() from within OperatorEncoding.construct(), then calling getLocalSubQuery() from Operator.init() or elsewhere. The Operator instance returned from OperatorEncoding.construct() is only used for serialization (so any field you initialize in the constructor called from construct() must be serializable), while Operator.init() is only called on the "real" Operator instance when it is deserialized on the worker where it runs.
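A sketch putting both halves of that pattern together. The getAliveWorkers() call, the construct()/init() contract, and the worker-ID lookup come from this thread and the linked PR; the field types and the ranking step are assumptions.

```java
import java.util.TreeSet;
import com.google.common.collect.ImmutableMap;

// Coordinator side: capture one consistent live-workers view in a
// serializable field so every deserialized copy sees the same membership.
public class CSVFileScanFragmentEncoding
    extends LeafOperatorEncoding<CSVFileScanFragment> {
  public String s3Uri;

  @Override
  public CSVFileScanFragment construct(ConstructArgs args) {
    return new CSVFileScanFragment(
        s3Uri, new TreeSet<>(args.getServer().getAliveWorkers()));
  }
}

// Worker side: the relevant parts of CSVFileScanFragment (sketch only).
class CSVFileScanFragment extends LeafOperator {
  private final String s3Uri;
  private final TreeSet<Integer> liveWorkers; // serializable, set in construct()

  CSVFileScanFragment(String s3Uri, TreeSet<Integer> liveWorkers) {
    this.s3Uri = s3Uri;
    this.liveWorkers = liveWorkers;
  }

  @Override
  protected void init(ImmutableMap<String, Object> execEnvVars) throws DbException {
    // Safe here, not in the constructor: init() runs only on the "real"
    // instance after deserialization on the worker.
    int myId = ((WorkerSubQuery) getLocalSubQuery()).getWorker().getID();
    // Rank within the shared sorted worker set; with the object size and the
    // minimum partition size this determines my byte range, coordination-free.
    int myIndex = liveWorkers.headSet(myId).size();
    // ... compute [start, end) as in the earlier sketches ...
  }
}
```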