HewlettPackard / mds

Managed Data Structures
GNU General Public License v3.0

Add ManagedArray.streamInTasks() #42

Open EvanKirshenbaum opened 7 years ago

EvanKirshenbaum commented 7 years ago

[imported from HPE issue 325]

When doing the tests, I wanted to say

array.stream().map(v -> Task.computedValue(old -> {
  // remove effects of old if in redo
  // stash effects of v
})).collect(Collectors.toList());

But this doesn't work because the actual read from the array is outside of the TaskComputed's function, and so we won't notice if the array slot changed. What I wound up doing was

array.indexStream().mapToObj(i -> Task.computedValue(old -> {
  Val v = array.get(i);
  // remove effects of old if in redo
  // stash effects of v
})).collect(Collectors.toList());

which works, but is hard to explain.
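To make the difference concrete, here is a toy model of read tracking. It is only a sketch and does not use the MDS API: `ReadTrackingDemo`, `ToyTask`, `activeReads`, and the static `get` are hypothetical names, and the assumption is simply that a task can only record reads that happen while its body is running.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Toy model only: none of these names come from MDS.
class ReadTrackingDemo {
    static final int[] slots = {10, 20, 30};

    // Non-null only while a ToyTask body is running.
    static Set<Integer> activeReads = null;

    // Stand-in for array.get(i): notes the read if a task is active.
    static int get(int i) {
        if (activeReads != null) {
            activeReads.add(i);
        }
        return slots[i];
    }

    // Stand-in for a computed task: runs its body once and remembers what it read.
    static final class ToyTask {
        final Set<Integer> reads = new HashSet<>();
        int value;

        ToyTask(IntSupplier body) {
            activeReads = reads;
            try {
                value = body.getAsInt();
            } finally {
                activeReads = null;
            }
        }
    }

    // The slot is read before the task body runs, so the task records nothing.
    static List<ToyTask> readOutside() {
        return IntStream.range(0, slots.length)
            .mapToObj(i -> { int v = get(i); return new ToyTask(() -> v * 2); })
            .collect(Collectors.toList());
    }

    // The slot is read inside the task body, so the task records the dependency.
    static List<ToyTask> readInside() {
        return IntStream.range(0, slots.length)
            .mapToObj(i -> new ToyTask(() -> get(i) * 2))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("outside: " + readOutside().get(1).reads);
        System.out.println("inside:  " + readInside().get(1).reads);
    }
}
```

Both variants compute the same values; only the second leaves each task knowing which slot it depends on, which is what a redo would need.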

What I'd like to be able to say is simply

array.streamInTasks().map(...).collect(...);

and have each stage of the pipeline be in a task, linked together by dependencies.

I think that the easiest way to do this is to create a TaskLinkedStream<T> class, implementing Stream<T> but containing a Stream<TaskComputed<T>> and taking it as a ctor param.
This could be created as

TaskLinkedStream<T> 
Array<T>.streamInTasks() {
  return new TaskLinkedStream<>(indexStream().mapToObj(i -> Task.computedValue(() -> get(i))));
}

map() could be written something like

TaskLinkedStream<R>
map(Function<T,R> fn) {
  return new TaskLinkedStream<>(_stream.map(tc -> Task.computedValue(() -> fn.apply(tc.get()))));
}

Yeah, I know this doesn't show how we get our hands on the old value for the redo without screwing up the Stream interface. I'm still working on that.
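For readers who want to see the shape of the wrapper, here is a minimal self-contained sketch. It makes several assumptions: `Computed<T>` is a hypothetical stand-in for `TaskComputed<T>` (a lazily evaluated, cached value with no dependency tracking or redo), a plain `List<T>` stands in for the managed array, and the wrapper exposes only `map()` and a hypothetical `collectToList()` rather than implementing all of `Stream<T>`.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class TaskLinkedStreamSketch {
    // Hypothetical stand-in for TaskComputed<T>: evaluates its body once, on demand.
    static final class Computed<T> {
        private final Supplier<? extends T> body;
        private T value;
        private boolean done;

        Computed(Supplier<? extends T> body) { this.body = body; }

        T get() {
            if (!done) {
                value = body.get();
                done = true;
            }
            return value;
        }
    }

    // Wraps a stream of per-element computations; each stage adds one more per element.
    static final class TaskLinkedStream<T> {
        private final Stream<Computed<T>> stream;

        TaskLinkedStream(Stream<Computed<T>> stream) { this.stream = stream; }

        // Each mapped element reads the upstream element inside its own computation.
        <R> TaskLinkedStream<R> map(Function<? super T, ? extends R> fn) {
            return new TaskLinkedStream<>(
                stream.map(tc -> new Computed<R>(() -> fn.apply(tc.get()))));
        }

        // Forces every computation and gathers the results.
        List<T> collectToList() {
            return stream.map(Computed::get).collect(Collectors.toList());
        }
    }

    // The element read happens inside the computation, not while building the stream.
    static <T> TaskLinkedStream<T> streamInTasks(List<T> array) {
        return new TaskLinkedStream<>(
            IntStream.range(0, array.size())
                .mapToObj(i -> new Computed<T>(() -> array.get(i))));
    }

    public static void main(String[] args) {
        System.out.println(streamInTasks(List.of(1, 2, 3)).map(x -> x * 10).collectToList());
    }
}
```

The sketch only shows the chaining; the open question about passing the old value to a redo is not addressed by it.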

EvanKirshenbaum commented 7 years ago

[imported comment]

Okay, thinking about it a bit more, I think that all of the undo logic just needs to be in a set of Collectors we provide. These would have underlying accumulator() functions that took TaskComputed<T> and created TaskComputed<T> objects that took old vals and knew how to pull them out of whatever was being built. (Still sketchy, I know.) Then we could simply have an overarching

Stream<T> Task.inTasks(Stream<T> s)

that creates a TaskLinkedStream from a provided stream, and we could go from there. Unfortunately, the Stream designers don't appear to have thought of the need for a method that says "Apply this function to transform this stream into another kind of stream".

This all, of course, assumes that the user will be limiting the stream to things like map() and forEach() and (possibly) toArray() (for streams with known size). We could probably have anyMatch() and allMatch() and some of the others if we drop the notion that we're actually Streams and have them return TaskComputed<Boolean> values (or maybe a TaskComputed.forBoolean). Doing things like filter() or sorted() or flatMap() is probably trickier than we want to get into, because it will imply dependencies downstream.
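As a rough illustration of a terminal operation that returns a deferred result instead of a plain boolean, here is a sketch under loud assumptions: `Supplier<Boolean>` stands in for `TaskComputed<Boolean>`, `Supplier<T>` stands in for each per-element task, and `DeferredMatchSketch` is a hypothetical name. Nothing here tracks dependencies.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

class DeferredMatchSketch {
    // Returns a deferred boolean: no element is read until the result is asked for.
    static <T> Supplier<Boolean> anyMatch(List<Supplier<T>> elements,
                                          Predicate<? super T> predicate) {
        return () -> elements.stream().anyMatch(e -> predicate.test(e.get()));
    }

    public static void main(String[] args) {
        List<Supplier<Integer>> elements = List.of(() -> 1, () -> 5);
        Supplier<Boolean> result = anyMatch(elements, v -> v > 3);
        System.out.println(result.get());
    }
}
```

Because the element reads happen inside the returned computation, a task-based version could in principle re-evaluate the match when a slot it read changes.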