damballa / parkour

Hadoop MapReduce in idiomatic Clojure.
Apache License 2.0
257 stars 19 forks source link

Dseq for a range of integers / custom input splits from clojure code #9

Open mjwillson opened 9 years ago

mjwillson commented 9 years ago

I was looking for a way to create a dseq for a range of integers, which can be split amongst the mappers.

So far the closest I found was parkour.io.mem/dseq, but this only works in local mode.

Is there a way to implement custom dseqs with custom logic for input splits from clojure code? Perhaps I need to implement (and AOT-compile) a custom InputFormat class for this kind of thing, and implement a cstep which sets this class's name as mapreduce.job.inputformat.class, sets config for it, then somehow wrap that as a dseq? Or perhaps all the dseq stuff assumes file-based input?

Thought I'd mention this anyway as one of the "gaps" alluded to on that documentation ticket -- it feels like something which would be really simple in idiomatic clojure code, and these dseqs are advertised as one of the core abstractions of the library, but it's not at all clear how I'd go about creating one myself, given some logic I'd like to implement for how to generate keys as input for mappers. Looking at the protocol for dseq isn't very enlightening in this respect as they mainly just appear to be wrappers for csteps.

(I did find at least one custom InputFormat for integer range here: https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/IntegerListInputFormat.java , although I'm not sure why this particular class uses static fields for its config and if that would work with parkour.)

llasram commented 9 years ago

It should be possible to take any existing InputFormat and use it as a dseq, even one which uses static fields for no reason. As looking at some of the existing examples (parkour.io.{text,seqf}) should suggest, you just need to write a Clojure function which use the Hadoop API to configure a Job to use the input format then "bless" the function with parkour.io.dseq/dseq. (With the blessed function usually itself being a closure over the particular input parameters.) Dseqs are a core abstraction, but their goal is to make access to Hadoop data more Clojure-ish; I haven't spent as much time making Clojure data more Hadoop-able.

Simplifying writing new InputFormats (and OutputFormats) from Clojure is something I've considered adding to Parkour, but just hasn't come up enough for me to spend time on it yet. There's a few potential stumbling blocks (the need for input splits to have a backing concrete Writable class), but I think it'd be possible to work around those, at least for the common case.

I have been meaning to add an InputFormat which allows mapping over an existing in-memory collection, by serializing sections of it to the InputSplits as EDN. Your request suggests a relatively simple extension of that where each split's input collection is the result of a function applied to the raw serialized backing split data. But then by that point we're halfway to support for writing arbitrary new InputFormats.

I'd like to stew for a bit on some ideas here. If you'd like to participate, we can use this ticket for design discussion.

mjwillson commented 9 years ago

Thanks, that helps in terms of understanding what dseqs are and aren't trying to do. Sorry for the possibly-slightly-silly question-- I guess it was bad luck that the first thing I tried to do struck on a complicated use case.

I guess most people will be using hadoop data as input so will be using an existing InputFormat so I'm the odd one out here -- I want to use programatically-generated values as input to my mappers, which can be generated lazily.

Generating those values eagerly and serializing them as EDN would work, although perhaps not ideal if you're serializing a massive list of numbers which could be generated programatically. I wonder if it'd be possible to have a way to create a dseq from a var which maps to a lazy seq of lazy seqs for the splits:

(def splits (partition 10 (range 100)))
(parkour.lazy-splits/dseq #'splits)

Although I guess you'd have to be a little bit careful about things like chunking in the lazy sequences, and perhaps careful to avoid sequential dependences if you want to allow each split to be generated independently of the other ones. E.g. the following might be better than the example above:

(def splits (map #(range % (+ 10 %)) (range 0 100 10)))
mjwillson commented 9 years ago

Perhaps requiring a function which returns a vector of lazy sequences would make the requirement for laziness more explicit:

(defn splits
   []
   (mapv #(range % (+ 10 %)) (range 0 100 10)))

(parkour.lazy-splits/dseq #'splits)
mjwillson commented 9 years ago

What I'm missing here is the details of when / where / how these InputFormat and InputSplit instances are serialized and passed around when a job runs on multiple nodes. It sounds like the InputSplits need to be serializable somehow (despite that not being part of the InputSplit interface). I guess that's the difficulty here and why you were thinking about:

result of a function applied to the raw serialized backing split data

Perhaps best if I leave this to the better informed :)

llasram commented 9 years ago

I think you're on the right track. Here's my breakdown (partially to clarify my own thoughts):

In the actual implementation in Hadoop, there are a few complications:

I believe we can simplify this significantly, via the following:

The one tricky thing left is the record readers. They’re just inherently mutable, in way which doesn’t work well with Clojure idioms, and driven in all their mutability by code deep in Hadoop MapReduce. I’m going to mull on them a bit longer, and see if I can’t think of a way to abstract them a bit more nicely from the Clojure perspective.

llasram commented 9 years ago

I've thought about it for a bit, and I don't see a way of completely hiding RecordReaders without loss of generality, but I think a convenience interface allowing the user to provide a function which returns reifyed instances of the combination of the Seqable, Counted, and Closeable interfaces would cover most uses. I should have a chance to whip something together over the next few days.

llasram commented 9 years ago

Design changed slightly when faced with actual implementation, but check out PR #10 for my proposed implementation of the basic interface and some examples.

mjwillson commented 9 years ago

Ooh nice, thanks! I ended up writing my own IntegerRangeInputFormat in java for now, but will try this out when I can

llasram commented 9 years ago

Parkour 0.6.2 is now released and includes the basic interface mentioned. This basic interface does not include providing access to the FileInputFormat utility code, so I'm going to leave this issue open for now.