nathanmarz / cascalog

Data processing on Hadoop without the hassle.

Aggregator parameters can't be regular Clojure variables #219

Closed by f355 10 years ago

f355 commented 10 years ago

If I have a simple aggregator like this:

(defaggregatefn test-aggregator
  ([] [])
  ([state param val] (conj state (* param val)))
  ([state] state))

then in 2.0.0 (but not in 1.10.2) it is impossible to run this query:

(let [param 2]
  (?<- (stdout)
       [?k ?v]
       ([["k1" 1] ["k2" 2] ["k3" 3]] :> ?k ?v0)
       (test-aggregator param ?v0 :> ?v)))

It throws the following exception:

RuntimeException Can't apply all aggregators. These fields are missing: (2) jackknife.core/throw-runtime (core.clj:159)

However, if I introduce an extra Cascalog variable just to hold my parameter, it works as expected:

(let [param 2]
  (?<- (stdout)
       [?k ?v]
       ([["k1" 1] ["k2" 2] ["k3" 3]] :> ?k ?v0)
       (identity param :> ?param)
       (test-aggregator ?param ?v0 :> ?v)))

Maybe I'm misunderstanding something, but to me it looks like a regression and somehow related to #26.

Quantisan commented 10 years ago

2.0 recommends using anonymous functions. You can rewrite your aggfn like so:

(defn test-aggregator [param]
  ;; param is captured by the closure, so it no longer has to be
  ;; passed in as a query field
  (aggregatefn
    ([] [])
    ([state val] (conj state (* param val)))
    ([state] state)))

Then you can use it in the query like this:

((test-aggregator param) ?v0 :> ?v)

Does this solve your issue?
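
For readers unfamiliar with the pattern, the closure idea behind this suggestion can be sketched in plain Clojure, without Cascalog on the classpath. This is only an illustration under stated assumptions: `make-scaling-agg` and `run-agg` are hypothetical names, an ordinary `fn` stands in for `aggregatefn`, and `run-agg` merely imitates the init/step/finish sequence implied by the three arities; it is not how Cascalog drives aggregators internally.

```clojure
;; Plain Clojure sketch; no Cascalog dependency.
;; `make-scaling-agg` and `run-agg` are hypothetical helper names.

(defn make-scaling-agg
  "Returns a three-arity aggregator function that closes over `param`."
  [param]
  (fn
    ([] [])                                   ; init: empty state
    ([state val] (conj state (* param val)))  ; step: scale and collect
    ([state] state)))                         ; finish: return state as-is

(defn run-agg
  "Applies `agg` to `xs`: init, one step per element, then finish."
  [agg xs]
  (agg (reduce agg (agg) xs)))

;; The parameter is fixed when the aggregator is built,
;; so only the values themselves are passed at aggregation time.
(def result (run-agg (make-scaling-agg 2) [1 2 3]))
;; result is [2 4 6]
```

Because the parameter lives in the closure rather than in the argument list, the aggregator only ever sees real input fields, which is the property the rewritten `test-aggregator` relies on.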

f355 commented 10 years ago

Yes, that works, thank you very much.

I still have to point out that neither this recommendation nor the aggregatefn macro is documented anywhere, and the exception being thrown is rather obscure, so it is almost impossible to figure out the right way to do it.

Quantisan commented 10 years ago

Yes, we started an upgrade guide but haven't had a chance to finish it yet. Pull requests are always welcome, though! :)