vavr-io / vavr

vʌvr (formerly called Javaslang) is a non-commercial, non-profit object-functional library that runs with Java 8+. It aims to reduce the lines of code and increase code quality.
https://vavr.io

More Stream.in() and Traversable.out() methods #79

Closed danieldietrich closed 7 years ago

danieldietrich commented 9 years ago
Stream<String> Stream.in() // same as Stream.in(System::in, Charset.default()).lines()
Stream<Byte> Stream.in(InputStream in)
Stream<Char> Stream.in(InputStream in, Charset charset)

Stream<String> stream.lines() // needs to match current stream component type and convert elements

Traversable.out(OutputStream) // writes bytes
Traversable.out(OutputStream, Charset) // writes chars

Update: Traversable.toInputStream() and Traversable.toOutputStream() could be also useful.

lukaseder commented 9 years ago

Ooooh, beautiful ideas! I suspect that passing an InputStream will produce a Stream<Byte> in your case?

mperry commented 9 years ago

@danieldietrich and @lukaseder

Should this return the stream within the IO monad to indicate that it is not pure? Alternately, return a lazy stream, e.g. in FunctionalJava this is a P1<Stream<A>>. You can then concatenate this stream with another stream without the effects causing problems. The stream you create here won't compose if it is eager on the head with a lazy tail.

mperry commented 9 years ago

Another potential problem is what happens if the stream is not fully evaluated. Does the resource ever close if it is lazy?

I think iteratees might help here, see Runar's post http://blog.higher-order.com/blog/2010/10/14/scalaz-tutorial-enumeration-based-io-with-iteratees/. I remember there was an iteratee implementation in FJ, but don't know how complete or comprehensive it is.

danieldietrich commented 9 years ago

@lukaseder Thank you :-) Yes, Stream<Byte> is the idea...

lukaseder commented 9 years ago

@mperry: Does the resource ever close if it is lazy?

java.util.stream.Stream extends AutoCloseable, so the close() and resource semantics should already be covered...
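A minimal sketch of that point, using only the JDK (java.util.stream.Stream, not the Javaslang Stream). The class name CloseableLinesDemo and its helpers are made up for illustration. It shows that the onClose handler runs when the stream is closed via try-with-resources, while a terminal operation alone does not close it:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CloseableLinesDemo {

    // Records whether the onClose handler of the most recently created stream has run.
    public static final AtomicBoolean closed = new AtomicBoolean(false);

    // Wraps a reader in a Stream whose close() also closes the reader.
    public static Stream<String> lines(BufferedReader reader) {
        closed.set(false);
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            closed.set(true);
        });
    }

    // Reads only the first line; try-with-resources still closes the source.
    public static List<String> firstLineClosed(String text) {
        try (Stream<String> s = lines(new BufferedReader(new StringReader(text)))) {
            return s.limit(1).collect(Collectors.toList());
        }
    }

    // Same read without try-with-resources: nothing calls close().
    public static List<String> firstLineLeaked(String text) {
        return lines(new BufferedReader(new StringReader(text)))
                .limit(1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstLineClosed("a\nb\nc") + " closed=" + closed.get());
        System.out.println(firstLineLeaked("a\nb\nc") + " closed=" + closed.get());
    }
}
```

So AutoCloseable covers the case where the consumer owns the stream in a try block, but not a stream that is handed around and dropped half-read.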

danieldietrich commented 9 years ago

@mperry:

The stream you create here won't compose if it is eager on the head with a lazy tail.

I think I had such a problem when iterating locking head calculation from the standard input stream. There is a bugfix for such a situation in the Stream.iterator() method but I currently don't fully understand the core problem you mentioned and will investigate it this evening...

Another potential problem is what happens if the stream is not fully evaluated. Does the resource ever close if it is lazy?

An infinite input stream will not close, currently... I will look at Runar's post this evening!

danieldietrich commented 9 years ago

@mperry, /cc @lukaseder

(Note: Here I talk about javaslang.collection.Stream, not about java.util.stream.Stream.)

Should this return the stream within the IO monad to indicate that it is not pure?

Thinking functional, a constant value c can be considered as function () -> c.

With that, a Stream of values can be considered as function (int i) -> c_i where i is an index (starting with 0): Stream<T> ~ Function<Integer, T>, which is referentially transparent (because of memoization):

final Stream<Double> stream = Stream.gen(Math::random);
assertThat(stream.get(1000) == stream.get(1000)).isTrue(); // always holds

If we just look at the values of a (maybe infinitely long) stream, we cannot know whether the stream values have been created by a 'monkey' or by a pure function. Because of this, Stream is not impure in general and should not be wrapped in an IO monad.
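To make the memoization argument concrete, here is a small self-contained sketch. It is not the Javaslang implementation: MemoStreamDemo, Cell and gen are hypothetical names, and each cell simply caches its head the first time it is asked for.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class MemoStreamDemo {

    // One cell of an infinite lazy stream; head and tail are computed at most once.
    public static final class Cell<T> {
        private final Supplier<T> generator;
        private T head;
        private boolean headEvaluated;
        private Cell<T> tail;

        Cell(Supplier<T> generator) {
            this.generator = generator;
        }

        public T head() {
            if (!headEvaluated) {
                head = generator.get();
                headEvaluated = true;
            }
            return head;
        }

        public Cell<T> tail() {
            if (tail == null) {
                tail = new Cell<>(generator);
            }
            return tail;
        }

        // Walks to the cell at the given index iteratively and returns its cached head.
        public T get(int index) {
            Cell<T> cell = this;
            for (int i = 0; i < index; i++) {
                cell = cell.tail();
            }
            return cell.head();
        }
    }

    public static <T> Cell<T> gen(Supplier<T> generator) {
        return new Cell<>(generator);
    }

    public static void main(String[] args) {
        Cell<Double> random = gen(Math::random);
        // Both calls hit the same cached cell.
        System.out.println(random.get(1000).equals(random.get(1000)));

        AtomicInteger counter = new AtomicInteger();
        Cell<Integer> s1 = gen(counter::incrementAndGet);
        Cell<Integer> s2 = gen(counter::incrementAndGet);
        System.out.println(s1.head() + " " + s1.head() + " " + s2.head());
    }
}
```

Reading the same index of one stream twice always gives the same value. Two streams built from the same impure supplier can still hold different values, because the supplier is consulted once per cell.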

The stream you create here won't compose if it is eager on the head with a lazy tail.

Mark, could you please give an example of Stream composition? I want to check whether the properties hold under my interpretation of Stream. Is composition defined as Stream concatenation? If that is the case, I don't fully understand how effects could cause problems. I need an example...

Update:

I think I've got it. Runar's article says (see Composing Iteratees):

Notice a couple of things here. With iteratees, the input source can send the signal that it has finished producing values. And on the other side, the iteratee itself can signal to the input source that it has finished consuming values. So on one hand, we can leave an iteratee “running” by not sending it the EOF signal, so we can compose two input sources and feed them into the same iteratee. On the other hand, an iteratee can signal that it’s done, at which point we can start sending any remaining elements to another iteratee. In other words, iteratees compose sequentially.

This goes along with your valid point:

Another potential problem is what happens if the stream is not fully evaluated. Does the resource ever close if it is lazy?

As I understand it, all that means: Stream has to be an AutoCloseable (as Lukas mentioned) and an Iteratee to get things to work correctly.

danieldietrich commented 9 years ago

What would we expect from: Stream.in().appendAll(Stream.in())... ?

Update:

According to the things already said:

1) Stream.in() should be created lazily, i.e. Stream.Cons<>(head, () -> tail()) is not sufficient, because the calculation of head is possibly blocking, e.g. when reading from the std in.

2) When we send the end-of-stream signal (Mac: ⌘-D for std in, Win: Ctrl-Z) then Stream.in().appendAll(Stream.in()) should be the same as Stream.in() because std in is closed and has no more elements.

Goal 1) can be accomplished by changing the (javaslang.collection.)Stream implementation. Goal 2) can be accomplished by using Iteratees and Enumerations (capable of both-way end-of-stream signals) instead of Iterators.

mperry commented 9 years ago

@lukaseder /cc @danieldietrich
I missed some context with my comment on closing resources in a lazy stream. My point (which I poorly stated) was that Iteratees can close the resource without fully evaluating the stream. My memory of AutoCloseable was that an AutoCloseable is closed automatically within the try-with-resources statement. With a normal stream, a stream might be passed around your system and might not release the resource. Looks like Daniel figured this out from the blog post.

Some comments on referential transparency (which will probably be too detailed, but please indulge me). Can we agree Math::random is not referentially transparent (RT)? If it is not then we can explore this further. This means that Stream.gen(Math::random) is not (in general) RT. Consider this java like code (with Scala like syntax):

val r = Math::random
val s1 = Stream.gen(r) // evaluate head
val s2 = s1

Now if we replace the definition of s1 in s2 to get val s2 = Stream.gen(r) then s2 has a different value because Math::random accesses mutable state and the head will evaluate to the second random number.

The point is that Streams are only RT if elements within it are RT. I'll point to the (simple) definition of RT in "FP in Scala":

An expression e is referentially transparent if, for all programs p, all occurrences of e in p can be replaced by the result of evaluating e without affecting the meaning of p. A function f is pure if the expression f(x) is referentially transparent for all referentially transparent x.

Your comment that Stream is not generally impure is true. However, you are taking Java InputStreams as input and returning Stream<Byte>. You could return either Stream<IO<Byte>> or IO<Stream<Byte>> to be RT; each has different implications. I have had good success doing this and then using the monad combinators traverse and sequence. See https://functionaljava.ci.cloudbees.com/job/master/javadoc/fj/data/IOFunctions.html.

For stream composition, I only mean we should be able to combine (compose) functions on Streams in arbitrary ways. Say I had two streams, s1 and s2, and wanted to create a new stream that had the last element of s1 and then all of s2. If s1 contains side effects then you are stuffed because the head has evaluated the first side effect. If your stream is a stream of lazy IO actions then you can do this. This means that your general purpose functions should return Stream<IO<A>>, but you may then choose to run these actions with another convenience method that just returns Stream<A>. The IO could be reading from different sources (e.g. Read from stdin, then a remote HTTP get, followed by printing to stdout).
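The "last element of s1, then all of s2" case can be sketched without any library. Assumptions: a plain java.util.List stands in for the stream and a Supplier<A> stands in for IO<A>; LazyIoListDemo, read, lastThenAll and run are hypothetical names, not FunctionalJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class LazyIoListDemo {

    // Log of the effects that were actually performed.
    public static final List<String> log = new ArrayList<>();

    // Stand-in for IO<String>: describes a read, performs it only when get() is called.
    public static Supplier<String> read(String source) {
        return () -> {
            log.add("read " + source);
            return source.toUpperCase();
        };
    }

    // Builds "last action of s1, then all of s2" without running any action.
    public static <A> List<Supplier<A>> lastThenAll(List<Supplier<A>> s1, List<Supplier<A>> s2) {
        List<Supplier<A>> result = new ArrayList<>();
        if (!s1.isEmpty()) {
            result.add(s1.get(s1.size() - 1));
        }
        result.addAll(s2);
        return result;
    }

    // Runs the actions in order; meant to be called at the edge of the program.
    public static <A> List<A> run(List<Supplier<A>> actions) {
        List<A> values = new ArrayList<>();
        for (Supplier<A> action : actions) {
            values.add(action.get());
        }
        return values;
    }

    public static void main(String[] args) {
        List<Supplier<String>> s1 = Arrays.asList(read("a"), read("b"));
        List<Supplier<String>> s2 = Arrays.asList(read("c"));
        List<Supplier<String>> combined = lastThenAll(s1, s2);
        System.out.println("effects before run: " + log);
        System.out.println(run(combined) + " effects after run: " + log);
    }
}
```

The action for "a" is never run, which is the property an eagerly evaluated head cannot give you.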

For a small example of RT IO, see my blog post, https://mperry.github.io/2014/01/03/referentially-transparent-io.html.

mperry commented 9 years ago

I started porting stream transducers before Xmas from ch 15 in "FP in Scala" that I think Runar got from Haskell. It was pretty complicated without pattern matching and kinds, but you've inspired me to press on. So I wrote some pattern matching in Java last night to help. I will push what I have in the next few days and might have something simple to show in the next few weeks.

danieldietrich commented 9 years ago

Hi Mark, hi Lukas, I have a copy of 'FP in Scala' and am currently reading Ch. 15 :) I see that you have put much effort into that topic - thank you for your thoughts and the links. My goal is to fully understand what is possible regarding resource management (which is currently a blocker for me) and composition - and, in a second step, get the Stream API right. I want to push Javaslang in a direction that keeps things as simple as possible and does not expose API that will not change or improve the semantics of a program. (Off-topic example: I thought about removing the Monad interface because currently it is more a tagging interface than providing any benefits. In Scala it is needed for 'for-comprehensions' in order to provide syntactic sugar for flatmap/map cascades with filters/guards. I don't have such a notion, yet). This discussion is great - let's push our frameworks to the limit! :-) I'm especially looking forward to seeing your pattern matching code! (Update: I keep the Monad interface to signal that a type satisfies some laws - it is a tagging interface, which has to be used correctly because of the lack of higher-kinded types in Java. Perhaps I should blog about my version of 'A Java Monad Interface' in the next weeks.)

danieldietrich commented 9 years ago

I think I got a better understanding of the problem domain now.

Laziness

Letting Stream.of(InputStream) return a Stream<IO<T>> would be problematic. We have to decide at creation time if we return a Nil or a Cons. More specifically head has to be evaluated in order to decide if the Stream is Nil (e.g. an Iterator's hasNext() would call InputStream.read()). In the case of System.in we would block, even if Cons is implemented fully lazily:

    static final class Cons<T> {

        final Memoizer0<Tuple2<T, Memoizer0<Stream<T>>>> data;

        Cons(Supplier<T> head, Supplier<Stream<T>> tail) {
            this.data = Memoizer.of(() -> Tuple.of(head.get(), Memoizer.of(tail)));
        }

        public T head() {
            return data.apply()._1;
        }

        public Stream<T> tail() {
            return data.apply()._2.apply();
        }

        // ...
    }

That means we have to defer the creation of the Stream. Semantically, an IO<Stream<T>> is the same as a Supplier<Stream<T>>, modulo Monad combinators (which, in this case, should be used internally in my opinion - which does not seem to be possible).

Update: I also started to think about a third Stream type: in addition to Nil (empty Stream) and Cons (non-empty stream, head evaluated) there may be a Deferred, which is either empty or non-empty. It would occur only when needed, i.e. when creating streams based on InputStreams (perhaps also OutputStreams(!) and Suppliers). Then Stream.of(System.in) would not block any more.

It is a little bit dirty because Nil and Cons could be replaced completely by Deferred - and finally the 1:1 dependency between Stream and Deferred would lead to just one type: Stream. On the other hand Deferred is a good solution to mark just the start of a blocking element-supplier.

Resource-safety

This is an interesting topic. Runar mentions two rules in 'FP in Scala', Ch. 15, p. 283:

1) "A producer should free any underlying resources as soon as it knows it has no further values to produce, whether due to normal exhaustion or an exception."

This should be trivial. A naïve Iterator implementation could do this:

public boolean hasNext() {
        final boolean hasNext = (next = Try.of(reader::readLine).get()) != null;
        if (!hasNext) {
                Try.run(reader::close).recover(...).get();
        }
        return hasNext;
}

Update: This is of course not trivial because stream.take(5) should also close an underlying resource.

2) "Any process d that consumes value from another process p must ensure that cleanup actions of p are run before d halts."

I have to further investigate this in this context. Interesting are cases like

Stream.of(inputStream1).zip(inputStream2).join()

Without loss of generality let inputStream1.length < inputStream2.length. With the Iterator impl mentioned in 1) inputStream1 is closed automatically and inputStream2 remains open. It seems that we need an IO wrapper in this case. It would be cool if it could be realized internally in the Stream implementation without bothering the Stream user with resource handling details...
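Rule 1) and the take(5) caveat can be sketched with plain JDK types. This is an illustration only: ClosingLineIterator and its take helper are hypothetical and do not use Javaslang's Try. The iterator closes its reader on exhaustion or on a read error, and take closes it explicitly because it may stop before the end.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ClosingLineIterator implements Iterator<String>, AutoCloseable {

    private final BufferedReader reader;
    private String next;      // buffered line, or null if none is buffered
    private boolean closed;

    public ClosingLineIterator(BufferedReader reader) {
        this.reader = reader;
    }

    public boolean isClosed() {
        return closed;
    }

    @Override
    public boolean hasNext() {
        if (closed) {
            return false;
        }
        if (next == null) {
            try {
                next = reader.readLine();
            } catch (IOException e) {
                close(); // free the resource on failure
                throw new UncheckedIOException(e);
            }
            if (next == null) {
                close(); // free the resource on normal exhaustion
            }
        }
        return next != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String line = next;
        next = null;
        return line;
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads at most n lines and closes the source even if it is not exhausted.
    public static List<String> take(ClosingLineIterator source, int n) {
        try (ClosingLineIterator it = source) {
            List<String> lines = new ArrayList<>();
            while (lines.size() < n && it.hasNext()) {
                lines.add(it.next());
            }
            return lines;
        }
    }

    public static void main(String[] args) {
        ClosingLineIterator it = new ClosingLineIterator(new BufferedReader(new StringReader("a\nb\nc")));
        System.out.println(take(it, 2) + " closed=" + it.isClosed());
    }
}
```

The explicit close in take is exactly the part that a consumer-agnostic Iterator cannot do on its own, which is why early termination needs cooperation from the consuming operation.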

mperry commented 9 years ago

@danieldietrich Hopefully I'll respond to your comment above in the next few hours. I wanted to let you know I have made quite some progress translating section 15.2 (Simple stream transducers). The code is here, https://github.com/mperry/stream-processing. A fair bit of it is pretty rough and needs improvement, but it might help to make some ideas more concrete for you. Ignore the 'oo' package and just look at the 'match' package.

mperry commented 9 years ago

On laziness with Stream<IO<A>>:

Yes the head of the stream is eager (strict), but this is not a problem. Creating an IO<A> does not run the effect, merely creates an object that represents the effect. The IO action still has to be run to do anything.

I have the impression (which is perhaps the wrong impression) that you want to memoize by default. I think this is a mistake as it means the entire stream must be kept in memory, defeating the point of incrementally reading a stream.

I didn't understand your comment on using the monad combinators internally. Perhaps you could restate or expand on this. I also didn't understand your comment on having a Deferred type. Do you just mean a stream with a lazy head and tail?

On resource safety:

I didn't understand your example: Stream.of(inputStream1).zip(inputStream2).join()

If I consider the types: Stream.of(inputStream1) returns Stream<Stream<A>> and then zip would return Stream<P2<Stream<A>, B>>, which means join would not have the right type.

Section 15.3 talks a lot about resource safety, so I will have to think more carefully about this when I translate it.

danieldietrich commented 9 years ago

@mperry thank you for the link to your project. It looks really nice. I like the new match API, it is clear and composable. My matcher has too much voodoo magic wired in because I wanted to provide a nice lambda syntax. You make the type explicit, whereas I use some kind of extended reflection for lambdas. Where do you need composability of When (flatMap/map)? The transducers are beautiful.

mperry commented 9 years ago

I am not convinced about the match API.

The match class is ok, although I can see me moving my code from using createMatch(...).match(A) to just match(A a, F<A, B> fallback, When<A, B> w) in most cases.

The When API is more problematic. Consider this code:

    public Process<I, O> append2(Process<I, O> p) {
        return Match.match(this, h -> halt(),
            When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(Emit.class,
                (Emit<I, O> e) -> Emit.emit(e.head, e.tail.append(p))
            ).appendClass(Await.class,
                (Await<I, O> a) -> Await.await(andThen(a.receive, p2 -> p2.append(p)))
            )
        );
    }

A few problems I see here:

On the positive side, I am pretty convinced pattern matching here is a better option than spreading the functionality around the class hierarchy. Also it should be pretty clear that the class of the first argument when creating a when should be the same as the first argument to the function.

I am not sure when you would need map and flatMap. The flippant answer is whenever that is the type you need. It is like constructing a case of when you need map/flatMap for a list, the same examples would serve for a When. A When is essentially a partial function F<A, B> with function f Option<B> f(A a). Mapping and flatMapping over a When is then essentially the same as mapping over a function (andThen for one argument functions) and flatMapping over a function (equivalent to flatMap for the reader monad).
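As a rough illustration of "a When is a partial function", here is a sketch using java.util.Optional and java.util.function.Function instead of the FunctionalJava types. The When interface below is hypothetical and is not the API from the stream-processing repository. map is post-composition and flatMap threads the same input through a second When, as in the reader monad.

```java
import java.util.Optional;
import java.util.function.Function;

public class WhenDemo {

    // A partial function from A to B: empty means "this case does not match".
    public interface When<A, B> {

        Optional<B> apply(A a);

        // Matches values of the given subclass C and applies f to them.
        // Note: f must not return null, because Optional.of rejects null.
        static <A, C extends A, B> When<A, B> whenClass(Class<C> type, Function<? super C, ? extends B> f) {
            return a -> type.isInstance(a)
                    ? Optional.<B>of(f.apply(type.cast(a)))
                    : Optional.<B>empty();
        }

        // Like andThen on a function: transforms the result if the case matched.
        default <C> When<A, C> map(Function<? super B, ? extends C> g) {
            return a -> apply(a).map(g);
        }

        // Like flatMap on the reader monad: the next When sees the same input a.
        default <C> When<A, C> flatMap(Function<? super B, When<A, C>> g) {
            return a -> apply(a).flatMap(b -> g.apply(b).apply(a));
        }

        // Tries this case first, then the other one.
        default When<A, B> orElse(When<A, B> other) {
            return a -> {
                Optional<B> result = apply(a);
                return result.isPresent() ? result : other.apply(a);
            };
        }
    }

    public static void main(String[] args) {
        When<Object, Integer> length = When.whenClass(String.class, (String s) -> s.length());
        When<Object, Integer> value = When.whenClass(Integer.class, (Integer i) -> i);
        When<Object, Integer> either = length.orElse(value);
        System.out.println(either.apply("abc") + " " + either.apply(42) + " " + either.apply(1.5));
    }
}
```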

danieldietrich commented 9 years ago

@mperry

I didn't understand your comment on using the monad combinators internally. Perhaps you could restate or expand on this. I also didn't understand your comment on having a Deferred type. Do you just mean a stream with a lazy head and tail?

I meant a lazy Stream that delegates to a Stream that may be empty or non-empty. Let me describe this in detail:

These are the classes (pseudo Java):

interface Stream<T> {}
class Nil<T> implements Stream<T> {}
class Cons<T>(T head, Supplier<Stream<T>> tail) implements Stream<T> {}

If the IO<A> is wrapped in a Stream<IO<A>> and the stream is created, it has to be one of Nil and Cons (which are the only implementations). But if we construct the Cons with head of type IO<A> we may have the case that, when executing the IO<A>, we get the end of stream. Then instead of Cons we should have constructed a Nil (chicken and egg problem).

A solution would be to create an IO<Stream<A>> to first execute the IO to check, if there is any input and then construct either a Cons (if there was input) or return Nil (otherwise). But we want a Stream instead of an IO. Therefore my idea was to create a third implementation of Stream called 'Deferred':

interface Stream<T> {}
class Nil<T> implements Stream<T> {}
class Cons<T>(T head, Supplier<Stream<T>> tail) implements Stream<T> {}
class Deferred<T>(Supplier<Stream<T>> stream) implements Stream<T> {}

I've committed a first implementation today. Now Stream.stdin() internally calls Stream.of(iterator) which is implemented as follows:

static <T> Stream<T> of(Iterator<T> iterator) {
        return new Deferred<>(() -> {
                if (iterator.hasNext()) {
                        return new Cons<>(iterator.next(), () -> Stream.of(iterator)); // stack is safe - no recursion
                } else {
                        return Nil.instance();
                }
        });
}

Example of the concrete behavior:

Test code:

Stream<String> lazy = Stream.stdin(); // does not block any more!
System.out.println("Press Ctrl-Z (win), Ctrl-D (unix/linux), Cmd-D (mac) to quit.");
System.out.println("head: " + lazy.head()); // prints after user typed a line
lazy = lazy.tail(); // does not block
System.out.println("tail...");
lazy.forEach(System.out::println);

Output ( the > test parts are typed manually):

Press Ctrl-Z (win), Ctrl-D (unix/linux), Cmd-D (mac) to quit.
> test
head: test
tail...
> test
test
> Cmd-D

That means that Stream.stdin() does not block any more. Also subsequent tail() calls do not block until head() is called on tail.
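The Nil/Cons/Deferred idea can be reproduced in a self-contained sketch. This is not the committed Javaslang code: LazyStream, the nested classes and CountingIterator are hypothetical names, and the counting iterator only exists to make the laziness observable.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class DeferredStreamDemo {

    public interface LazyStream<T> {
        boolean isEmpty();
        T head();
        LazyStream<T> tail();

        // Does not touch the iterator until the resulting stream is inspected.
        static <T> LazyStream<T> of(Iterator<T> iterator) {
            return new Deferred<T>(() -> {
                if (iterator.hasNext()) {
                    return new Cons<T>(iterator.next(), () -> of(iterator));
                } else {
                    return new Nil<T>();
                }
            });
        }
    }

    static final class Nil<T> implements LazyStream<T> {
        public boolean isEmpty() { return true; }
        public T head() { throw new NoSuchElementException("head of empty stream"); }
        public LazyStream<T> tail() { throw new UnsupportedOperationException("tail of empty stream"); }
    }

    static final class Cons<T> implements LazyStream<T> {
        private final T head;
        private Supplier<LazyStream<T>> tailSupplier;
        private LazyStream<T> tail; // memoized tail

        Cons(T head, Supplier<LazyStream<T>> tailSupplier) {
            this.head = head;
            this.tailSupplier = tailSupplier;
        }

        public boolean isEmpty() { return false; }
        public T head() { return head; }
        public LazyStream<T> tail() {
            if (tail == null) {
                tail = tailSupplier.get();
                tailSupplier = null;
            }
            return tail;
        }
    }

    // Decides between Nil and Cons only when first inspected, then caches the decision.
    static final class Deferred<T> implements LazyStream<T> {
        private Supplier<LazyStream<T>> supplier;
        private LazyStream<T> delegate;

        Deferred(Supplier<LazyStream<T>> supplier) { this.supplier = supplier; }

        private LazyStream<T> force() {
            if (delegate == null) {
                delegate = supplier.get();
                supplier = null;
            }
            return delegate;
        }

        public boolean isEmpty() { return force().isEmpty(); }
        public T head() { return force().head(); }
        public LazyStream<T> tail() { return force().tail(); }
    }

    // Test helper: counts hasNext() calls on the wrapped iterator.
    public static final class CountingIterator<T> implements Iterator<T> {
        private final Iterator<T> delegate;
        public int hasNextCalls;

        public CountingIterator(Iterator<T> delegate) { this.delegate = delegate; }
        public boolean hasNext() { hasNextCalls++; return delegate.hasNext(); }
        public T next() { return delegate.next(); }
    }

    public static void main(String[] args) {
        CountingIterator<String> it = new CountingIterator<>(Arrays.asList("a", "b").iterator());
        LazyStream<String> stream = LazyStream.of(it);
        System.out.println("hasNext calls after creation: " + it.hasNextCalls);
        System.out.println("head: " + stream.head() + ", hasNext calls: " + it.hasNextCalls);
    }
}
```

In this sketch, calling tail() on a Deferred that has not been inspected yet forces it; tail() on an already evaluated Cons only creates the next Deferred and reads nothing.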

I have the impression (which is perhaps the wrong impression) that you want to memoize by default. I think this is a mistake as it means the entire stream must be kept in memory, defeating the point of incrementally reading a stream.

Yes, that's right - but in my case no problem. I don't use a Map for caching values. Instead I cache lazy generated values per element. Cons memoizes its tail and Deferred caches its wrapped Stream.

I ran tests that print infinitely long byte streams to the console and call the garbage collector frequently. This behaved nicely. However, without calling the garbage collector explicitly, the mem explodes (but the CPU was at 700%).

Here is the test code:

InputStream in = new InputStream() {
    Stream<Byte> stream = Stream.gen(Math::random).map(d -> new Double(d * 255).byteValue());
    @Override
    public int read() throws IOException {
        int next = stream.head();
        stream = stream.tail();
        return next;
    }
};

Stream.bytes(in).map(i -> {
    if (Math.random() < 0.1) {
        System.gc();
    }
    return i;
}).out();

The Java process stays constantly at 35 MB.

Update: Oh noes: By producing more output / fewer gc(), the mem goes up:

- - - - - -8<- - - -*snip*- - - - - -8<- - - -
if (Math.random() < 0.00001) {
        System.err.println("#### " + i);
        System.gc();
}
- - - - - -8<- - - -*snap*- - - - - -8<- - - -

I didn't understand your example: Stream.of(inputStream1).zip(inputStream2).join()

If I consider the types: Stream.of(inputStream1) returns Stream<Stream<A>> and then zip would return Stream<P2<Stream<A>, B>>, which means join would not have the right type.

Yes, the code example was very unclear (in fact also wrong). inputStream1 and inputStream2 are of type java.io.InputStream. Also there is no such of-method.

Here is a better example:

Stream.bytes(inputStream1).zip(Stream.lines(inputStream2)) returns Stream<Tuple2<Byte,String>>.

Then .join() would concatenate the String representations of all elements and forces the Stream to be evaluated.

If one Stream is finite and the other infinite, the infinite (currently) would not be closed by zip, which is a resource leak.
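One way to picture a zip that does not leak is sketched below. It works on iterators rather than on the Javaslang Stream, and CloseableIterator, tracked and zip are hypothetical names. As soon as either side has no more elements, both sides are closed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class ZipClosingDemo {

    // An iterator that owns a resource; close() declares no checked exception.
    public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close();
    }

    // Wraps a plain iterator and records in closedFlag when it gets closed.
    public static <T> CloseableIterator<T> tracked(Iterator<T> source, AtomicBoolean closedFlag) {
        return new CloseableIterator<T>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public T next() { return source.next(); }
            @Override public void close() { closedFlag.set(true); }
        };
    }

    // Pairs elements up; closes both sides when the shorter side is exhausted.
    public static <A, B> CloseableIterator<Map.Entry<A, B>> zip(CloseableIterator<A> left,
                                                              CloseableIterator<B> right) {
        return new CloseableIterator<Map.Entry<A, B>>() {
            @Override
            public boolean hasNext() {
                boolean more = left.hasNext() && right.hasNext();
                if (!more) {
                    close();
                }
                return more;
            }

            @Override
            public Map.Entry<A, B> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return new SimpleImmutableEntry<A, B>(left.next(), right.next());
            }

            @Override
            public void close() {
                try {
                    left.close();
                } finally {
                    right.close(); // still runs if closing the left side fails
                }
            }
        };
    }

    public static void main(String[] args) {
        AtomicBoolean finiteClosed = new AtomicBoolean();
        AtomicBoolean infiniteClosed = new AtomicBoolean();
        CloseableIterator<Map.Entry<String, Integer>> zipped = zip(
                tracked(Arrays.asList("a", "b").iterator(), finiteClosed),
                tracked(Stream.iterate(0, i -> i + 1).iterator(), infiniteClosed));
        while (zipped.hasNext()) {
            System.out.println(zipped.next());
        }
        System.out.println("closed: " + finiteClosed.get() + ", " + infiniteClosed.get());
    }
}
```

This handles exhaustion only. A consumer that stops early still has to call close() on the zipped iterator itself.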

Closing resources appropriately comes next...

mperry commented 9 years ago

Your deferred is just a lazy stream. I changed the signature of your code above to return P1<Stream<T>>:

static <T> P1<Stream<T>> of(Iterator<T> iterator) {
        return P.lazy(u -> {
                if (iterator.hasNext()) {
                        return new Cons<>(iterator.next(), () -> Stream.of(iterator)); // stack is safe - no recursion
                } else {
                        return Nil.instance();
                }
        });
}

danieldietrich commented 9 years ago

What is the u needed for in P.lazy(u -> { ... })?

Now new Cons<>(iterator.next(), () -> Stream.of(iterator)) receives a Supplier<P1<Stream<T>>> instead of a Supplier<Stream<T>>. Do we really need the extra type P1? Unfortunately it is not of type Stream...

Update: I like the use of SoftReference for memo in fj!

danieldietrich commented 9 years ago

I will read your previous post in depth tomorrow - really need some sleep :)

mperry commented 9 years ago

In FunctionalJava P1 is currently not an interface. To get around this when trying to instantiate a P1, there is a static function called lazy on P that has type P1<A> lazy(F<Unit, A>).

You are right, my example above is incorrect. Cons should just call Stream.of for the lazy tail part. Remember that Supplier and P1 are equivalent, so my example could return Supplier<Stream<T>> instead of P1<Stream<T>>. The point is the use of Deferred is hiding that you just want the stream to be a lazy value, no need to create a new class.

I am not sure if you read my blog post, https://mperry.github.io/2014/01/03/referentially-transparent-io.html, but I think you would be interested in the sequenceWhile function which has the type static <A> IO<Stream<A>> sequenceWhile(final Stream<IO<A>> stream, final F<A, Boolean> f). In my post, the Stream<IO> is a stream of REPL IOs where each IO prints some text, gets the next value and prints out a response. This is a good example of combining pure IO values and working on IO streams, all without performing any effects.

Your technique means you will need to rewrite something like zip for Deferred to make sure the head is never evaluated. Think of zipping two streams then dropping the first 10 elements without any effects.

I always refer back to two important monadic combinators, sequence and traverse, which I always find useful, particularly in this context:

public static <A> IO<Stream<A>> sequence(Stream<IO<A>> stream)
public static <A, B> IO<Stream<B>> traverse(Stream<A> list, F<A, IO<B>> f)

The benefits of streams of IO are:

  • can combine arbitrary IO actions
  • can construct a stream of IO actions without effects (the IO actions are lazy)
  • can manipulate the stream (e.g. calling sequence, drop, take, zip) without performing an effect
  • can run the effect at the top level, keeping the rest of the program pure
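The points above can be tried out with a small stand-in. Assumptions: IO is a hypothetical one-method interface rather than fj's IO, java.util.List replaces the stream, and the sequenceWhile shown here stops at the first value that fails the predicate and excludes it, which may differ from the FunctionalJava semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class SequenceTraverseDemo {

    // Stand-in for an IO action: a description of an effect that runs on demand.
    public interface IO<A> {
        A run();
    }

    // Log of performed effects, used to show that composition is effect-free.
    public static final List<Integer> log = new ArrayList<>();

    // Demo action: logs n when run and yields n * 10.
    public static IO<Integer> timesTen(Integer n) {
        return () -> {
            log.add(n);
            return n * 10;
        };
    }

    // Turns a list of actions into one action that yields the list of results.
    public static <A> IO<List<A>> sequence(List<IO<A>> actions) {
        return () -> {
            List<A> results = new ArrayList<>();
            for (IO<A> action : actions) {
                results.add(action.run());
            }
            return results;
        };
    }

    // Maps each value to an action, then sequences the actions.
    public static <A, B> IO<List<B>> traverse(List<A> values, Function<A, IO<B>> f) {
        return () -> {
            List<B> results = new ArrayList<>();
            for (A value : values) {
                results.add(f.apply(value).run());
            }
            return results;
        };
    }

    // Runs actions while the produced value satisfies p; later actions never run.
    public static <A> IO<List<A>> sequenceWhile(List<IO<A>> actions, Predicate<A> p) {
        return () -> {
            List<A> results = new ArrayList<>();
            for (IO<A> action : actions) {
                A value = action.run();
                if (!p.test(value)) {
                    break;
                }
                results.add(value);
            }
            return results;
        };
    }

    public static void main(String[] args) {
        IO<List<Integer>> program = traverse(Arrays.asList(1, 2, 3), SequenceTraverseDemo::timesTen);
        System.out.println("effects before run: " + log);
        System.out.println(program.run() + " effects after run: " + log);
    }
}
```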

Hope this helps you and have a good sleep!

danieldietrich commented 9 years ago

I understand IO and see its benefits. Currently I'm going one step further and investigating how to embed the Iteratee/Enumerator approach into Stream.

Wikipedia:

"Iteratees were created due to problems with existing purely functional solutions to the problem of making input/output composable yet correct. Lazy I/O in Haskell allowed pure functions to operate on data on disk as if it were in memory, without explicitly doing I/O at all after opening the file - a kind of memory-mapped file feature - but because it was impossible in general (due to the Halting problem) for the runtime to know whether the file or other resource was still needed, excessive numbers of files could be left open unnecessarily, resulting in file descriptor exhaustion at the operating system level. Traditional C-style I/O, on the other hand, was too low-level and required the developer to be concerned with low-level details such as the current position in the file, which hindered composability. Iteratees and enumerators combine the high-level functional programming benefits of lazy I/O, with the ability to control resources and low-level details where necessary afforded by C-style I/O."

Resources:

talios commented 9 years ago

On 9 Jan 2015, at 12:13, Mark Perry wrote:

  • it would be nice to unify the methods whenClass and appendClass. It feels a bit stilted and hard to remember that the first call is whenClass to create a When and the appendClass after that. I think Java was preventing me having a static and not-static method having the same name, but I need to have more of a play.

Hi all, just catching up with this thread. Since this match discussion is a side-topic to the main issue, I thought I'd offer a comment: instead of appendClass, what's wrong with a good old vararg list of Match instances?

return Match.match(this, h -> halt(),
        When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(Emit.class,
            (Emit<I, O> e) -> Emit.emit(e.head, e.tail.append(p))
        ),
        When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(Await.class,
            (Await<I, O> a) -> Await.await(andThen(a.receive, p2 -> p2.append(p)))
        )
    );

The repetition of generics is rather ugly though, but this could be cleaned up with a series of (equally ugly, but for other reasons) variations of whenClass:

return Match.match(this, h -> halt(),
        When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(
            Emit.class,  (Emit<I, O> e)  -> Emit.emit(e.head, e.tail.append(p)),
            Await.class, (Await<I, O> a) -> Await.await(andThen(a.receive, p2 -> p2.append(p)))
        )
    );

where the args are:

Match[] whenClass(Class c1, Process<I,O> p1, Class c2, Process<I,O> p2,......Class cN, Process<I,O> pN);

Mark

Mark Derricutt http://www.theoryinpractice.net http://plus.google.com/+MarkDerricutt http://twitter.com/talios http://facebook.com/mderricutt

mperry commented 9 years ago

@talios I should go back and clean this up. I did just enough matching to get started on stream transformers.

I wanted to do this in a generic way. For simplicity, ignore the needs for a fallback match if no class matches (i.e. we just throw an error). Then we need something like:

static <A, B> B match(A a, List<P2<Class<?> c, F<A, B>>> f)

but this is not correct and the generics make this tricky. The functions provided will actually go from a subclass of A to B.

I don't think static <A, B> B match(A a, List<P2<Class<?> c, F<? extends A, B>>> f) works because we can't call f.f(a) and we can't cast downwards to the actual type.

I don't think static <A, B, C extends A> B match(A a, List<P2<Class<?> c, F<C extends A, B>>> f) works either as C will be inferred to A (C needs to be common across all functions, but we want each function to take a different subclass of A).

I think this is why I used generics in the first when case, to set the top level generics for what was returned (code repeated below for reference). Then in subsequent appendClass calls the type parameter for the subclass we are handling varies between each call and in each subsequent call the generics could be inferred.

    public Process<I, O> append2(Process<I, O> p) {
        return Match.match(this, h -> halt(),
            When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(Emit.class,
                (Emit<I, O> e) -> Emit.emit(e.head, e.tail.append(p))
            ).appendClass(Await.class,
                (Await<I, O> a) -> Await.await(andThen(a.receive, p2 -> p2.append(p)))
            )
        );
    }

If we can come up with something better, I would definitely be interested.
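One possible direction, offered only as a sketch: let each case capture its own subclass parameter C when it is created and expose just Case<A, B> to match, so the list of cases needs no common C. MatchCasesDemo, Case and the Shape classes are hypothetical and use java.util.function types rather than fj's F and P2; an explicit type witness on match is still used here to fix the result type.

```java
import java.util.Optional;
import java.util.function.Function;

public class MatchCasesDemo {

    // One branch of a match. The subclass C is known only inside of(); callers see Case<A, B>.
    public static final class Case<A, B> {
        private final Function<A, Optional<B>> attempt;

        private Case(Function<A, Optional<B>> attempt) {
            this.attempt = attempt;
        }

        // The class token gives a checked downcast, so f can take the subclass directly.
        public static <A, C extends A, B> Case<A, B> of(Class<C> type, Function<? super C, ? extends B> f) {
            return new Case<A, B>(a -> type.isInstance(a)
                    ? Optional.<B>of(f.apply(type.cast(a)))
                    : Optional.<B>empty());
        }
    }

    // Applies the first matching case, or the fallback if none matches.
    @SafeVarargs
    public static <A, B> B match(A a, Function<? super A, ? extends B> fallback, Case<A, B>... cases) {
        for (Case<A, B> c : cases) {
            Optional<B> result = c.attempt.apply(a);
            if (result.isPresent()) {
                return result.get();
            }
        }
        return fallback.apply(a);
    }

    // Tiny class hierarchy for the demo.
    public interface Shape {}

    public static final class Circle implements Shape {
        final double radius;
        public Circle(double radius) { this.radius = radius; }
    }

    public static final class Square implements Shape {
        final double side;
        public Square(double side) { this.side = side; }
    }

    public static double area(Shape shape) {
        return MatchCasesDemo.<Shape, Double>match(shape, s -> 0.0,
                Case.of(Circle.class, (Circle c) -> Math.PI * c.radius * c.radius),
                Case.of(Square.class, (Square q) -> q.side * q.side));
    }

    public static void main(String[] args) {
        System.out.println(area(new Circle(1.0)));
        System.out.println(area(new Square(2.0)));
    }
}
```

The cast is still a runtime check via Class.cast, so this does not remove erasure from the picture; it only moves the unchecked part behind one small factory method.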

Does your second example (repeated below) compile? I would have thought the compiler complained the functions weren't taking a Process type as the argument to match.

return Match.match(this, h -> halt(),
            When.<Emit<I, O>, Process<I, O>, Process<I, O>>whenClass(
                Emit.class,  (Emit<I, O> e)  -> Emit.emit(e.head, e.tail.append(p)),
                Await.class, (Await<I, O> a) -> Await.await(andThen(a.receive, p2 -> p2.append(p)))
            )
        );

danieldietrich commented 9 years ago

@mperry, @talios (Mark & Mark :-))

Hi there! Sorry for joining the party a little late. I felt like a zombie the last days after releasing Javaslang 1.1.0 because of the lack of sleep and had to keep away from my laptop a while :-)

Mark P.,

static <A, B> B match(A a, List<P2<Class<?> c, F<? extends A, B>>> f)

I think it should be F<? super A, ? extends B>. Notably this takes an A and returns a B.

  • it would be nice to unify the methods whenClass and appendClass. It feels a bit stilted and hard to remember that the first call is whenClass to create a When and the appendClass after that. I think Java was preventing me having a static and not-static method having the same name, but I need to have more of a play.

I use the object oriented builder pattern to be able to use static methods, e.g.

class Match {
   static Match.Builder whenClass(...) {
      return new Match.Builder().whenClass(...);
   }
   static class Builder {
      Builder whenClass(...) {
          // ...
          return this;
      }
      Match build() {
          // return a Match with all whenClass branches captured
      }
   }
}

Additionally Match<R> implements Function<Object, R>, i.e. if we build a Match, we can reuse it as ordinary function in an arbitrary context!

When I create a new API, I write it down exactly as I want to use it as a developer, e.g. looking like Scala's match

something match {
    case Some(x) => "some: " + x
    case None => "none"
    case _ => ...
}

I ended up here for Java 8:

Match
    .caze((Some s) -> "some: " + s.get())
    .caze((None n) -> "none")
    .orElse(...)
    .apply(something); // we could use build() instead to get a function

  • the type inference sucks, I tried a few different ways, but the whenClass and appendClass methods have to be able to return the parent class Process, whilst taking a function with the subclass as argument, e.g. Emit. I might be able to play with generics to make this nicer when appending a When to another When, but it is crappy.

I think it is not possible to not give the compiler a hint of the return type, in my case:

final Function<Object, Number> f = Match
    .<Number> caze((int i) -> i) // return type would be Integer otherwise
    .caze((BigInteger i) -> i)
    .build();
final Number num = f.apply(1);
  • The class you pass in to whenClass has no relationship to the arguments in the function. Java generics sucks and because we are doing this at runtime we have to deal with type erasure. The compiler needs to support this, so this code is a workaround.

Yes, type erasure really sucks!! We have to live with that...

Mark D.,

The verbosity of Java's generics is a slap in the face. I would like to see good type inference in a future release of Java...

Btw. - I'm currently designing the Property checker API and have nice ideas! Property checking will be next in Javaslang! functionaljava also has this, did you take a look?

- Daniel

danieldietrich commented 9 years ago

If 1) a read operation fails or 2) the end of the input is reached, we need to close the input. This also has to hold for combined input sources. If an error occurs in an underlying input source, the whole combined input has to be closed. Also, all underlying inputs have to be closed if the wrapping input is closed from the outside. Additionally, if the end of one of the combined inputs has been reached, all other combined inputs have to be closed, too.

This leads to the idea of a special Iterator, that handles resources and is composable:

public interface Chunked<T> extends Iterator<T>, AutoCloseable {

    static <T> Chunked<T> of(CheckedRunnable close, CheckedSupplier<Boolean> hasNext, CheckedSupplier<T> next) {
        return new Chunked<T>() {

            @Override
            public void close() {
                Try.run(close);
            }

            @Override
            public boolean hasNext() {
                return Try.of(hasNext)
                        .onFailure(x -> close())
                        .map(b -> { if (!b) { close(); } return b; })
                        .get();
            }

            @Override
            public T next() {
                return Try.of(next)
                        .onFailure(x -> close())
                        .get();
            }
        };
    }

    @Override
    void close();

    @Override
    boolean hasNext();

    @Override
    T next();

    default <T2> Chunked<Tuple2<T, T2>> combine(Chunked<T2> that) {
        final CheckedRunnable close = () -> { Try.run(this::close); Try.run(that::close); };
        final CheckedSupplier<Boolean> hasNext = () -> this.hasNext() && that.hasNext();
        final CheckedSupplier<Tuple2<T, T2>> next = () -> Tuple.of(this.next(), that.next());
        return Chunked.of(close, hasNext, next);
    }
}
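To make the two closing rules concrete without depending on Try, CheckedRunnable and CheckedSupplier, here is a minimal plain-Java sketch of the same idea for one specific input: lines read from a Reader. ClosingLines and isClosed are hypothetical names introduced for this example, and it does not cover the combine case.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: an Iterator over lines that closes its reader
// when the end is reached or when a read fails.
final class ClosingLines implements Iterator<String>, AutoCloseable {

    private final BufferedReader reader;
    private String nextLine;   // line buffered by hasNext(), or null
    private boolean closed;

    ClosingLines(Reader source) {
        this.reader = new BufferedReader(source);
    }

    @Override
    public boolean hasNext() {
        if (nextLine != null) {
            return true;
        }
        if (closed) {
            return false;
        }
        try {
            nextLine = reader.readLine();
        } catch (IOException e) {
            close();                         // rule 1: a failed read closes the input
            throw new UncheckedIOException(e);
        }
        if (nextLine == null) {
            close();                         // rule 2: end of input closes the input
            return false;
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final String line = nextLine;
        nextLine = null;
        return line;
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        nextLine = null;                     // closing from the outside ends the iteration
        try {
            reader.close();
        } catch (IOException ignored) {
            // best effort, comparable to Try.run(close) above
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

Since the class is AutoCloseable, it can also be used in a try-with-resources block, which covers the case where the consumer stops before the input is exhausted.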

There are similarities to Stream:

If Stream also implements AutoCloseable, we get the missing close() function. Given that, I should be able to provide a resource-safe implementation of Stream.

The current implementation of Stream makes use of memoization. Question: Is the following Cons eventually garbage collected, if it is not referenced any more but tail is referenced?

Cons(head, lazy tail = Nil)

The given tail is wrapped in a memoizing Lazy (currently using a hard reference). Perhaps this prevents the GC from freeing the memory of tail, Lazy and Cons. A soft reference may help, but the lazy tail of Cons is (initially) the only reference to the tail. This means the tail may be garbage collected immediately, and that in turn means that operations on finite Streams like reverse() are unstable.

Update:

Taken from the Scala documentation of scala.collection.immutable.Stream:

One must be cautious of memoization; you can very quickly eat up large amounts of memory if you're not careful. The reason for this is that the memoization of the Stream creates a structure much like scala.collection.immutable.List. So long as something is holding on to the head, the head holds on to the tail, and so it continues recursively. If, on the other hand, there is nothing holding on to the head (e.g. we used def to define the Stream) then once it is no longer being used directly, it disappears.

  • Scala has scala.io.Source to iterate (once) over chunks of data read from an input. It is said to be "(...) just (a) hack done to provide support for limited needs."
  • Scalaz has its own streaming library scalaz-stream.
  • Play has Enumerator/Iteratee - a very general approach to stream data in a composable way

Update 2:

The main goal of Javaslang is to be simple. I would prefer one lazy type Stream for processing arbitrary inputs, including fixed size sequences, data suppliers, Iterables/Iterators and InputStreams. Of course the implementation of Stream could differ in each of these cases, including memoization, resource handling etc. This would make Stream a general purpose tool - a Swiss Army knife for Joe Average (like me).

Question: Is non-blocking the same as lazy here? Answer: No. When we hear non-blocking, our reflex is Future and Promise (aka CompletableFuture). In contrast to using callbacks, a Stream is blocking when values are requested. A non-blocking behavior is out of scope here.

danieldietrich commented 9 years ago

Also http://blog.higher-order.com/assets/scalaio.pdf

danieldietrich commented 9 years ago

Daniel Spiewak is working on this: https://gist.github.com/djspiewak/6b5cd3fb78b054046755 in the context of this https://github.com/scalaz/scalaz-stream/blob/master-a/src/main/scala/scalaz/stream/Process.scala#L657

danieldietrich commented 9 years ago

(3 Months later...)

Now it is time to get this done. @mperry: Introducing a Deferred Stream is dirty, it would pollute the Stream code with outside IO aspects (like blocking) - I will take the approach you suggested by returning a lazy Stream when constructing a Stream from a possibly blocking InputStream.

Additionally I will experiment with Chunked data, which is AutoCloseable and composable...

Update: I could also imagine a third possibility besides Lazy<Stream<T>> and Stream<Lazy<T>>:

Internalizing the 'laziness' and keep the Stream type definition 'clean':

class Cons<T> implements Stream<T> {

    final Lazy<T> head;
    final Lazy<Stream<T>> tail;

    Cons(Lazy<T> head, Lazy<Stream<T>> tail) {
        this.head = head;
        this.tail = tail;
    }

    T head() {
        return head.get();
    }

    Stream<T> tail() {
        return tail.get();
    }
}

The head has to be Lazy in order to make the Stream non-blocking. An eager head would make the whole Stream block if the underlying InputStream is blocking.

The tail has to be Lazy to be referentially transparent, i.e. consistent regarding re-evaluation of the Stream. If the tail were a stateful Supplier instead of a Lazy, it could return a different tail each time it is called. A memoizing Lazy prevents this.
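The difference between a stateful Supplier and a memoizing Lazy can be sketched as follows. MemoLazy is a hypothetical stand-in written for this example, not Javaslang's Lazy, and it is deliberately not thread-safe.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a memoizing Lazy (not thread-safe).
final class MemoLazy<T> implements Supplier<T> {

    private Supplier<? extends T> supplier; // dropped after the first evaluation
    private T value;

    private MemoLazy(Supplier<? extends T> supplier) {
        this.supplier = supplier;
    }

    static <T> MemoLazy<T> of(Supplier<? extends T> supplier) {
        return new MemoLazy<T>(supplier);
    }

    @Override
    public T get() {
        if (supplier != null) {
            value = supplier.get(); // evaluated at most once
            supplier = null;        // release the supplier and whatever it captured
        }
        return value;
    }
}

final class MemoLazyDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        // A stateful Supplier yields a different result on each call.
        Supplier<Integer> stateful = counter::incrementAndGet;
        System.out.println(stateful.get() + ", " + stateful.get()); // prints "1, 2"

        // Wrapped in a memoizing Lazy, the same kind of supplier is consistent.
        MemoLazy<Integer> lazy = MemoLazy.of(counter::incrementAndGet);
        System.out.println(lazy.get() + ", " + lazy.get());         // prints "3, 3"
    }
}
```

Note that the memoized value is held by a hard reference, which is exactly the memory trade-off discussed in this thread.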

Update 2: Of course we still need Stream.of(InputStream) to return a Lazy<Stream<T>> because the result could be Nil, i.e. the empty Stream

danieldietrich commented 9 years ago

How is memory managed for memoized values in conjunction with infinitely long streams, which recursively hold on to head/tail? Scala seems to internally memoize values (via SoftReference?). On the other hand, an Iterator does not store elements.

Taken from the Scala 2.11.5 API reference of scala.collection.immutable.Stream:

There are a number of subtle points to the above example.

  • The definition of fibs is a val not a method. The memoization of the Stream requires us to have somewhere to store the information and a val allows us to do that.
  • While the Stream is actually being modified during access, this does not change the notion of its immutability. Once the values are memoized they do not change and values that have yet to be memoized still "exist", they simply haven't been realized yet.
  • One must be cautious of memoization; you can very quickly eat up large amounts of memory if you're not careful. The reason for this is that the memoization of the Stream creates a structure much like scala.collection.immutable.List. So long as something is holding on to the head, the head holds on to the tail, and so it continues recursively. If, on the other hand, there is nothing holding on to the head (e.g. we used def to define the Stream) then once it is no longer being used directly, it disappears.
  • Note that some operations, including drop, dropWhile, flatMap or collect may process a large number of intermediate elements before returning. These necessarily hold onto the head, since they are methods on Stream, and a stream holds its own head. For computations of this sort where memoization is not desired, use Iterator when possible.

Update: I think that SoftReferences are not a good idea in this context!

danieldietrich commented 9 years ago

I think I wired the resource handling up using Chunked mentioned above.

The whole blocking stream thing seems not to be solvable with streams which pull data, i.e. when they ask for availability. I like the explanation here: jlongster/transducers.js#22

I think with transducers a pipeline of operations can be recorded in some way. Then the blocking input is connected to the transducer. When input is available, it is pushed to the transducer, which starts a 'playback' of operations on the chunks.

This would lead to a new data type Transducer - Streams are no option for non-blocking data handling...
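The "record the operations first, play them back when input arrives" idea can be sketched in plain Java. This is only an illustration of push-style composition, not a complete transducer implementation and not something Javaslang provides; PushPipeline, identity and into are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: operations are recorded up front as a transformation
// of the downstream consumer; nothing runs until a source pushes a value.
final class PushPipeline<A, B> {

    // Given the final consumer of B, produce a consumer of A.
    private final Function<Consumer<? super B>, Consumer<A>> wiring;

    private PushPipeline(Function<Consumer<? super B>, Consumer<A>> wiring) {
        this.wiring = wiring;
    }

    static <A> PushPipeline<A, A> identity() {
        return new PushPipeline<A, A>(sink -> a -> sink.accept(a));
    }

    <C> PushPipeline<A, C> map(Function<? super B, ? extends C> f) {
        return new PushPipeline<A, C>(sink -> wiring.apply(b -> sink.accept(f.apply(b))));
    }

    PushPipeline<A, B> filter(Predicate<? super B> p) {
        return new PushPipeline<A, B>(sink -> wiring.apply(b -> {
            if (p.test(b)) {
                sink.accept(b);
            }
        }));
    }

    // Connect the recorded pipeline to a sink. The returned consumer is what a
    // (possibly asynchronous) input source calls whenever data is available.
    Consumer<A> into(Consumer<? super B> sink) {
        return wiring.apply(sink);
    }
}

final class PushPipelineDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();

        // Record the operations; no input exists yet.
        Consumer<Integer> input = PushPipeline.<Integer>identity()
                .filter(i -> i % 2 == 0)
                .map(i -> "even: " + i)
                .into(out::add);

        // Later, the source pushes values as they become available.
        input.accept(1);
        input.accept(2);
        input.accept(4);

        System.out.println(out); // prints "[even: 2, even: 4]"
    }
}
```

The consumer never asks for availability, so nothing blocks on the pipeline side; the cost is that the pull-style Stream operations (head, tail) have no direct counterpart here.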

A decision is needed on what Stream should do and what it should not, i.e. whether streams can be created based on InputStreams, whether they should be AutoCloseable, etc.

danieldietrich commented 9 years ago

Famous last words: 'A stream is a lazy linked list.'

danieldietrich commented 9 years ago

@lukaseder I read your tweet

Sophisticated Trolling. Noun. The rare ability of luring both sides into their inevitable doom, walking away from discussion smirkingly.

and just want to say that this issue has moved to https://github.com/javaslang/javaslang/issues/186 in order not to be a Troll :-)

lukaseder commented 9 years ago

Huh, what? I'm not sure if I see the link now... Because I "walked away" from this issue here?

danieldietrich commented 9 years ago

There is no direct link - in order to provide good support here I'm figuring out how to close a ticket. If multiple people are discussing a specific subject, I don't want to just close the ticket without a common agreement. In this case, it should be ok. The whole async/reactive thingy is not just a feature, it's an epic...

lukaseder commented 9 years ago

Aha :-) Hehe, ok. I see that this has been bothering you for a while, then :-)

I usually conclude with a couple of actions, creating a new issue for each action, linking the actions from the thingy / epic, and then closing the thingy / epic... (e.g. as a duplicate)

danieldietrich commented 9 years ago

Nice! I will copy that :-)

danieldietrich commented 8 years ago

I want Javaslang to have a processing pipeline for chunked data that may fail to read (/write?).

danieldietrich commented 7 years ago

I'm dropping this feature (for now). This should not be baked into collections, it is a 'reactive' topic.

If resource handling is important, it will appear again later.