senderista opened this issue 8 years ago
@jingjingwang would appreciate your feedback.
What if we need to upload a query result? There will be no pre-computed MD5 checksum for it during ingestion.
I'm wondering if we can avoid this awkward design of having an `InputStream` and an `OutputStream` together. It seems we could use their REST API for the same purpose: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingRESTAPImpUpload.html. In that case we could use an `OutputStream` to write to the connection directly. I haven't looked into their docs in detail, so I'm not sure whether there are any implementation challenges.

Incidentally, there are also REST APIs for HDFS: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html.
@jingjingwang I'd prefer to avoid using any raw AWS HTTP APIs, since there are so many details we'd have to handle (e.g., retrieving IAM role credentials, properly signing all requests with the SigV4 protocol, etc.). I really want to stick with the SDKs whenever possible (they generally provide a low-level wrapper over the HTTP APIs as well as higher-level interfaces). We already use the `PipeStream` approach for HTTP dataset downloads, so I don't think it should be too hard to use for S3 uploads as well (except that we would need to queue up the `uploadPart` call on an `Executor`, which could be encapsulated within the `DataSink` implementation).
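To make the threading constraint concrete, here is a minimal, self-contained sketch (not Myria code) of that pattern: the operator thread writes to a `PipedOutputStream` while a blocking consumer, standing in for the SDK's `uploadPart` call, is queued on an `Executor`. The class name and buffer sizes are arbitrary.

```java
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Illustrates why the producer and consumer of a piped pair must live on
 * different threads: the pipe has a bounded buffer, so a single thread that
 * writes more than the buffer holds before reading would block forever.
 */
public class PipedPairDemo {
  public static void main(final String[] args) throws Exception {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final PipedOutputStream out = new PipedOutputStream();
    final PipedInputStream in = new PipedInputStream(out, 64 * 1024);

    // Consumer runs on the executor; in the real sink this would be uploadPart(in).
    final Future<Long> bytesRead = executor.submit(() -> {
      long total = 0;
      final byte[] buf = new byte[8192];
      for (int n; (n = in.read(buf)) != -1;) {
        total += n;
      }
      return total;
    });

    // Producer (the operator thread) writes far more than the pipe buffer holds.
    final byte[] chunk = new byte[8192];
    for (int i = 0; i < 1024; ++i) {
      out.write(chunk);
    }
    out.close(); // end-of-stream lets the consumer finish

    System.out.println("consumer saw " + bytesRead.get() + " bytes");
    executor.shutdown();
  }
}
```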
Perhaps we could make `DbInsert` optionally compute an MD5 checksum, which would be stored in the catalog metadata for the partition?
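For illustration, here is roughly what that could look like: a small helper that `DbInsert` might fold each ingested batch's serialized bytes through (the class name and the batch-serialization step are assumptions, not existing Myria code). The base64-encoded digest is the format S3 expects for `Content-MD5`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/**
 * Sketch of an incremental MD5 over a partition's bytes. DbInsert would call
 * update() with the serialized bytes of each batch it writes, then store the
 * result of finish() in the master catalog alongside the partition metadata.
 */
public class PartitionChecksum {
  private final MessageDigest md5;

  public PartitionChecksum() throws NoSuchAlgorithmException {
    md5 = MessageDigest.getInstance("MD5");
  }

  /** Fold in the serialized bytes of one batch. */
  public void update(final byte[] serializedBatch) {
    md5.update(serializedBatch);
  }

  /** Base64-encoded digest, the encoding S3 expects for Content-MD5. */
  public String finish() {
    return Base64.getEncoder().encodeToString(md5.digest());
  }
}
```

Of course, this only helps if the bytes hashed at ingestion time are exactly the bytes we later upload, i.e., the same serialization format.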
Yeah, MD5 in `DbInsert` sounds great. About `PipeStream`, we need to check a couple of things:
1. the thread model: we currently schedule one thread for the whole query fragment, and I'm not sure whether spawning another thread within an operator can cause problems (or at least pollute the thread model a little bit), and
2. scheduling: a query fragment is ready to be scheduled when its output channels are available, and I'm not sure yet how to pass the availability of the S3 upload output channel to our state machine.

When I get some time, I need to check how the SDK works in detail.
@jingjingwang BTW, if you're still interested in using the REST API for S3, I suggest taking a glance at how `boto` does it and asking whether you really want to enter that world of pain ;-) https://github.com/boto/boto/blob/2.36.0/boto/auth.py#L549

Also see https://www.thoughtworks.com/mingle/infrastructure/2015/06/15/security-and-s3-multipart-upload.html for security issues with parallel multipart uploads and https://blogs.aws.amazon.com/bigdata/post/Tx17QMNRC450WPX/Using-AWS-for-Multi-instance-Multi-part-Uploads for general background. Since STS credentials on different instances won't work for multipart uploads, we will probably need to use a dedicated IAM user with credentials managed in KMS (see the first link for a sample IAM policy).
We should make it possible to upload a partitioned relation as a single S3 object, from each worker in parallel, with little or no coordination required, and no appreciable burden on the coordinator.
The relation will be uploaded to S3 using the S3 multipart upload API. Each partition will be a separate part of the upload, and each worker will manage its own partition's upload separately (we could actually further parallelize the upload locally, but that's out of scope for now). Coordination is only required before and after all partition uploads complete. Before the uploads can begin, the coordinator needs to obtain an upload ID using `initiateMultipartUpload`, initialize subqueries corresponding to the separate uploads of each partition with this upload ID, and wait for each subquery to successfully complete a call to `uploadPart` and return the ETag it obtains from AWS in the `uploadPart` response. The coordinator then passes this list of ETags, along with the corresponding upload part indexes, to `completeMultipartUpload`. When this call returns (which could take several minutes), the response contains an ETag for the entire object. If the coordinator is forced to time out the call to `completeMultipartUpload`, or any of the workers' calls to `uploadPart` fail, it should call `abortMultipartUpload`, which will free any storage used by the uploaded parts.
Concretely, the `uploadPart` API consumes an `InputStream`, but the `DataOutput` operator needs a `DataSink` which can supply an `OutputStream`. This `DataSink` implementation could simply extend `PipeSink` (which really should be extended separately for the existing stream-to-HTTP-client functionality), and therefore expose an `InputStream` to `uploadPart` and an `OutputStream` to `DataOutput`. One complexity would be that the producer and consumer of a `PipeStream` (i.e., a `PipedInputStream`/`PipedOutputStream` pair) need to run on separate threads to avoid deadlock. I'm not sure if it would make sense to introduce a local exchange operator for this reason (which would implicitly place the producer and consumer on separate `LocalFragment`s and therefore separate threads), or whether we should just explicitly create a new thread (or rather schedule a task on an `Executor`) for the `uploadPart` call.
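Here is a hypothetical sketch of the explicit-`Executor` option. The class name, constructor parameters, and `getOutputStream()` contract are assumptions rather than the real `DataSink`/`PipeSink` interfaces; it only shows how the piped pair, the executor-submitted `uploadPart` call, and the returned `PartETag` could be wired together.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

/**
 * Hypothetical sink: DataOutput writes to the OutputStream it exposes, while
 * uploadPart consumes the paired InputStream on a dedicated executor thread.
 */
public class S3PartSink {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final PipedOutputStream producerSide = new PipedOutputStream();
  private final Future<PartETag> pendingPart;

  public S3PartSink(final String bucket, final String key, final String uploadId,
      final int partNumber, final long partSizeBytes) throws IOException {
    final AmazonS3 s3 = new AmazonS3Client(); // default credential chain
    final PipedInputStream consumerSide = new PipedInputStream(producerSide, 1 << 20);
    // The blocking uploadPart call must run on a different thread than the
    // producer, or the piped pair deadlocks once its buffer fills.
    pendingPart = executor.submit(() -> s3
        .uploadPart(new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)   // this partition's 1-based index
            .withInputStream(consumerSide)
            .withPartSize(partSizeBytes))
        .getPartETag());
  }

  /** Handed to DataOutput; closing it signals end-of-stream to uploadPart. */
  public OutputStream getOutputStream() {
    return producerSide;
  }

  /** Blocks until the part upload finishes; the ETag goes back to the coordinator. */
  public PartETag awaitPartETag() throws Exception {
    try {
      return pendingPart.get();
    } finally {
      executor.shutdown();
    }
  }
}
```

One wrinkle, if I'm reading the SDK correctly, is that `uploadPart` wants the part size up front when it is given a raw `InputStream`, so we would either need to know each partition's serialized size ahead of time or buffer it locally before uploading.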
One further note: unless it's too expensive, the workers should calculate an MD5 hash of their partition's data and pass it to `uploadPart`. Since this would require two passes over the data, it might be a good idea to calculate an MD5 checksum during ingestion and store it in the master catalog with the partition's metadata.