typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io
Other
2.37k stars 601 forks source link

Roadmap/workplan request for comment #383

Closed pchiusano closed 9 years ago

pchiusano commented 9 years ago

I'll have some more time to work on scalaz-stream in the coming months and spent some time looking through the various issues and discussions. Apologies for my being rather absent the past few months. Here is what I am thinking about working on, I'd like to hear comments:

Thoughts or comments? Please weigh in!

djspiewak commented 9 years ago

I'll have some more time to work on scalaz-stream in the coming months and spent some time looking through the various issues and discussions.

YAY!

Adopting some version of #351, a Transducer type which may be effectful, generalizes Process1, Tee, Wye, Sink, and Channel. I suspect there's a way to do this without Unapply nonsense. I really don't want Unapply polluting signatures everywhere.

I don't want that either, but I can go into some significant detail as to why Transducer needs Unapply (or rather, a very Unapply-like thing) and why I'm relatively certain that any such functionality will have the same requirement. It basically boils down to a hard-coded recursion limit in scalac itself.

I 100% agree though that this is an important change. Channel is hilariously limiting, and I run into it far far too often.

Additionally, awaits ought to be able to load an entire chunk at a time. This should allow chunkiness and less overhead throughout multistage processes. Combinators like map should probably preserve chunkiness of input.

Please see my comments on #249. Unless we replace Process1 with something that is decidedly different (likely requiring a very different Await formulation; one that allows for back-channel communication and from the receiver), chunking of this sort simply doesn't work in a generalized fashion. We can and REALLY should fix map, but generalizing that to composable combinators such as we have in process1 is not possible in the current Process algebra due to the consumption guarantees.

I think this is really important, and should be considered together with the transducer stuff, but it comes down to some pretty fundamental design decisions in the Process algebra itself, and while we could fix those limitations (at the cost of a significant overhaul of…everything), I'm not sure the requisite tradeoffs are right for us.

Solve resource safety issues once and for all, even in presence of interrupts, via new primitive, bracket, signature TBD. Also reasoning about these things should be super simple (it's not now IMO), and I'd like to write up some clear laws, semantics, and docs. I'm disappointed by how complicated and subtle the issues are with the current formulation

We're close to a solution here. By which I mean, it's implemented on a branch which is blocked by Terminated weirdness (#369), which is in turn blocked by the fact that Channel sucks (#351). I don't know anything about bracket, but we've tried a lot of different options here. I'd be curious to see what sort of signature you're thinking of, since it might be something we've already tried.

Remove scalaz dependency from core

Highly non-trivial for all of the reasons given in #321. Doable, but all of the real guts of scalaz-stream would move out of core along with the scalaz-dependent stuff. None of the interpreters work without Scalaz, including stepAsync, which means that none of the concurrency stuff would be able to live in core anymore.

Publish to sonatype

:+1:

Assuming these changes work out, I'll feel like the library is pretty stable and would like to make the next release a 1.0. This will mainly be focused on testing, stability, and moving stuff out of core that isn't rock solid.

I think this is a relatively good list. There are a few additional scary things though that I'd like to add to consideration:

pchiusano commented 9 years ago

@djspiewak thanks for detailed reply. Agree about moving tcp elsewhere unless we can make it solid before 1.0. I may ping you to discuss details about some of your other remarks when I dig in.

pchlupacek commented 9 years ago

I agree what @djspiewak suggested any ideas on priority list here?

pchiusano commented 9 years ago

For priority, I'd definitely like to start with the first three bullets (generalized transducers, resource safety, and chunked awaits unless @djspiewak convinces me it's a terrible idea :) These all affect the core representation and may affect lots of open issues like #381. Then the other stuff, then a release. I'm not that keen on just doing a random checkpoint release before addressing the bigger stuff.

I want the implementation of this library to be simple, obviously correct, and easy to reason about. We do not have that right now to the extent I'd like. Rather than just making tweaks here and there, I'd like to go back to the drawing board with the core algebra, applying the things we now know. I can put together a gist and send it around for feedback, and y'all can tell me all the ways it is flawed or too limited. :)

pchlupacek commented 9 years ago

@pchiusano fully agree. Lets get to roots and bring something we can build on longer time. I think we know the weaknesses of the Append algebra much better, and have much more solid test coverage etc.

Would be nice to share the gists. I would like also to explore in more detail what we have discussed before as part of append refactoring here (I mean sequence of dependent types).

If we would touch the roots I'd like as well to remove scalaz.Task and scalaz.Trampoline from scalaz, and make our very own version of it in library. We need to fix this async behaviour once for all.

Also can't we get more from haskell? Like the bracket stuff for example?

Let me know with gists, and I am ready to help with refactoring for the second time :-)

Pavel.

djspiewak commented 9 years ago

@pchiusano I think chunked awaits are a fantastic idea and we should absolutely pursue them, they just don't work with our current algebra (particularly with Process1).

pchiusano commented 9 years ago

Okay, cool, I think I'll be digging into this stuff this week and am excited to get started! I will definitely need help from y'all, both with design and implementation. To start things off, I'd like to draft a prototype of a new, simpler core algebra (just some type signatures and docs) and get your feedback on it. I'm hoping this will either give us a lot of the other stuff on the list "for free" or will at least make a nice clean foundation for the other items. And I have some ideas for a new simpler representation / implementation of the core algebra that doesn't require type aligned sequences but can still preserve all the nice performance characteristics of the current representation.

djspiewak commented 9 years ago

@pchiusano I'd definitely be interested in what you've come up with for the algebra. I'm actually very fond of the current algebra, and I tend to see a lot of the weirder corners as consequences both of the implementation and of the fundamental nature of what we're doing (concurrency and preemption are very complicated). That doesn't mean there isn't a better formulation out there though, and I'd be very interested to explore one. My only concern would be that any new formulation isn't going to have the amount of time and thought that we've put into this one, and many of the problems that we've addressed are fundamental to the problem space (e.g. preemption) and very fiddly and tricky to get right, so I'm concerned about tossing that work out.

pchiusano commented 9 years ago

@djspiewak yes, I'm not saying "let's definitely toss out everything" The result of all this might be some conservative changes to what is already there. But since what is already there is IMO not very well-specified, simple, or obviously correct, that would be a disappointing result for me. Let's see how it goes though.

My only concern would be that any new formulation isn't going to have the amount of time and thought that we've put into this one, and many of the problems that we've addressed are fundamental to the problem space (e.g. preemption)

That is a reasonable concern. The way I'm thinking of addressing this is to make sure any new formulation is well-specified with a semantics and laws, which can give us assurance that all corner cases are addressed (and tells us precisely how they can be addressed by an implementation). This will guide the implementation and make it clear what testing we need to do (if we aren't already doing it). The current library is a little too "implementation-defined", and that makes it difficult to make changes of this nature without wondering whether you're breaking things or introducing weird corner cases.

pchlupacek commented 9 years ago

@djspiewak, @pchiusano probably off topic, but was curious if we cannot resolve the preemption issues one for all if we simply make resource first class in the algebra. This just an idea. I just think the we essentially do not care about preemption in normal asynchronous code, but, in resource acquiring code this is an issue.

djspiewak commented 9 years ago

@pchlupacek That would resolve preemption nicely, but the problem is that as long as we have unrestricted effects, we cannot prevent people from doing resource acquisition as part of performing some other effect. And in fact, in my experience, this is by far the most common case for acquisition. In light of this, if we add a separate case in the algebra for resource acquisition, then all we're doing is generating an algebra with two almost equivalent cases for performing arbitrary effects: one with preemption and one without. I would rather just have the one with preemption (which is what we have in master now).

pchiusano commented 9 years ago

I actually want to make resource acquisition a primitive and move away from using onComplete (which I'd like to deprecate) for resource management. I have some ideas for how to do this without duplicating code either (straw man - an "unbracketed" await is a special case of a bracketed await in which your cleanup action is empty). If people want to acquire resources not using that primitive, then by all means they can shoot themselves in the foot and we won't stop them. :) But I still think it is the right thing to include it, and we can use it internally for any functions that allocate resources.

Having a primitive means we know exactly the places where it is safe to do preemption, and places where it is not safe, and can use this information in various places.

djspiewak commented 9 years ago

@pchiusano What is the difference between an unbracketed resource acquisition and an effect evaluation? My point is basically that our current Await basically satisfies that definition (though the way that the fusers are evaluated screws up the tail of the bracket).

djspiewak commented 9 years ago

Re: the roadmap… Finding a way to address #391 would be very nice. Maybe not possible, but nice.

pchiusano commented 9 years ago

Okay, I've got some stuff I'm pretty excited about... will do a writeup today!

pchiusano commented 9 years ago

WIP is here... still have a bunch of stuff to fill in, but figured I'd share: https://gist.github.com/pchiusano/2f743403e40c8edb847d

pchiusano commented 9 years ago

All right, I reworked a few things and made the presentation clearer. Have a look. The large list of laws was unclear, so I opted for giving a canonical (but inefficient) implementation of the algebra which in theory fully specifies all the operations. The real implementation will need to be more complicated but it should follow this pretty simple specification.

@djspiewak @pchlupacek @fthomas @alissapajer @runarorama let me know what you think and/or what questions, comments, or concerns you have. We can just use this thread or gitter to discuss, since comments on the gist won't produce any notifications AFAIK.

pchlupacek commented 9 years ago

@pchiusano excellent. I think it resolves many, many issues, while I don't see that many new areas we may be trapped in. In fact the simplification of channel/sink stuff will add us a lot more flexibility and perhaps introducing new exciting combinators.

I like also clear demarcation between async/not async primitive. First, it would help in correct implementation of resource finalisation, and second we may have pure awaits so much effective than today.

What I am still not sure though if we would be just enough with Fail algebra, perhaps here we may need cause again, and perhaps we may need to extend this for various Kill cases.

I think it deserves the implementation. We would perhaps go in some stuff that we don't foresee, but I would say hence We have now just pure function and Stream instance we shall have a bit more flexibility to fix them.

What I really like we can define different behaviours with Stream instance, that was always problem with pipe, tee and wye, and that polluted code always everywhere. I even can see we can perhaps have a use of Stream[Channel....] and Stream[Process1....] to perhaps correctly define behaviours like pipeIn.

On other hand I think we shall start with implementation that proves we won't run with this in performance issues that was the primary objective for current Append algebra. If this passes, I believe we shall be mostly ok going forward.

pchiusano commented 9 years ago

Cool, will respond in more detail when i get into the office. I woke up this morning realizing I'd completely neglected specifying how early termination is handled. I think it is fixable, basically NF will include info about what finalizers are in scope and active. A valid interpreter will make sure to run these appropriately. On Fri, May 29, 2015 at 1:47 AM Pavel Chlupacek notifications@github.com wrote:

@pchiusano https://github.com/pchiusano excellent. I think it resolves many, many issues, while I don't see that many new areas we may be trapped in. In fact the simplification of channel/sink stuff will add us a lot more flexibility and perhaps introducing new exciting combinators.

I like also clear demarcation between async/not async primitive. First, it would help in correct implementation of resource finalisation, and second we may have pure awaits so much effective than today.

What I am still not sure though if we would be just enough with Fail algebra, perhaps here we may need cause again, and perhaps we may need to extend this for various Kill cases.

I think it deserves the implementation. We would perhaps go in some stuff that we don't foresee, but I would say hence We have now just pure function and Stream instance we shall have a bit more flexibility to fix them.

What I really like we can define different behaviours with Stream instance, that was always problem with pipe, tee and wye, and that polluted code always everywhere. I even can see we can perhaps have a use of Stream[Channel....] and Stream[Process1....] to perhaps correctly define behaviours like pipeIn.

On other hand I think we shall start with implementation that proves we won't run with this in performance issues that was the primary objective for current Append algebra. If this passes, I believe we shall be mostly ok going forward.

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-106697191 .

pchiusano commented 9 years ago

So, after playing with this some more, it looks like the specification of bracket really needs a lot more work. I'd like to give it more thought, but need to take a break from thinking about it. I'll ruminate over the weekend. I'd like to make the spec precise before getting into implementation. I feel pretty confident that we can figure out how to make the implementation efficient.

djspiewak commented 9 years ago

@pchiusano Sorry for the delay in responding. Here are my thoughts (update: looks like you already noticed some of the bracket issues; my comments are left in to add my perspective).

pchiusano commented 9 years ago

Okay, here's an updated version: https://gist.github.com/pchiusano/7834d0725c1ad4332c86#file-streams-scala-L68 /cc @djspiewak @runarorama

@pchlupacek - FYI, @djspiewak was in town and we met up with @runarorama for lunch and whiteboarding on scalaz-stream. Much discussion ensued. Here's a summary of some stuff we went over:

The new gist specifies bracket by adding a cleanup: Vector[F[Unit]] available at each 'step' of the stream. bracket has the obvious meaning of pushing the cleanup action for the duration of the inner stream, then running the finalizer at the end of the inner stream. At any particular step of the stream, we have the list of finalizers if we want to stop listening to the stream, or we can keep going.

Assuming I implemented this part correctly, we also get a guarantee that the cleanup at a given step will never overlap with the Await effect. So, to interrupt a process, we can always run the current cleanup and let the currently running Await finish, and be assured that we run all finalizers exactly once.

The availableAsync, force, and forceRace functions need more work. Right now they are specified to just do things totally sequentially. I like the API though. I think we will need a further constraint on F for those. @djspiewak had some ideas for an Async typeclass.

One kind of interesting observation - if you get rid of forceRace, you can do only deterministic parallelism. That is, you can make requests in parallel, but can't make a different decision based on what results come back first. This is somewhere between tee and wye - wye allows parallelism, but also nondeterminism, since you can do something different if one branch or the other arrives first.

djspiewak commented 9 years ago

@pchiusano I agree that bracket + append makes everything free and non-normal, which is undesirable. However, your algebra is unsound in the presence of preemption, which has always and will always be the hardest thing to solve about finalizers. As discussed, separating the finalizer from the effect (as you have done) makes it impossible to deterministically determine a preemption handler in the event of a preempted action.

I will detail more once I'm not typing on my phone (I think tonight).

djspiewak commented 9 years ago

Deterministically determine and preemptively preempt the preemption. I seem to get very redundant when using mobile devices…

pchiusano commented 9 years ago

Ha. I am not sure what you are saying so I'll wait for you to elaborate. :) There is no rush. I may not have time this weekend to look at this, and I will be off the grid most of Monday and Tuesday.

pchiusano commented 9 years ago

I figured out how to do availableAsync nicely! It is not hard at all. The assumption that we never interrupt a running F effect greatly simplifies the code. Rather than interrupt the running effect, we let it finish the current step, then just don't bother running subsequent steps! No atomic booleans or anything tricky at all. The only code that deals with concurrency is the implementation of Async:

trait Async[F[_]] extends Monad[F] {
  type Queue[A]
  def queue[A]: F[Queue[A]]
  def dequeue[A](q: Queue[A]): F[A]
  def enqueue[A](q: Queue[A])(a: F[A]): F[Unit]
  def enqueueFree[A](q: Queue[A])(a: Free[F,A]): F[Unit]
  // `race`, etc, can be defined here too
}

... simple to implement using an actor.

Anyway, I was getting excited about having such a short simple reference implementation, but it's not correct I don't think. I was thinking about this case -

for {
  a <- p1.await
  b <- p2.await
  out <- b.head ++ b.tail
} yield out

Here, p1 is passing out of scope but we never call its finalizers. I think I might know how to fix that, just by appending the cleanup of p1 to the result of p1.await. But my brain is melting at the moment so I'm going to have a fresh look later. :)

pchlupacek commented 9 years ago

@pchiusano @djspiewak

I am completely fine on not interrupting the task, but having it complete (eventually) and THEN run finalizers. I think what I have seen in your gist, the bracket signature is completely fine for that. In fact I think we had that sort of behaviour now, with subtle difference that we step inside the task and trying to be as nice to interruption as we can. I am completely fine to move this detail off the Task/Free to process, where imho should it be anyhow.

Additionally however, I would like to consider if, at such interruption of running tasks we shouldn't have a way for upstream (callback) to get notified that task has been interrupted. This is perfectly beneficials to queues, where that prevents the dead tasks to be filled with items to be processed w/o re-queueing and thus changing the order.

I believe this behaviour can be baked in Async perhaps?

I am also not sure on certain implementation details in gist (i.e. in Emit case (#129) releasing resource before the x gets emitted. But perhaps that is to deep and early w/o prototyping more that implementation.

pchiusano commented 9 years ago

These are good concerns to bring up. I have a couple questions about what you mean... But I'd suggest we wait to dive in until I have the spec implementation in a state I think is correct or I learn it has fundamental flaws :). Then it will be more clear what aspects are lacking and we can even write little test cases. On Sat, Jun 20, 2015 at 1:25 AM Pavel Chlupacek notifications@github.com wrote:

@pchiusano https://github.com/pchiusano @djspiewak https://github.com/djspiewak

I think we have to answer the question how to release resource that was acquired by asynchronous code. I think we simply cannot say let it run as this will ultimately cause the resources to be leaked.

I am completely fine on not interrupting the task, but having it complete (eventually) and THEN run finalizers. I think what I have seen in your gist, the bracket signature is completely fine for that. In fact I think we had that sort of behaviour now, with subtle difference that we step inside the task and trying to be as nice to interruption as we can. I am completely fine to move this detail off the Task/Free to process, where imho should it be anyhow.

Additionally however, I would like to consider if, at such interruption of running tasks we shouldn't have a way for upstream (callback) to get notified that task has been interrupted. This is perfectly beneficials to queues, where that prevents the dead tasks to be filled with items to be processed w/o re-queueing and thus changing the order.

I believe this behaviour can be baked in Async perhaps?

I am also not sure on certain implementation details in gist (i.e. in Emit case (#129 https://github.com/scalaz/scalaz-stream/pull/129) releasing resource before the x gets emitted. But perhaps that is to deep and early w/o prototyping more that implementation.

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-113710391 .

pchlupacek commented 9 years ago

@pchiusano sure looking forward for it

djspiewak commented 9 years ago

@pchiusano Consider the following pair of effects:

val lol: Task[LaunchCodes] = Task async { cb =>
  // psych!
}

val almostLol: Task[LaunchCodes] = Task delay { Thread.sleep(50000); acquireCodes() }

def releaseCodes(lc: LaunchCodes): Task[Unit] = ???

val p1 = bracket(lol, emit, releaseCodes)
val p2 = bracket(almostLol, emit, releaseCodes)

When you interrupt, either p1 or p2, you need to make sure that releaseCodes still runs! This is absolutely, 100% vital, and it would not be an exaggeration to say that scalaz-stream really isn't useful without this property. The problem is that your frame algebra doesn't allow this.

In your frame algebra, when you interrupt either p1 or p2, you pop out of the frame and you no longer have the finalizer for the effect which is still running! The problem, fundamentally, is that your frame algebra assumes that you can treat evaluation contexts in a purely stack oriented fashion. You cannot. I know that was the picture I drew on the whiteboard, but the asterisk in that picture is that once an effect starts evaluating, its finalizer must be married to it even if you pop out of the enclosing frame. Your algebra does not allow this.

You'll notice, btw, that my pair of effects above is designed to defeat any "just wait until the effect finishes" schemes, since the lol effect is never going to complete, whereas almostLol waits a long time, but does ultimately grab a resource that needs to be released. You cannot, in general, determine which sort of effect you're looking at (one which completes or one which doesn't), and so you need to marry finalizers in the way that the original Bracket algebraic term does (or something equivalent).

The only code that deals with concurrency is the implementation of Async

The problem with your Async class is not that it can be implemented easily using an actor, but that such an implementation is basically the only possible implementation. No, you don't need an actor per se, but you definitely need something very actor-like. I don't have any objection to actors as a low level tool, but I would rather remain abstracted from them if possible.

The Async signature I proposed enables implementation of the concurrent primitives with safe preemption of effects without forcing a queue-backed implementation or requiring existential state.

pchlupacek commented 9 years ago

@djspiewak can you share pls your async signature, perhaps quick gist? Have you folks considered to make a bracket part of first class algebra? Perhaps AsyncBracket?

pchlupacek commented 9 years ago

@djspiewak @pchiusano I think we don't have the problem with Tasks that never complete. I am fine them to wait forever, although as indicated I would love if upstream process will be either signalled (in best case) or just able to check that such callback/task is dangling.

However as @djspiewak pointed out we must assure the cleanups are ALWAYS run once the resource is acquired, which is by the definition nondet, so we must have this very solidly implemented, and I doubt we will have something viable without having this part of our first clas algebra. I was in fact thinking if we cannot make it part of Task.async or its alternative, because, then yes, we can think of the Task being completely atomic.

pchlupacek commented 9 years ago

Was thinking of something like


Task.async { ((cb:Throwable \/ A) => Unit, onPreempt: A => Unit) =>
     ???
}
djspiewak commented 9 years ago

@pchlupacek Sorry, I forgot to put my Async signatures in here. This is what I was thinking of:

trait Async[F[_]] extends Monad[F] with Catchable[F] with Nondeterminism[F] {
  def fork[A, B, S](fa: F[A])(k: (A, S) => F[B]): F[(F[A \/ B], S => F[Boolean])]
}

Regarding adding preemption determination as a first-class component of Task, I am strongly, strongly opposed since this requires people to define their resource cleanup logic in two places, every time: once in the preemption handler (inside of the Task), and once as a finalizer attached to a process region. I think that having an API which definitionally constrains users to violate DRY is probably… not good.

pchlupacek commented 9 years ago

@djspiewak not sure if I am catching all the semantics of async correctly. Not sure what A,B,S parameters stands for.

reg the async task signature, this will be more-less private, not exposed to user. We can surely implement it that it will be just an internal implementation detail. There is no need to specify it in two places. We may use same trick as we do today with bracket.

The idea is just to leave preemption in task. For example, we can use same syntax on task as we do now with Task.attempt. The idea is simply build stronger Task, that will allow us to react on preemption, if we need to. Is sort of what we to today in fact.

You will still use bracket to specify your cleanup logic and only once. Is only that we won't pollute deeply code with preemption details.

djspiewak commented 9 years ago

not sure if I am catching all the semantics of async correctly. Not sure what A,B,S parameters stands for.

fa is the effect. k is the on-preempt continuation, which is run in the event that the effect completes and it is interrupted. S is a signaling parameter, and does little other than reflect parametricity. The return result is an effect which, when sequenced, starts running the fa effect, immediately producing an effect F[A \/ B] and an interrupt function S => F[Boolean]. The interrupt effect produces true if the interrupt was successful, false otherwise (as there is an inherent CAS in this operation). If the effect is not interrupted (or not successfully interrupted), F[A \/ B] will produce a Left. If the interrupt is successful, the preemption continuation is run and it produces a Right.

reg the async task signature, this will be more-less private, not exposed to user. We can surely implement it that it will be just an internal implementation detail. There is no need to specify it in two places. We may use same trick as we do today with bracket.

It cannot be an internal implementation detail because people are handing us effects (of type Task) that they themselves are constructing. In your proposal, safe resource cleanup in the face of dangling effects (preemption) requires that users implement their Task such that preemption logic is encoded in the effect itself.

You will still use bracket to specify your cleanup logic and only once. Is only that we won't pollute deeply code with preemption details.

Unless your "stronger task" has the ability to add preemption logic to a pre-existing Task, you will still deeply pollute code with preemption details.

In any case, this proposal also requires that Task is preemption aware in some form. I think it would be better to avoid that, and remove all understanding of interrupts or preemption from Task, treating it as a purely atomic unit.

pchlupacek commented 9 years ago

@djspiewak to be honest as user, I can imagine to work with your Async signature better than what I propose to task. I think whole async computation has to be backed by implement actor that just does all what we need. And actor shall be constructed when we evaluate it, and potentially stay there till the computation completes / preempts. You than have the algebra around it that configures its behaviour including preemption. If we would call it Task or Async really is technicality I guess.

In fact do we need Task at all?

djspiewak commented 9 years ago

to be honest as user, I can imagine to work with your Async signature better than what I propose to task. I think whole async computation has to be backed by implement actor that just does all what we need. And actor shall be constructed when we evaluate it, and potentially stay there till the computation completes / preempts. You than have the algebra around it that configures its behaviour including preemption. If we would call it Task or Async really is technicality I guess.

I would rather design at a higher level than actors, if possible. In my experience, they have a tendency to just massively massively muddle up the design and state space. They're a good primitive. I'm glad we have them, and I'm sure a lot of this stuff is going to end up in an actor at the lowest level (just as Task is an actor at the lowest level), but every time we've built something in scalaz-stream that is exclusively implemented in terms of actors, we've gotten it wrong in very subtle ways. Every time. In my experience with the actor model, this experience is far from atypical.

In any case, the Async signature I proposed was more with an eye toward what we need to implement the concurrency primitives at a minimal level, allowing us to abstract completely away from Task. Users could use it, but it was intended more as an effect constraint that we could use to implement asynchronous execution and preemption.

As an example of what this allows, a synchronous implementation of Async is actually possible, as is a FreeC backed implementation. Users could potentially test their asynchronous stream composition synchronously and deterministically, perhaps even using ScalaCheck to validate different interleaving potentials to ensure correctness and absence of race conditions. This is completely impossible now due to the fact that stepAsync (and friends) are explicitly implemented in terms of actors and Task. Implementing in terms of the proposed Async typeclass moves away from this and gives tremendous flexibility back to the users.

In fact do we need Task at all?

Yes, and for two reasons. First, users need a way of constructing atomic effects that are explicitly asynchronous. Task.async is absurdly important, and the library loses most of its usefulness if we don't have it. Second, we need a free monad into which we compile Process in the interpreters. Without such a monad, the run methods would need to be impure, directly evaluating the stream. This cuts out a lot of very very useful patterns and use-cases for the interpreters that are possible right now.

pchiusano commented 9 years ago

Okay I think I might have everything worked out, just need to do a writeup. I feel like the two of you are probably talking past each other somewhat.

On Sun, Jun 21, 2015 at 12:08 PM Daniel Spiewak notifications@github.com wrote:

to be honest as user, I can imagine to work with your Async signature better than what I propose to task. I think whole async computation has to be backed by implement actor that just does all what we need. And actor shall be constructed when we evaluate it, and potentially stay there till the computation completes / preempts. You than have the algebra around it that configures its behaviour including preemption. If we would call it Task or Async really is technicality I guess.

I would rather design at a higher level than actors, if possible. In my experience, they have a tendency to just massively massively muddle up the design and state space. They're a good primitive. I'm glad we have them, and I'm sure a lot of this stuff is going to end up in an actor at the lowest level (just as Task is an actor at the lowest level), but every time we've built something in scalaz-stream that is exclusively implemented in terms of actors, we've gotten it wrong in very subtle ways. Every time. In my experience with the actor model, this experience is far from atypical.

In any case, the Async signature I proposed was more with an eye toward what we need to implement the concurrency primitives at a minimal level, allowing us to abstract completely away from Task. Users could use it, but it was intended more as an effect constraint that we could use to implement asynchronous execution and preemption.

In fact do we need Task at all?

Yes, and for two reasons. First, users need a way of constructing atomic effects that are explicitly asynchronous. Task.async is absurdly important, and the library loses most of its usefulness if we don't have it. Second, we need a free monad into which we compile Process in the interpreters. Without such a monad, the run methods would need to be impure, directly evaluating the stream. This cuts out a lot of very very useful patterns and use-cases for the interpreters that are possible right now.

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-113918705 .

pchiusano commented 9 years ago

@djspiewak @pchlupacek

Ok, updated gist here: https://gist.github.com/pchiusano/7834d0725c1ad4332c86

I am not aware of any issues with this implementation or API other than making append and flatMap more efficient. I am amazed at how simple this came out, the API seems to cover all use cases that I can think of, and the implementation is almost trivial. Almost seems too simple. Looking forward to y'all poking holes in it. :)

The implementation deals with running finalizers in exactly one place, the interpreter, runFold. All other combinators are solely in charge of indicating what finalizers are in scope at each point in the stream. It is runFold that detects when finalizers pass out of scope and runs them appropriately. The only correctness condition is that finalizers must pass in scope and out of scope once. I believe the combinators preserve this property.

Notice that we never need to interrupt a running effect, and the only tricky concurrency will be inside the implementation of Async instances. We never interrupt a running task or alter its future. We just run the task, and may decide we aren't interested in subsequent steps. Nothing fancy needed, just like we don't need anything fancy to decide we only want to look at the first 3 elements of a list.

I'm leaving town for a couple days shortly but I'll be able to reply later this week. Looking forward to your comments!

djspiewak commented 9 years ago

@pchiusano The following test will fail with your implementation (assuming a merge defined with similar semantics as the one in master):

var numOpen = 0
class Socket {
  numOpen += 1

  def close() = numOpen -= 1
}

val acquire: Task[Socket] = Task { Thread.sleep(500); new Socket }

def readData(s: Socket): NF[Task, ByteVector] = ???
def closeSocket(s: Socket): Task[Socket] = Task { s.close() }

val p = bracket(acquire)(readData, closeSocket)

(p merge fail(new RuntimeException("yolo"))).run.run

Thread.sleep(500)
numOpen === 0       // nope!

In other words, dangling resources are still not correctly handled. Or rather, they really aren't handled at all. It's not a question of whether or not we allow effects to be interrupted; in the above test, I'm assuming Task is atomic! (which is to say, once a Task starts evaluating, it will complete) We allow streams to be interrupted, and streams contain effects and finalizers. Your algebra does not marry the finalizers to the effects. Specifically, the problem lies in bracket:

  def bracket[F[_],R,A](acquire: F[R])(use: R => NF[F,A], release: R => F[Unit]): NF[F,A] =
    flatMap(eval(acquire)) { r =>
      // notice we don't run the `release` effect here, that is handled
      // automatically by `runFold`, which detects when finalizers pass
      // out of scope
      scope(Finalizers.single(release(r)))(use(r))
    }

Notice that we're binding into the eval, and we don't put the finalizer into scope until after the effect has completed and control has been returned to the stream interpreter. However, the stream interpreter has been interrupted by the fail in my example, meaning that it's going to halt the evaluation of the p stream and never progress into the next bound action. In this case, the "next" bound action is the finalizer which nominally should be around the eval itself!

Note that we can't just redefine the interpreter to somehow get around this, because doing so will break flatMap and may in fact cause non-terminating behavior in general (i.e. defeating the whole purpose of process interruption). The problem is fundamental to the way that finalizers are separated from the effects.

The framing concept is really pretty, and I'd love it if we could make it work, but I really and truly believe that without tying effects together with their finalizers in the process algebra, dangling resources will remain a problem.

On another note, I still really really really don't like the almost-a-queue interface for Async. There really isn't a way to implement your signatures without something very "queueish" (like an actor). While this is a helpful model for concurrency, it certainly is not the minimally defining property that we need from an asynchronous effect. While I'm not 100% convinced that my Async is fully sufficient, I'm probably 80-90% convinced, and my interface has the advantage of not actually forcing the implementation. I'm certainly not closed off toward interfaces which are different from mine; I just want to make sure that we don't over-constrain Async with implementation details.

runarorama commented 9 years ago

Just to chime in, the Light-Weight Monadic Regions paper mentions that region calculi are introduced specifically because:

"Finalizers are not a solution, as there are few guarantees on when they are run, if they are run at all.”

See http://www.haskell.org/pipermail/ haskell-cafe/2007-May/025455.html for a discussion of the problem WRT finalizers in GHC’s IO monad.

I also don’t like the Async interface, for the same reason as Daniel cites. But I also don’t like Daniel’s, for the reason that it’s not usefully implementable without side-effects. I would much rather rely on forkIO and mutable references (or transactional memory) in the underlying IO monad.

— Sent from Mailbox

On Mon, Jun 22, 2015 at 9:40 PM, Daniel Spiewak notifications@github.com wrote:

@pchiusano The following test will fail with your implementation (assuming a merge defined with similar semantics as the one in master):

var numOpen = 0
class Socket {
  numOpen += 1
  def close() = numOpen -= 1
}
val t: Task[Socket] = Task { Thread.sleep(500); new Socket }
def readData(s: Socket): NF[Task, ByteVector] = ???
def closeSocket(s: Socket): Task[Socket] = Task { s.close() }
val p = bracket(t)(readData, closeSocket)
(p merge fail(new RuntimeException("yolo"))).run.run
Thread.sleep(500)
numOpen === 0       // nope!

In other words, dangling resources are still not correctly handled. Or rather, they really aren't handled at all. It's not a question of whether or not we allow effects to be interrupted; in the above test, I'm assuming Task is atomic! (which is to say, once a Task starts evaluating, it will complete) We allow streams to be interrupted, and streams contain effects and finalizers. Your algebra does not marry the finalizers to the effects. Specifically, the problem lies in bracket:

  def bracket[F[_],R,A](acquire: F[R])(use: R => NF[F,A], release: R => F[Unit]): NF[F,A] =
    flatMap(eval(acquire)) { r =>
      // notice we don't run the `release` effect here, that is handled
      // automatically by `runFold`, which detects when finalizers pass
      // out of scope
      scope(Finalizers.single(release(r)))(use(r))
    }

Notice that we're binding into the eval, and we don't put the finalizer into scope until after the effect has completed and control has been returned to the stream interpreter. However, the stream interpreter has been interrupted by the fail in my example, meaning that it's going to halt the evaluation of the p stream and never progress into the next bound action. In this case, the "next" bound action is the finalizer which nominally should be around the eval itself! Note that we can't just redefine the interpreter to somehow get around this, because doing so will break flatMap and may in fact cause non-terminating behavior in general (i.e. defeating the whole purpose of process interruption). The problem is fundamental to the way that finalizers are separated from the effects. The framing concept is really pretty, and I'd love it if we could make it work, but I really and truly believe that without tying effects together with their finalizers in the process algebra, dangling resources will remain a problem.

On another note, I still really really really don't like the almost-a-queue interface for Async. There really isn't a way to implement your signatures without something very "queueish" (like an actor). While this is a helpful model for concurrency, it certainly is not the minimally defining property that we need from an asynchronous effect. While I'm not 100% convinced that my Async is fully sufficient, I'm probably 80-90% convinced, and my interface has the advantage of not actually forcing the implementation. I'm certainly not closed off toward interfaces which are different from mine; I just want to make sure that we don't over-constrain Async with implementation details.

Reply to this email directly or view it on GitHub: https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-114323128

scottcarey commented 9 years ago

Acquiring and releasing a resource has this imperative analogy:

val resource = acquire(x)
try {
  use(resource)
} catch {
  onError(e)
} finally {
  release(resource)
}

I am using this analogy for my thought experiments for creation and cleanup of a resource. In this form, the desired property that the release is always attempted if acquire returns is guaranteed by the JVM due to how the stack and exception handling work -- it even attempts execution when Errors such as OOME happen.

When composing units with the shape above, there are two basic options: nested inside the try, or appended after the finally. These two options are very different in the compositional properties with respect to resources. I do not believe it is possible to have a single monadic syntax for composition of units with resource acquisition and cleanup, and rather these two cases may be best served with different syntax or representations in the algebra -- but that is really just a hunch based on a few thought experiments. My gut says that you could define an algebra where flatMap covers either one, but not both. And users will need both types of composition.

Note that the catch path follows much simpler rules than the finally To see how much simpler, think of how the above is implemented in bytecode: the catch code only exists on the exception handling path , but the finally code is duplicated (or jumped to) on both the normal and exception path. A few proposals in this project have included the need to specify cleanup code twice for the same underlying reason. I believe that it is this aspect that drives the need to fuse the acquisition and cleanup together fundamentally -- otherwise you can't safely and reliably apply it to both paths.

OK back to lurking :)

pchiusano commented 9 years ago

Daniel thanks for that writeup, that makes total sense and I now understand the issue, which I didn't before. :) I have two possible fixes in mind that both seem pretty simple, will write up when back in town.

Runar, fyi the cleanup actions are what we are calling finalizers, we aren't literally talking about using the jvm finalization stuff, which agreed is almost useless. :)

I don't care too much about what the primitive operations are in Async. The Pool abstraction just made the implementation really easy so if that's not primitive I'd like it to be something you can implement for any Async. Note that you can implement the existing Nondeterminism typeclass via an Async (try it!). You can probably implement other stuff also. Anyway we can quibble about Async once we have something working. :) On Tue, Jun 23, 2015 at 12:21 AM Scott Carey notifications@github.com wrote:

Acquiring and releasing a resource has this imperative analogy:

val resource = acquire(x) try { use(resource) } catch { onError(e) } finally { release(resource) }

I am using this analogy for my thought experiments for creation and cleanup of a resource. In this form, the desired property that the release is always attempted if acquire returns is guaranteed by the JVM due to how the stack and exception handling work -- it even attempts execution when Errors such as OOME happen.

When composing units with the shape above, there are two basic options: nested inside the try, or appended after the finally. These two options are very different in the compositional properties with respect to resources. I do not believe it is possible to have a single monadic syntax for composition of units with resource acquisition and cleanup, and rather these two cases may be best served with different syntax or representations in the algebra -- but that is really just a hunch based on a few thought experiments. My gut says that you could define an algebra where flatMap covers either one, but not both. And users will need both types of composition.

Note that the catch path follows much simpler rules than the finally To see how much simpler, think of how the above is implemented in bytecode: the catch code only exists on the exception handling path , but the finally code is duplicated (or jumped to) on both the normal and exception path. A few proposals in this project have included the need to specify cleanup code twice for the same underlying reason. I believe that it is this aspect that drives the need to fuse the acquisition and cleanup together fundamentally -- otherwise you can't safely and reliably apply it to both paths.

OK back to lurking :)

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-114350468 .

runarorama commented 9 years ago

Runar, fyi the cleanup actions are what we are calling finalizers, we aren't literally talking about using the jvm finalization stuff, which agreed is almost useless. :)

Right, we're talking about resource finalizers. I'm not at all talking about object deallocation in the JVM. Nevertheless, this is a problem in the shape of garbage collection and "cleanup actions" are exactly analogous to finalizers on JVM objects and they're janky for the same reason.

pchiusano commented 9 years ago

Ok gotcha I think. On Tue, Jun 23, 2015 at 10:29 AM Rúnar notifications@github.com wrote:

Runar, fyi the cleanup actions are what we are calling finalizers, we aren't literally talking about using the jvm finalization stuff, which agreed is almost useless. :)

Right, we're talking about resource finalizers. I'm not at all talking about object deallocation in the JVM. Nevertheless, this is a problem in the shape of garbage collection and "cleanup actions" are exactly analogous to finalizers on JVM objects and they're janky for the same reason.

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/383#issuecomment-114524785 .

djspiewak commented 9 years ago

@runarorama

I also don’t like the Async interface, for the same reason as Daniel cites. But I also don’t like Daniel’s, for the reason that it’s not usefully implementable without side-effects. I would much rather rely on forkIO and mutable references (or transactional memory) in the underlying IO monad.

My interface is in fact usefully implementable without side effects. If you squint at it a bit, you'll see that it's very little more than a fancy cojoin. This is extremely intuitive, in fact! Monads definite sequentialism; concurrency is definitionally asequential, which implies that the dual of monads might have some nice tricks that we can use. The only part about it that's truly weird is the preempt function (S => F[Boolean]). I dislike this function because the Boolean is exactly as ugly as it appears, but I don't see a way around it.

We can rearrange the function a bit by requiring a primitive compare-and-swap from F, but that feels over-constraining and too specific.

I most certainly do not like forkIO, and for a whole host of reasons. First off, forkIO is very low level, and does not provide the functionality we need to make asynchrony work! I actually tried to start with forkIO, and I couldn't do it. To put it more plainly, the current scalaz-stream could not be implemented purely in terms of forkIO, and I have little doubt that the new algebra will be no different in this regard. Second, forkIO imposes an indirection around what is essentially a cojoin, and that indirection is ThreadId. I have no idea why this was considered to be a good idea when it was written, but it's not a good idea and it never has been. (to be more than a little blunt) This kind of crazy indirection works, but it requires effects to be useful and it completely gives up on any sort of type-level constraint on the relations between parallel computations. My fork function doesn't do enormously better in this regard (see: S => F[Boolean]), but some relation is still preserved in the A and B parameters.

My final problem with forkIO is ironically the same objection you raise about my fork: it's effectful. The problem is that forkIO requires a ton of effectful processing outside of the forkIO function in order to do anything useful, whereas my fork keeps things self-contained and thus can be usefully implemented without effects (e.g. you could very easily implement it using a free algebra that is at least as powerful as State). As you said, forkIO does lean on MVar or STM in the IO monad, but IMO that's a bug and not a feature. forkIO is basically saying "well, we can already do everything in IO, so let's just punt on our concurrency primitives".

I don't care too much about what the primitive operations are in Async. The Pool abstraction just made the implementation really easy so if that's not primitive I'd like it to be something you can implement for any Async. Note that you can implement the existing Nondeterminism typeclass via an Async (try it!). You can probably implement other stuff also. Anyway we can quibble about Async once we have something working. :)

I agree that we can quibble about Async later. :-D I generally consider Nondeterminism a sufficient starting point, and I was just trying to fill in the gaps in its interface. You obviously can't implement Nondeterminism in terms of my proposed signature, and this was very definitively an anti-goal. If Nondeterminism were implementable in terms of my Async, then why bother with Nondeterminism at all?

pchiusano commented 9 years ago

Fyi, I don't like Nondeterminism and consider it sort of a failed experiment. There was a time long ago (in a galaxy far far away) where we tried to write various functions like wye in terms of just Nondeterminism rather than requiring Task. I remember it being pretty much impossible to make stack-safe or efficient. Having some sort of Pool + forking abstraction makes stack safety very easy.

I think almost done with the changes to fix the issue that @djspiewak brought up, will post shortly.

pchiusano commented 9 years ago

All right, updated gist here: https://gist.github.com/pchiusano/7834d0725c1ad4332c86

I believe this fixes the issue @djspiewak brought up. The issue is that p.awaitAsync or p.availableAsync kicks off asynchronous evaluation of the next step of p (not literally as a side effect, of course). This leads to difficulties if p is at a step which acquires resources. So if we have the pattern:

for {
  blah <- bracket(acquire)(use, release).awaitAsync
  ... blah never referenced again!

Then after the bind of blah, we have an asynchronous task that will at some point run acquire. Thus we need to be assured that regardless of whether we look at blah again, the release finalizer is introduced in scope starting at blah. Daniel's example of bracket(acquire)(use, release) merge fail(err) is an example of this (since merge is just implemented using awaitAsync).

The fix introduces a new constructor, Acquire, which indicates an Await that is doing resource acquisition. Notice that we obtain the resource and the finalizer for that resource in a single, uninterruptible step.

This info just gets passed along and then used in a single place, the implementation of availableAsync, which does something slightly clever to make sure that the finalizer is introduced at the right place, even though resource acquisition is kicked off asynchronously. The implementation is still pretty simple (125 LOC), and I am out of ideas for how to simplify further while still allowing for asynchronous resource acquisition and all the other features we want.

(A solution I considered but rejected - just making resource acquisition synchronous. So if you bracket(acquire)(use,release).awaitAsync, the acquire would actually be run synchronously. I think this is a little easier to get correct but makes the library a lot less useful.)

All right, once again I think that I am done and that everything Should Work, so let me know if you can poke holes in it! Let's sideline any quibbling about the formulation of Async for now. :)

djspiewak commented 9 years ago

@pchiusano So I've spent some time thinking about this, and I'd like to see an implementation of mergeHaltBoth. I believe that what you're doing does work (though it relies on an implied implementation detail of your Async, which I don't like as much), but I'm a little concerned about the lack of ability to "unpack" the step algebra (returned from the available functions) without having to worry about effect cases. So in short, mergeHaltBoth would be very informative!