aphyr / tesser

Clojure reducers, but for parallel execution: locally and on distributed systems.
873 stars 39 forks source link

Using CollFold protocol sources in tesser #9

Closed mangolas closed 9 years ago

mangolas commented 9 years ago

I have pretty good results on core reducers with iota file-seqs which implements CollFoll protocol on top of memory mapped files.

Can the tesser utilize such sequences or how one could achieve the similar splitting big file for parallel folds with tesser?

aphyr commented 9 years ago

Tesser should be able to fold any sequence of reducibles. Have you tried it?

mangolas commented 9 years ago

Sure, it works, but not as fast as reducers fold. So I suspect it's traversing the sequence normally rather than using the provided CollFold implementation.

;; Tesser version of iota seq
(time
    (->> (t/map (fn [_] 1))
         (t/fold +)
         (t/tesser (t/chunk 1024 (iota/seq bigfile 1024)))))

"Elapsed time: 7797.347623 msecs"
=> 8189863

;; Tesser version of line-seq
(time
    (with-open [inp (io/reader bigfile)]
      (->> (t/map (fn [_] 1))
           (t/fold +)
           (t/tesser (t/chunk 1024 (line-seq inp))))))

"Elapsed time: 3499.529274 msecs"
=> 8189863

;; Iota seq with core.reducers
(time
    (->> (iota/seq bigfile 262144)
         (r/map (fn [_] 1))
         (r/fold +)))

"Elapsed time: 1537.073155 msecs"
=> 8189863

;; Iota rec-seq, my improved Iota-seq version
(time
    (->> (iota/rec-seq bigfile 262144)
         (r/map (fn [_] 1))
         (r/fold +)))

"Elapsed time: 655.624505 msecs"
=> 8189863

This is iota's CollFold implementation, is something similar possible/needed for Tesser?

(defn- foldseq
  "Utility function to enable reducers for Iota Seq's"
  [^iota.FileSeq s n combinef reducef]
  (if-let [[v1 v2] (.split s)]
    (let [fc (fn [child] #(foldseq child n combinef reducef))]
      (fjinvoke
       #(let [f1 (fc v1)
              t2 (r/fjtask (fc v2))]
         (fjfork t2)
         (combinef (f1) (fjjoin t2)))))
    (reduce reducef (combinef) (.toArray s))))

(extend-protocol r/CollFold
  iota.FileSeq
  (coll-fold
    [v n combinef reducef]
    (foldseq v n combinef reducef)))
aphyr commented 9 years ago

I found manual (reduce) was faster than using CollFold for the in-memory structures I was working with, but that doesn't mean we have to use (reduce) in all cases. You're welcome to write a variant of core/tesser which uses CollFold instead, or, if it benches well on vectors & arrays, I'm happy to see us do polymorphic dispatch or have an option to control which strategy to use...

mangolas commented 9 years ago

Ok, good to know that is the case. It would be interesting to see if I could create such a variant, at least I can try.

mangolas commented 9 years ago

I looked a bit at core/tesser, but I couldn't see any easy path on CollFold usage.

But instead I created a iota chunk-seq, which provides and fast way to split and chunk memory mapped files in an iterable style. It's still not as fast as iota with core/reducers, but quite close.

So I think this is fair enough and gives a way to use iota with tesser.

(time
    (->> (t/map (fn [_] 1))
         (t/fold +)
         (t/tesser (iota/chunk-seq bigfile 262144))))
"Elapsed time: 1898.995341 msecs"
=> 8189863