nathanmarz / cascalog

Data processing on Hadoop without the hassle.

hfs-textline taps don't work with :trap #187

Closed sorenmacbeth closed 11 years ago

sorenmacbeth commented 11 years ago

Easy to reproduce using the playground:

(use 'cascalog.playground)
(?<- (stdout)
     [?p ?p2]
     (follows ?p1 ?p2)
     ((mapfn [x] (throw (RuntimeException. "poo"))) ?p :> ?p1)
     (:trap (hfs-textline "/tmp/foo")))

That will throw:

UnsupportedOperationException Can't create empty: cascalog.cascading.tap.CascalogTap cascalog.cascading.tap.CascalogTap (tap.clj:23)

sritchie commented 11 years ago

Ah, nice. Is this in 2.0?

sorenmacbeth commented 11 years ago

Yep.


sritchie commented 11 years ago

I'd love a pull request to fix this, plus a test. I think all you need to do is change opt-seq to pull the sink out if you're dealing with a CascalogTap:

https://github.com/nathanmarz/cascalog/blob/develop/cascalog-core/src/clj/cascalog/cascading/platform.clj#L191

There's a similar function in api:

(defn normalize-sink-connection [sink subquery]
  (cond (fn? sink) (sink subquery)
        (instance? CascalogTap sink)
        (normalize-sink-connection (:sink sink) subquery)
        :else [sink subquery]))

But handling the CascalogTap and else cases should be good.

sorenmacbeth commented 11 years ago

It doesn't look to me like opt-seq is handling :trap at all. Is it supposed to?


sritchie commented 11 years ago

It's tidying up options before passing them on to the Cascading-level code, so we need to add a line to make it tidy the :trap option as well.

sorenmacbeth commented 11 years ago

I'm happy to do a PR for this but pretend I'm really dumb and help me understand a little better.

options is a map of the options in a sub-query, so :trap is one of the options. Do I need to pull that out in the same manner that (:sink options) is being pulled out into the sink-fields key? What key should I use for the pulled-out trap, and what should the value for that key be?

sritchie commented 11 years ago

Damn, I wrote this long, beautiful answer, then realized that what I'd suggested is already happening somewhere else. Ugh.

This is a legit bug. I'd still love a pull req :) Here's the issue, in parse.clj:

(defn sanitize
  "Accepts a (potentially nested) data structure and returns a
  transformed, sanitized predicate generated by replacing all
  wildcards and logic variables with strings."
  [pred]
  (let [generator (if (some unground-var? (s/flatten pred))
                    gen-ungrounding-var
                    gen-nullable-var)]
    (postwalk (sanitize-fn generator) pred)))

(s/flatten pred) is trying to flatten out the supplied options; trying to flatten a record throws that error. Here's s/flatten:

(defn flatten
  "Flattens out a nested sequence. unlike clojure.core/flatten, also
  flattens maps."
  [vars]
  (->> vars
       (postwalk #(if (map? %) (seq %) %))
       (clojure.core/flatten)))

I wrote this because I WANTED it to flatten maps out, for instance when you're using the positional selector. So we just need to recode this to ignore (and not try to flatten) instances of clojure.lang.IRecord.
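
To see why a record trips this up, here is a minimal sketch. DemoTap is a hypothetical stand-in for CascalogTap, and the sketch assumes the walker rebuilds each collection via empty, which I believe is what clojure.walk did on the Clojure versions of that era (newer releases appear to special-case records). A record passes both map? and coll?, but empty is unsupported on it:

```clojure
;; DemoTap is a hypothetical stand-in for cascalog's CascalogTap record.
(defrecord DemoTap [source sink])

(def tap (->DemoTap :a-source :a-sink))

;; Records look like maps and collections to generic walkers...
(def looks-like-map? (map? tap))
(def looks-like-coll? (coll? tap))

;; ...but `empty` is not supported on them. This helper returns the
;; exception message, or nil if `empty` succeeded.
(defn empty-error [x]
  (try
    (empty x)
    nil
    (catch UnsupportedOperationException e
      (.getMessage e))))

;; The message begins with "Can't create empty: ", matching the
;; error reported at the top of this issue.
(def tap-error (empty-error tap))
```

So any walker that calls (empty form) on every collection it visits will throw as soon as it reaches the tap, before the function passed to postwalk gets a chance to skip it.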

sorenmacbeth commented 11 years ago

like this?

(defn flatten
  "Flattens out a nested sequence. unlike clojure.core/flatten, also
  flattens maps."
  [vars]
  (->> vars
       (postwalk #(if (and (map? %) (not (instance? clojure.lang.IRecord %)))
                    (seq %)
                    %))
       (clojure.core/flatten)))

sritchie commented 11 years ago

Either that, or something slightly better if clojure.core/flatten tries to smash the record. Does that work?
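
On the clojure.core/flatten question, a quick check suggests it leaves records alone, since it only descends into values that are sequential? and a record is not. A small sketch, with DemoTap as a hypothetical stand-in for CascalogTap:

```clojure
;; DemoTap is a hypothetical stand-in for cascalog's CascalogTap record.
(defrecord DemoTap [source sink])

(def tap (->DemoTap :a-source :a-sink))

;; clojure.core/flatten only descends into sequential? values.
(def tap-sequential? (sequential? tap)) ; false

;; The nested vectors are flattened; the record comes through intact.
(def flat (flatten [1 [2 [tap]] 3]))
```

If that holds, the final clojure.core/flatten step is not where the record gets smashed; the walk that runs before it is the more likely culprit.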

sorenmacbeth commented 11 years ago

No, what I pasted in there doesn't fix the issue.


sorenmacbeth commented 11 years ago

feh, this fix is nasty :(

(defn sanitize
  "Accepts a (potentially nested) data structure and returns a
  transformed, sanitized predicate generated by replacing all
  wildcards and logic variables with strings."
  [pred]
  (let [generator (if (some unground-var? (s/flatten pred))
                    gen-ungrounding-var
                    gen-nullable-var)
        records-and-others (group-by #(instance? clojure.lang.IRecord %) pred)
        records (get records-and-others true)
        others (get records-and-others false)]
    (concat (postwalk (sanitize-fn generator) others) records)))

I had to do the records-and-others thing in jackknife.seq/flatten as well. Basically you can't pass IRecords into postwalk.

Open to ideas for cleaning this up, or just better solutions.

sritchie commented 11 years ago

If it works, good to go by me. You could use s/separate and destructure the two returned sequences?
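
A rough sketch of what that could look like. The separate function below is a local stand-in modeled on the old clojure.contrib.seq-utils/separate (not necessarily identical to s/separate), DemoTap is a hypothetical record standing in for CascalogTap, and the symbol-to-string function is only a placeholder for (sanitize-fn generator):

```clojure
(require '[clojure.walk :refer [postwalk]])

;; Local stand-in (assumption): returns [matching non-matching].
(defn separate [f coll]
  [(filter f coll) (remove f coll)])

;; Hypothetical stand-in for cascalog's CascalogTap record.
(defrecord DemoTap [source sink])

(defn walk-non-records
  "Applies f via postwalk to the non-record elements of pred and
  appends the records untouched. Like the group-by version in this
  thread, it moves top-level records to the end of the result."
  [f pred]
  (let [[records others] (separate #(instance? clojure.lang.IRecord %) pred)]
    (concat (postwalk f others) records)))

;; Placeholder for (sanitize-fn generator): turns symbols into strings.
(defn stringify-symbols [x]
  (if (symbol? x) (name x) x))

(def result
  (walk-non-records stringify-symbols
                    ['?a (->DemoTap 1 2) ['?b :kw]]))
```

Destructuring the pair replaces the group-by plus two get calls. Note that this only splits out records at the top level of pred; a record nested deeper would still be handed to postwalk.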

sorenmacbeth commented 11 years ago

Fixed by #188.