eightysteele closed this pull request 12 years ago.
Yup, +1. I got too excited with the S3 client and blasted `gulo-copy`. I'll update this pull request to source from S3 and sink locally via a query. It'll still sink multiple part files, but then we just run `s3-merge` and boom, done.
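The "sink several parts locally, then merge" plan can be sketched in plain Clojure. This is a minimal sketch, not gulo's actual `s3-merge` or `merge-parts` code: it assumes the query wrote plain-text files named `part-NNNNN` into one local directory, and the namespace and the `part-files`/`merge-parts` helpers are hypothetical. Uploading the merged file back to S3 is left out.

```clojure
(ns gulo.merge-parts-sketch
  "Hypothetical sketch, not gulo's real merge code: concatenate the
  part files a query sinks into a local directory into one file."
  (:require [clojure.java.io :as io]))

(defn part-files
  "Returns the regular files in `dir` whose names start with \"part-\",
  sorted by name so part-00000 precedes part-00001. Hidden checksum
  files such as .part-00000.crc are skipped by the prefix test."
  [dir]
  (->> (.listFiles (io/file dir))
       (filter (fn [^java.io.File f]
                 (and (.isFile f)
                      (.startsWith (.getName f) "part-"))))
       (sort-by (fn [^java.io.File f] (.getName f)))))

(defn merge-parts
  "Copies every part file in `src-dir`, in name order, into the single
  file at `out-path`, overwriting it. Returns how many parts were merged."
  [src-dir out-path]
  (let [parts (part-files src-dir)]
    (with-open [out (io/output-stream (io/file out-path))]
      (doseq [p parts]
        ;; io/copy streams each part's bytes straight into the output.
        (io/copy p out)))
    (count parts)))
```

For the `/tmp/test-lfs/` listing further down, `(merge-parts "/tmp/test-lfs/" "/tmp/test-lfs.txt")` would concatenate `part-00000` and `part-00001`. Sorting by name relies on Hadoop zero-padding the part numbers.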
Cool. Also, for the merge, take a look at `lfs-tap` and the `:sinkparts` option. If you set the number of parts to 1, it may merge the output into a single part file in textline format. This may simplify things: source from an S3 bucket, sink to a local filesystem tap with one part, and then put that file someplace else on S3.
@danhammer just yanked the S3 stuff and replaced it with an `s3parts->file` query. The `lfs-tap`, whoa, good call dude. That should remove the need for `merge-parts`. I'll push that next, and then this pull should be set.
@danhammer what's the `scheme` supposed to be in `lfs-tap`? http://nathanmarz.github.com/cascalog/cascalog.api.html#var-lfs-tap
@danhammer if everything else here is solid, let's merge this thing and defer lfs since it's just an optimization. Created issue: https://github.com/VertNet/gulo/issues/33
@eightysteele, totally. Just a suggestion. Although, it seems like the `:sinkparts` option isn't working:
```clojure
(?- (lfs-textline "/tmp/test-lfs/" :sinkparts 1 :sinkmode :replace)
    (lfs-textline "/tmp/test-src/"))
```
This should yield just one part file, but instead yields the following:
```
dan@hammer-statistic:/tmp$ ls tes*
test-lfs:
part-00000  part-00001

test-src:
a.csv  b.csv
```
Yeah man, I think that's because `lfs-textline` doesn't take a `:sinkparts` option (but `lfs-tap` does, though it requires a scheme). I love the suggestion though, and let's land it down the line.
oh, right. good point.
This can be solved using a function that Sam wrote:
```clojure
(ns cascalog.more-taps
  (:use cascalog.api)
  (:require [cascalog.tap :as tap]
            [cascalog.vars :as v]
            [cascalog.workflow :as w])
  (:import [cascading.scheme TextDelimited WritableSequenceFile]
           [cascading.tuple Fields]
           [org.pingles.cascading.protobuf ProtobufSequenceFileScheme]))

;; Builds a TextDelimited scheme. When `classes` is given but no field
;; names are defined, nullable field vars are generated, one per class.
(defn- delimited
  [field-seq delim & {:keys [classes skip-header?]}]
  (let [skip-header? (boolean skip-header?)
        field-seq    (w/fields field-seq)
        field-seq    (if (and classes (not (.isDefined field-seq)))
                       (w/fields (v/gen-nullable-vars (count classes)))
                       field-seq)]
    (if classes
      (TextDelimited. field-seq skip-header? delim (into-array classes))
      (TextDelimited. field-seq skip-header? delim))))

(defn lfs-delimited
  "Creates a tap on the local filesystem using Cascading's
  TextDelimited scheme. Different filesystems can be selected by
  using different prefixes for `path`.

  Supports keyword options for `:outfields`, `:delimiter`, `:classes`
  and `:skip-header?`. See `cascalog.tap/hfs-tap` for more keyword
  arguments.

  See http://www.cascading.org/javadoc/cascading/tap/Hfs.html and
  http://www.cascading.org/javadoc/cascading/scheme/TextDelimited.html"
  [path & opts]
  ;; Defaults: all fields, tab-delimited. The remaining opts (e.g.
  ;; :sinkparts, :sinkmode) are passed through to tap/lfs-tap.
  (let [{:keys [outfields delimiter]} (apply array-map opts)
        scheme (apply delimited
                      (or outfields Fields/ALL)
                      (or delimiter "\t")
                      opts)]
    (apply tap/lfs-tap scheme path opts)))
```
Exact source: https://github.com/nathanmarz/cascalog-contrib
We're already riding on cascalog-contrib for the text-delimited stuff, but `:sinkparts` doesn't force a final reducer (see the Cascading `TextDelimited` Javadocs), so when we revisit #33 we'll need to add a group-by to the queries in core.
OK man, merge this thing!
Wow. This is very cool. But why do you have to manually download all the part files from S3 instead of just using the folder as an HFS source?