typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Append is not associative with finalizers #333

Closed djspiewak closed 9 years ago

djspiewak commented 9 years ago
  property("associativity of append with finalizers") = secure {
    @volatile
    var finalized = false

    val p = emit(()) ++ (emit(()) onComplete eval_(Task delay { finalized = true }))
    // val p = (emit(()) ++ emit(())) onComplete eval_(Task delay { finalized = true })

    p.kill.run.run
    finalized
  }

In its current configuration, the above test fails. If I swap the commented lines, which is to say swap the associativity on the ++ operator, the test will pass. In other words, binding a finalizer to a non-halt process within an append breaks the guarantees about finalizer evaluation.

Even more seriously, the fact that there is an observable difference between the two p variants means that ++ is not associative. As an extension of this, flatMap is also non-associative. It was previously known that flatMap was non-associative in the presence of njoin (#274). This test case shows that njoin is unnecessary to demonstrate failure of associativity with monadic joining.

Needless to say, this is an undesirable state of affairs. It makes it difficult to reason about finalizers in general, and can mean that naively extracting a process definition into a variable (thus altering the associativity) can change the semantics. Even worse, a given process definition will have different semantics depending on the associativity with which it is composed into another process!

I believe all of these issues come down to a single design flaw: append and onComplete are implemented using the same mechanism (onHalt), which is overly general. We need to split up append from finalization. Doing this correctly should resolve the associativity of ++ issue described here (and by extension, flatMap). Resolving the njoin associativity issues would require we take the further step of restricting finalizer processes to be of type Process[F, Nothing] (in fact, we probably need to do this even for ++ associativity, but I haven't thought that through).
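To make the shared-mechanism point concrete, here is a minimal toy model in plain Scala. It is not scalaz-stream code: `Proc`, `Effect`, `kill` and `run` are simplified, hypothetical stand-ins. The only thing taken from the discussion is that both append and onComplete are built from a single onHalt-style node, and that kill replaces the head of the process while keeping the handlers attached around it.

```scala
object AppendAssociativityToy {
  sealed trait Cause
  case object End extends Cause
  case object Kill extends Cause

  // Hypothetical, heavily simplified process algebra (an assumption of this sketch).
  sealed trait Proc
  final case class Halt(cause: Cause) extends Proc
  final case class Emit(value: Int) extends Proc          // emits once, then halts with End
  final case class Effect(action: () => Unit) extends Proc // runs an effect, then halts with End
  final case class OnHalt(p: Proc, f: Cause => Proc) extends Proc

  // append: continue with q only on normal termination, otherwise drop q.
  def append(p: Proc, q: => Proc): Proc =
    OnHalt(p, {
      case End   => q
      case cause => Halt(cause)
    })

  // onComplete: always run fin, then halt with the original cause.
  def onComplete(p: Proc, fin: Proc): Proc =
    OnHalt(p, cause => OnHalt(fin, _ => Halt(cause)))

  // kill: replace the innermost head with Halt(Kill), keep the surrounding handlers.
  def kill(p: Proc): Proc = p match {
    case OnHalt(inner, f) => OnHalt(kill(inner), f)
    case _                => Halt(Kill)
  }

  // run: interpret the process and return the cause it halted with.
  def run(p: Proc): Cause = p match {
    case Halt(c)        => c
    case Emit(_)        => End
    case Effect(action) => action(); End
    case OnHalt(q, f)   => run(f(run(q)))
  }

  // Builds a process around a finalizer, kills it, runs it, and reports
  // whether the finalizer's effect was observed.
  def finalizedAfterKill(build: Proc => Proc): Boolean = {
    var finalized = false
    val fin = Effect(() => finalized = true)
    run(kill(build(fin)))
    finalized
  }

  def main(args: Array[String]): Unit = {
    val rightNested = finalizedAfterKill(fin => append(Emit(1), onComplete(Emit(2), fin)))
    val leftNested  = finalizedAfterKill(fin => onComplete(append(Emit(1), Emit(2)), fin))
    println(s"a ++ (b onComplete fin): finalized = $rightNested")
    println(s"(a ++ b) onComplete fin: finalized = $leftNested")
  }
}
```

In this toy model the first grouping leaves `finalized` as false and the second sets it to true, which mirrors the failing property at the top of the issue: the append handler discards its right-hand side, finalizer included, as soon as it sees a cause other than End.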

So the tl;dr here is that the monadic laws do not hold for Process in the presence of finalizers. Or rather, they do not hold in the presence of finalizers for a "strict" definition of process equality. The above assumes we define two processes to be equal if they a) produce the same elements in order, and b) perform the same effects in order. If we drop the second requirement, then magically the monadic laws (and ++ associativity!) hold once more. However, I would argue that this weaker definition of process equality is somewhat useless, given the primary use-cases for effectful streams.

At the end of the day, we want to use Process as a way of reasonably managing effects, and it is difficult to reason about a "monad" for which the monadic laws do not always hold.

pchlupacek commented 9 years ago

@djspiewak I am not sure that ++ has to be associative. You say append applies if the previous process completes normally; however, it did not complete normally here, because you killed that process with kill. I agree that in the displayed scenario it sounds like a broken law, but imho it is not. I think the commented and uncommented programs are really different, even though you think of them as the same program with a different association. With emit it perhaps sounds like what you would like to have, but not with Process[F,O], where perhaps you really don't want to run any effects after you kill the program.

I know we raised this when doing the append refactoring, and we decided that ++ is not associative. However, let's keep this open and discuss it.

djspiewak commented 9 years ago

@pchlupacek I agree that you don't want to run any non-finalizer effects when you kill a process, but the finalizers are carrying certain guarantees. Above and beyond that though, this demonstrates an observable difference between the two p processes, stemming solely from the association of ++! As I mentioned in the OP, this has serious consequences for the semantics of processes which may be passed in externally, then innocently appended to things (effectively whacking their finalizers). As a side-bar, note that I can generate this same behavior without using kill by throwing a Halt(Kill) ++ at the head of both processes.

Your argument that this is not law-breaking really stems from the weakened equality that I mention in the OP. You're effectively defining the equality of two processes based on their output values, rather than on the effects that they perform. As I argue in the OP, I do not consider that to be a useful definition of equality for Process, especially where the laws are concerned, simply because the primary practical function of Process is to manage effects.

djspiewak commented 9 years ago

@pchlupacek As an aside, you mentioned on another PR that you feel the following idiom should be resource safe:

eval(allocate) flatMap { r => p onComplete cleanup(r) }

The above is unsound precisely because of the non-associativity of append. So it's not just a law-related purity question. Append not behaving in a reasonable way with respect to finalizers has far reaching and very surprising consequences.

jedws commented 9 years ago

As a user of the library I completely expect the above to be sound, and I expect ++ to be associative.

pchlupacek commented 9 years ago

@djspiewak @jedws I thought more about this, and I think you are right. We have to fix this.

pchiusano commented 9 years ago

I agree that ++ should be associative, regardless of presence of finalizers. I'm not sure how to fix this, though.

On Mon, Mar 16, 2015 at 2:12 AM Pavel Chlupacek wrote:

@djspiewak @jedws I thought more about this, and I think you are right. We have to fix this.

djspiewak commented 9 years ago

I agree that ++ should be associative, regardless of presence of finalizers. I'm not sure how to fix this, though.

We need to update the algebra. I'm pretty sure we need to separate appends and finalizers, with each becoming a slightly weaker version of what we have now. Appends will no longer be functions from cause to process, while finalizers will require processes of type Process[F, Nothing]. This is strictly weaker than what we have now (where you can emit values in finalizers), but it allows us to fix not only the ++ associativity issue but also the finalizer guarantees provided by njoin (see #274). If we really, really feel that having finalizers that are capable of emitting is an important feature, we can preserve the existing Append as a third case in the append sub-algebra, exposed exclusively via onHalt. The caveat there would be that any finalizers created with onHalt would not benefit from associativity.

pchlupacek commented 9 years ago

Well, perhaps something like this may work?

    onHalt {
      case End => p2
      case cause => p2.injectCause(cause)
    }

djspiewak commented 9 years ago

@pchlupacek Not sure I understand. You're proposing that as the implementation for which higher level combinator?

pchlupacek commented 9 years ago

ah sorry now append is

final def append[F2[x] >: F[x], O2 >: O](p2: => Process[F2, O2]): Process[F2, O2] = {
  onHalt {
    case End => p2
    case cause => Halt(cause)
  }
}

effectively throwing p2 away if the process terminates with anything other than End. That is actually what causes the non-associativity. What we need in the case of a non-End termination is to run the finalizers, so instead of throwing p2 away, we need to run them, passing the Cause through. So the idea is to use a technique similar to the one we use in kill, which means propagating the cause down the program, i.e. changing the append definition to


final def append[F2[x] >: F[x], O2 >: O](p2: => Process[F2, O2]): Process[F2, O2] = {
  onHalt {
    case End => p2
    case cause => p2.injectCause(cause).drain.causedBy(cause)
  }
}

djspiewak commented 9 years ago

@pchlupacek That…is extremely clever. I think that might work. Or at least, I can't think of any cases where that doesn't work, and it's certainly less invasive than changing the algebra. I'll play around with it tomorrow, or you can if you prefer. It seems like a good change in any case.

pchlupacek commented 9 years ago

@djspiewak thanks :-) Please feel free to experiment. I would do it myself, but atm I need to finish another project, so I would get to it over the weekend at the earliest. If this works, we have to revisit onKill, onComplete et al. and make a similar change.

djspiewak commented 9 years ago

@pchlupacek So actually this doesn't work. The reason is actually somewhat recursive. The problem is that draining a process doesn't actually null out the appends, and injecting the cause doesn't either when we make this change. So in other words, infinite appends (such as repeat) just run forever.

I'm actually mildly concerned that this might be a more fundamental problem with the whole concept of appended finalizers. Haven't fully thought it through yet.

djspiewak commented 9 years ago

Just as an update, I've spent some time brainstorming this with @runarorama and @eed3si9n. Eugene came up with a very nice idea for essentially redesigning finalizers with respect to await and append. This will also make some of the runAsync semantics a bit cleaner. Not something for 0.7, but hopefully soon thereafter.

I'll write up more details when I get the time.

pchlupacek commented 9 years ago

@djspiewak yeah. I think we may need to update the algebra anyhow, for a couple of reasons beyond this one. I'll try to see whether I can do at least something with the current signatures. Some of the combinators are very "special" beasts, and as such maybe just a very small change can improve things a bit.

If you have some concrete proposed signatures I'll be really interested to see them.

djspiewak commented 9 years ago

@pchlupacek I have a longer explanation coming, but just so you can start thinking of things, this is concretely what I'm proposing:

It will still be possible to create onComplete-like things via onHalt, but we will loosen the guarantees associated with their run (basically, onHalt is guaranteed to run when the process to which it is attached completes; thus, we will explicitly exclude associativity of append over onHalt). Finalization with guaranteed resource-safety will be tied to Await.

Again, I have a long explanation coming for why this is and why I think it's the only feasible approach (in particular, I'm convinced that onComplete in its currently "detached" form cannot ever work). The above is simply a quick summary.

djspiewak commented 9 years ago

I promised a longer explanation of the proposed changes to Append, Await and the semantics of finalizers in general. Here it is! Consider the following snippet:

lazy val things: Process[Task, Int] = emit(0) ++ (things map (1 +) onComplete foo)
things.kill         // is this finite?

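We have two "invariants" that are in direct conflict here:

  1. kill always produces a finite process.
  2. A finalizer attached via onComplete (that is, via onHalt) runs exactly once.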

The reason for the conflict is that we have generated an infinite chain of onComplete(s)! I didn't actually do it here, but obviously it would be trivial to create such an infinite chain with the added property that every single onComplete was unique (it would merely require a flatMap). Now obviously we don't want to outlaw infinite processes or flatMap, so we have a serious problem! Does kill produce an infinite process in this case, or do we admit that we simply cannot guarantee exactly once (or even at-least once) semantics for finalizers?

What I would propose to you is that we have attempted to cram too much into onHalt. The function as it stands allows users to simply airdrop "finalizers" anywhere they please in the stream, carrying the naive assumption that we will always be able to root them out and preserve proper sequencing and evaluation guarantees no matter what the process structure around them. We went to great lengths in the stepAsync rewrite to achieve sane semantics for await-nested onComplete, and we still have weird glitches!

My proposal is that we cannot weaken the finitary invariant of kill, and so we must weaken the (never really achieved) exactly once invariant for onHalt. Of course, weakening this invariant makes onHalt somewhat useless (or at the very least, unreliable) as a finalizer mechanism, which means that we need to step forward with a new mechanism which will be reliable.

Eugene Yokota very astutely pointed out that finalizers and Await are intrinsically linked. You acquire a resource with Await, and you free it with a finalizer. Given this innate relation, why are we trying to split them up? My proposal is that we change the Await constructor to the following definition:

case class Await[+F[_], A, +O](
    req: F[A],
    rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]],
    finalize: A => Trampoline[Process[F, Nothing]])

(in largely unrelated news, I suspect we can simply remove the Trampoline altogether here, since Append now handles that aspect for us)

Thus, a finalizer is a function A => Process[F, Nothing]. It is invoked exactly once if and only if req completes (yay, parametricity). We further guarantee (but cannot constrain in the types) that this finalizer is only invoked upon the completion of the process produced by rcv, if in fact rcv is invoked (it may not be, in the case of an error or interrupt). We can make this hard-and-fast guarantee about the semantics of finalize precisely because we have lifted it into the algebra as a first class concept.
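The guarantee described above can be sketched with a toy interpreter in plain Scala. This is only an illustration under strong assumptions: effects are plain thunks, the process produced by rcv is modelled as a List, the `interrupted` flag is a hypothetical stand-in for a kill arriving after req has completed, and the field is called `finalizer` here to sidestep any clash with `java.lang.Object.finalize`.

```scala
object AwaitFinalizeToy {
  import scala.collection.mutable.ListBuffer

  // Hypothetical, heavily simplified stand-in for the proposed
  // Await(req, rcv, finalize).
  final case class Await[A, O](
      req: () => A,
      rcv: A => List[O],
      finalizer: A => Unit)

  // Runs one Await. `interrupted` simulates a kill that arrives after
  // req has completed but before rcv is invoked.
  def run[A, O](aw: Await[A, O], interrupted: Boolean): List[O] = {
    val a = aw.req() // if req throws, there is no A and the finalizer is not run
    try {
      if (interrupted) Nil else aw.rcv(a)
    } finally aw.finalizer(a) // exactly once, whether or not rcv ran
  }

  // Records the order of effects for one run.
  def demo(interrupted: Boolean): (List[Int], List[String]) = {
    val log = ListBuffer.empty[String]
    val aw = Await[String, Int](
      req = () => { log += "acquire"; "handle" },
      rcv = _ => { log += "use"; List(1, 2, 3) },
      finalizer = h => { log += s"release $h"; () })
    val out = run(aw, interrupted)
    (out, log.toList)
  }

  def main(args: Array[String]): Unit = {
    println(demo(interrupted = false))
    println(demo(interrupted = true))
  }
}
```

In both runs the log contains exactly one release entry. In the interrupted run rcv is skipped, yet the resource acquired by req is still released, which is the "if and only if req completes" property the proposal is after.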

Making this change allows us to be more precise about the semantics of stepAsync, fix the resource leaks often naively generated by onComplete placement, and even fix the long-standing finalizer sequencing oddities in njoin. What this change does not do is fix onComplete. In fact, onComplete would remain exactly the way it is today! My argument is that we cannot make any stronger guarantees about onComplete than we already do, and so we should move onComplete into a convenient little category of combinators over which ++ does not distribute, thus "solving" the associativity problem by simply ruling it out.

In fact, I would advocate removing onComplete entirely to make it extremely clear that this is not the intended mechanism for resource disposal. We can retain onHalt, since the functionality is useful under somewhat restricted cases (even beyond ++) and anyone who understands Cause probably also understands that they're in somewhat dangerous waters with the combinator. At the very least, onHalt doesn't feel like a "first class" combinator, whereas onComplete does, and it is precisely this API design that invites its misuse (misuse which is, unfortunately, without alternative in the present algebra).

Thoughts?

pchlupacek commented 9 years ago

@djspiewak Thanks for the explanation. In fact, if you look back at when we discussed the append algebra, this was one of the experimental paths we tried. I agree with Eugene's observation, and in fact to me

 A => Trampoline[Process[F, Nothing]]

is very similar to

 rcv(Interrupted(A)) 

Though as you say the explicit finalizer is what makes things much easier to reason about.

However, by essentially tying this to await, we are essentially building a different algebra for try {} catch, which in our syntax will just be await.onInterrupt or similar.

I would still like to explore whether we can do better: leave onComplete as it is and strengthen it a bit. Additionally, I am just thinking out loud, but perhaps onKill could now receive an Option[A], to allow for termination after the interrupted request has returned a.

I agree onComplete is too generic, and I would perhaps even vote for its deprecation if we can't do any better. One reminder though: onComplete is also special in that its parameter is augmented with asFinalizer, which was actually invented specifically for onComplete.

eed3si9n commented 9 years ago

I only made what @djspiewak called "category theory gang sign"

triangle

and claimed that the dual of finalizer is an initializer.

pchlupacek commented 9 years ago

+1

djspiewak commented 9 years ago

@pchlupacek The asFinalizer really just prevents Kill signals from propagating into the parameter and murdering off the effects we were trying to run. It doesn't prevent that Kill signal from nulling out enclosing processes, which is what the example in the OP demonstrates.

I'm all in favor of strengthening onHalt if we can, but I really don't think it's possible. Free-floating finalizers are just never going to work properly.

pchlupacek commented 9 years ago

@djspiewak you are perhaps right. The one important observation is that await is actually the only place where A can be safely typed. So if A is a resource, await is the only place where we can safely get the A that we really want :-) So perhaps you are right that we don't have a better solution than to attach an interrupt finalizer to an await.

pchlupacek commented 9 years ago

@djspiewak generally I think we are kind of back to awaitInterruptibly idea....

pchlupacek commented 9 years ago

I think we have here 2 options how to resolve this now:

  1. Change the signature of the async callback so that the source of the callback can recover if the task was interrupted. I think this will require a modification of scalaz.concurrent.Task, but imho it is a better and more reliable solution than option (2). My proposed async signature would be:

   Task.async[A] { (cb: (Throwable \/ A) => Boolean) =>
        if (cb(a)) // task was not interrupted
        else // task was interrupted, recover
   }

That will in fact guarantee that, once the task is interrupted, A must be handled so as not to leak any resources. If we implement this, then with very small changes to runAsync we can have the desired exactly-once behaviour for cleanups.

  2. Introduce a change to Await:

  Await(req:F[A],rcv: Throwable \/ A => Trampoline[Process[F,O]], cleanup: A => Trampoline[Process[F,Nothing]]) 

as previously discussed.

I am not sure how the cleanup will work in different combinators (pipe, tee, wye) but I hope it is implementable.
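The idea behind option (1) can be sketched without Task at all. The snippet below is a hypothetical illustration, not the scalaz.concurrent API: `Resource` and `acquire` are made-up names, and `Either` stands in for scalaz's `\/`. The point is only the shape of the callback: it reports back whether the consumer accepted the value, so the producer knows when it has to clean up itself.

```scala
object InterruptAwareCallbackToy {
  // Made-up resource type for the sketch.
  final class Resource {
    @volatile var closed: Boolean = false
    def close(): Unit = closed = true
  }

  // Hypothetical producer side of the proposed callback shape: the callback
  // returns true if the consumer accepted the value, and false if the task
  // had already been interrupted.
  def acquire(cb: Either[Throwable, Resource] => Boolean): Resource = {
    val r = new Resource
    val accepted = cb(Right(r))
    if (!accepted) r.close() // consumer is gone, so the producer must clean up
    r
  }

  def main(args: Array[String]): Unit = {
    val kept    = acquire(_ => true)  // consumer took ownership
    val dropped = acquire(_ => false) // consumer was interrupted
    println(s"accepted:    closed = ${kept.closed}")
    println(s"interrupted: closed = ${dropped.closed}")
  }
}
```

Note that the cleanup logic lives on the producer side here, inside the code that acquires the resource, while cleanup after normal completion would still have to be expressed somewhere in the stream.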

Then on the append associativity, I hope we perhaps can do:


case class Step[+F[_], +O](head: EmitOrAwait[F, O], next: Cont[F, O]) extends HaltOrStep[F, O] 

 case class Cont[+F[_], +O](stack: Vector[Option[Cause] => Trampoline[Process[F, O]]] @uncheckedVariance) 

where append will be defined by passing None, whereas repeat, onHalt etc. will stay as they are. Perhaps (and I am not sure of this) this may also make the counter in step redundant.

pchlupacek commented 9 years ago

@djspiewak, @pchiusano let me know your thoughts on the previous comment.

djspiewak commented 9 years ago

@pchlupacek I'm very much not in favor of changing Task.async. Not only does it force us to bite the bullet and fork off scalaz.concurrent (which is more work in the short term), but it also doesn't really address the problems we're trying to solve. It would implement a form of resource management, yes, but it would be a fundamental change in how resources are managed in scalaz-stream. Users would need to define their resource management logic in multiple places: inside of the acquiring Task (for interrupts) and in some sort of scalaz-stream handler (for non-interrupted completion). This seems undesirable to me.

I'm much more in favor of option 2, which is putting a cleanup handler into Await itself. Conceptually, this makes the most sense to me (it ties finalization and acquisition together in the algebra) and it's very easy to work with in terms of getting the semantics we want out of stepAsync. The fusers are a different matter, but I'm confident that we will be able to define things sanely even in those cases.

Regarding the associativity of append over onComplete, as I have previously argued, this is a completely lost cause, not because of onComplete but because of kill. Your proposal for Step and Cont doesn't really change any of the high-level cases that I've listed; onComplete still doesn't associate fully! However, I would argue that with the addition of a cleanup continuation to the Await case, we don't need finalizers to associate over append. For this reason, once we have cleanup, I would advocate strongly for removing onComplete altogether. We can keep onHalt with the caveat that it (still) doesn't associate over append in the presence of kill, but onComplete is just too deceptive and too easy and also too unnecessary once we have cleanup as part of Await.

pchlupacek commented 9 years ago

@djspiewak thanks. I take your points. Let's start with option (2). I think option (1) still solves the problem more deterministically, but I agree with the drawbacks you mention.

Let's see where we get with (2) and what impact it has on associativity. I am still not giving up on Append associativity with the current algebra (unless you have a mathematical proof against it :-)), but let's do that afterwards. I also think that by implementing (2), runAsync will be much simpler and cleaner (at least I have a feeling it will), as we only have to run rcv on interrupt and cleanup on completion of the interrupted task.

pchlupacek commented 9 years ago

@djspiewak I just did a quick experiment on introducing cleanup to await. It went actually surprisingly well, can you try to hook it to runAsync? Branch is here https://github.com/scalaz/scalaz-stream/tree/bug/await-cleanup.

djspiewak commented 9 years ago

@pchlupacek I think the biggest trick is actually going to be getting the syntax side of things to look sane with the Process.await function. Hooking into stepAsync isn't going to be that terrible, but it's certainly going to be more than just adding some underscores everywhere. :-)

I'm currently working on seeing if I can patch the fusers into working with the current algebra. Obviously, this (the algebraic change) is the long-term solution and I will be working on it next, but I want to exhaust the short-term alternatives first, especially since the algebraic change alters binary compatibility.

pchlupacek commented 9 years ago

@djspiewak I don't think we need anything really special in the syntax. We are simply reverting, more or less, to the original signature of await and re-introducing cleanup, this time with A as the input parameter, since it is consulted only when the task gets interrupted. I doubt we have to add anything more complicated on top of that.

I also don't think migrating to this will be a horrible story, as I haven't seen that many users using Await directly. Most code uses await and awaitOr, which keep a source-compatible signature. Good proof of this is that I didn't have to touch a single Spec :-)

Of course, if you can solve it with the current algebra, that will be excellent. Specifically, it would be a nice trick to solve the recursion aspect of tee and pipe; I think the other combinators (wye) are doable. The problem with tee and pipe is that you somehow need to get their state out of rcv and into cont once rcv completes, and I am not sure that is doable.

djspiewak commented 9 years ago

@pchlupacek The problem for us is that we depend on a lot of other projects in the scala ecosystem which in turn depend on scalaz-stream. If we do something binary-incompatible without making a release yet, we have to go through and update all the various dependencies to use a self-published snapshot version. We've done that before, and it's not the end of the world, but it's certainly less convenient than remaining binary-compatible, pushing a self-published snapshot and masking the transitive dependency.

My point with the await syntax is that we need to make it easy and syntactically convenient to create an await with a paired finalizer, and similarly we need to make it hard and syntactically inconvenient to do the same thing via onHalt. The principle at work here is that it should be easiest to do things right, and harder (or impossible) to do things wrong. This is also why I want to remove onComplete.

djspiewak commented 9 years ago

@pchlupacek So staring at the following construction, I just realized that the fusers are a big problem for Await with cleanup, and basically for the same reason that they're a problem now:

awt.extend(p => (p +: cont) pipe nextP1)

So we flip the continuation inside of the Await, effectively converting every Await into a span that runs to the end of the world. This is a problem now because it changes the semantics of finalizers outside of the Await (by moving them inside). This will be a problem under the new scheme because it means the finalizer won't run until the end of the world, hanging onto resources far, far too long. For obvious reasons, we can't have that.

So either way, we need to fix the fusers. I agree that the fundamental problem is lifting the state out of the tail of the await and into the continuation, and I don't see an obvious answer to that problem just yet, but I'm working on it.

pchlupacek commented 9 years ago

@djspiewak hm, I am not sure I understand. With cleanup introduced, we only have to make sure that it travels with the await where it was defined. We don't have to combine it with any other cleanup/process code.

I believe, then, that you can safely clean up the process on interrupt via rcv(Kill), instead of by running the outer and inner cleanup. That should make runAsync a lot easier, since you only need to ensure that after an interruption you get a callback from the task, which will convert to rcv(Kill), and then, if the second completion is received, that will pass an a to rcv(a).

As a consequence, I think it no longer matters whether cont is swapped inside, since in await you always call rcv() with whatever the result of the task was.

I am perhaps missing something?

Ah, yes, on pipe, tee et al.: let's ignore cleanup on Process1/Tee/Wye, as we don't need to clean up any resources in P1/Tee/Wye.

Also can you perhaps elaborate on hanging resources with example? I mean under situation when await is not interrupted I don't see any difference between

  awt.extend(p => p +: cont) 
  //and
  awt.extend(identity) +: cont

and when it is interrupted, I don't see a difference between

  Step(Await(req,rcv,cln), cont) 
  r: (Throwable \/ A) => {rcv(r).run +: cont}.apply(left(Kill))

  // and
    Step(Await(req,rcv,cln), cont) 
   rcv(left(Kill)).run +: cont

pchlupacek commented 9 years ago

@djspiewak on the syntax issue: we once had fby defined as a specific case that only attached to the completion of the last await. However, the problem with anything not tied to await is that we won't get any type parameter.

As such, I wouldn't like to complicate the syntax a lot, but rather make it as simple as possible, so that the only place to touch will be awaitOr

// cleanup has default value
awaitOr(REQ)(rcv,cleanup = (a) => Trampoline.done(halt))

//introduce
awaitInterruptibly(REQ)(cleanup)(onFailure)(rcv)

I think it would be hard to prevent onHalt from containing cleanup code that closes over a. But let's start with something, and perhaps we can come up with something better along the way as we implement it?

djspiewak commented 9 years ago

@pchlupacek Consider the following:

await(req, cln) { a => rcv(a) } ++ cont
// vs
await(req, cln) { a => rcv(a) ++ cont }

In the proposed algebra, cln is invoked upon the completion of rcv or upon interrupt. Let's just consider the completion (uninterrupted) case. In the top example, cln runs correctly after rcv finishes, and then cont runs afterward. Thus, order of evaluation: rcv > cln > cont. In the second example, cont is inside of the await, and so cln doesn't run until after cont does! Thus, order of evaluation: rcv > cont > cln.

This only begins to sound bad once you realize that cont could take a very, very long time to evaluate. It could even be unbounded! Thus, cln might not actually run until the very end of the world, meaning that any resources which are acquired in req will be held long, long after they are no longer required, effectively engineering a resource leak.

Obviously, this isn't tenable. We can't allow a situation where users could transparently and accidentally transmute a sound resource management scheme into one which leaks resources until the end of the world, simply by composing through a fuser. Thus, we need to solve the lifting problem inside of pipe and tee, regardless of whether or not we change the algebra.
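The two evaluation orders can be reproduced with a small sketch in plain Scala. The `await` helper below is a hypothetical stand-in, not the scalaz-stream function: it runs the body and then the cleanup, which is the only property of the proposed algebra that matters for this argument. Processes are modelled as plain side-effecting calls that append to a log.

```scala
object FinalizerOrderingToy {
  import scala.collection.mutable.ListBuffer

  // Hypothetical stand-in for an await with a paired cleanup: cln runs as
  // soon as the body passed to await has finished.
  def await[A](req: () => A, cln: A => Unit)(body: A => Unit): Unit = {
    val a = req()
    try body(a) finally cln(a)
  }

  // Models: await(req, cln) { a => rcv(a) } ++ cont
  def contOutside(): List[String] = {
    val log = ListBuffer.empty[String]
    def note(s: String): Unit = { log += s; () }
    await[Int](() => 42, _ => note("cln")) { _ => note("rcv") }
    note("cont")
    log.toList
  }

  // Models: await(req, cln) { a => rcv(a) ++ cont }
  def contInside(): List[String] = {
    val log = ListBuffer.empty[String]
    def note(s: String): Unit = { log += s; () }
    await[Int](() => 42, _ => note("cln")) { _ => note("rcv"); note("cont") }
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(contOutside().mkString(" > "))
    println(contInside().mkString(" > "))
  }
}
```

The first variant logs rcv, cln, cont; the second logs rcv, cont, cln. If cont were long-running or unbounded, the second variant would hold the resource for that entire time.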


I was talking about this issue today with @scottcarey, and he pointed out that the problem is the fusers are breaking the compositional properties of Process. Specifically, the fusers are making the assumption that await(req) { a => rcv(a) } ++ cont is perfectly equivalent to await(req) { a => rcv(a) ++ cont }. This assumption is invalid though, both today and with the proposed algebraic change. So I'm wondering if maybe the real problem here is the strength of Await and the fact that we can have both a body and a continuation.

Perhaps a better solution would be to remove Await entirely and replace it with an Eval case, for which Process.eval would be the constructor. This effectively specializes Await on Emit. It removes the body problem, meaning that the fusers would no longer have any issues whatsoever with state lifting. It dramatically simplifies basically every interpreter we have, and it ultimately doesn't remove any expressiveness since you can always eval . flatMap in place of the current await. In fact, we could technically preserve the Process.await function defined in terms of eval . flatMap.

The problem is that I can't see a way to make resource management work deterministically with this scheme. If there is a way to make it work though, I think this would be a really really nice simplification of the algebra, and one which would resolve a lot of problems and current special-cases.

djspiewak commented 9 years ago

Actually, I think that @pchlupacek's Task.async suggestion might be the only way to make an Eval case work. My objection on multiplicity of finalizers would still stand, but with an Eval case, we're stuck with multiple finalizers regardless of how they're represented.

pchlupacek commented 9 years ago

@djspiewak this is unfortunately the observation I have did. I am still trying to grasp your arguments why we can't do that in current signature // scheme, but I think the Task. async signature is just missing the interruption as first class cause.

I think we can perhaps just include concurrent libraries in scalaz.stream, and keep scalaz._ dependency as it is. We can then perhaps by using implicits make sure that ppl won't need to rewrite the code and will be able to use both Tasks. Not sure on that tho.

pchlupacek commented 9 years ago

@djspiewak I thought about your argument with

await.extend(p => p +: cont) 

where the issue is that if cont is a never-terminating or long-running process, the cleanup will never be called, thus holding on to the resource, possibly forever.

I know you showed await(req, cln) { a => rcv(a) ++ cont }, but I think in that case you are forcefully doing a bad thing, essentially extending the await's rcv indefinitely (I mean, one can just as well write await(req, cln) { a => rcv(a) ++ cont(a) }), and there is no way we can prevent users from doing that. I agree this is probably not as intuitive as it should be, but hey, we can't prevent all the bad things people can do.

However, how about then defining awaitOr (or your proposed await with cleanup) as:


def awaitOr[F[_], A, O](req: F[A])(
    fb: EarlyCause => Process[F, O]
    , cln: A => Process[F,Nothing] = (a:A) => halt
    )(rcv: A => Process[F, O]): Process[F, O] = {
    Await(req, (r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(ec => fb(ec), a => rcv(a) onComplete cln(a) ))), (a:A) => Trampoline.delay(cln(a)))
  }

Then I think when using

awt.extend(p => p +: cont)

will always guarantee the cleanup will be called before cont will get executed in normal operation (where a will be produced, and in Interrupted situation the cleanup will be used to assure resource safety instead invoking the rcv.

Last remark, please note that fuser don't use ++ but +: which is completely different. So althought I understand what you wrote, I think this is not really the problem hence +: shall be same from inside and outside, or at least we must mange it to be :-)

Last idea (if we cannot do any better) would be instead using recursion, to introduce internal var to hold state for pipe, and tee, and introduce go accepting this var, instead of calling pipe and tee directly in recursion .That way we could possibly pass state between rcv and cont safely.

djspiewak commented 9 years ago

@pchlupacek

this is unfortunately the observation I have made as well. I am still trying to grasp your arguments for why we can't do that in the current signature/scheme, but I think the Task.async signature is just missing interruption as a first-class cause.

We can do it in the current signature, but doing so doesn't solve the fusers! The problem with the fusers is Await. Now, replacing Await with Eval reintroduces the problem of preemption boundaries, and the only solution to that problem (that I can see at present) is your Task.async suggestion. Without replacing Await with Eval though, we still have a natural preemption boundary, meaning that we don't need to change Task.async. The two problems are inextricably linked.

Put another way: the lexical body of Await solves preemption but breaks the fusers, while the lack of body on Eval solves the fusers and breaks preemption. We have a proposed alternative solution for preemption (namely Task.async), but the only alternative solution for the fusers involves mutable state.

I think we can perhaps just include concurrent libraries in scalaz.stream, and keep scalaz._ dependency as it is. We can then perhaps by using implicits make sure that ppl won't need to rewrite the code and will be able to use both Tasks. Not sure on that tho.

Worth exploring, but extremely tricky in the presence of covariance. We'll have to see.

I know you have shown await(req, cln) { a => rcv(a) ++ cont }, but I think in that case you are forcefully doing a bad thing, essentially extending the await's rcv indefinitely (I mean, one can just as well write await(req, cln) { a => rcv(a) ++ cont(a) }), and there is no way we can prevent users from doing that. I agree this is probably not as intuitive as it should be, but hey, we can't solve all the bad things people can do.

The fusers are doing this. It's not something random and bad that external users can do (but shouldn't); it's something that we depend on.

Also, we can prevent people from doing this by removing Await and introducing an Eval case. As I detailed above, that change to the algebra completely eliminates this whole discussion, because there is no longer a "body" to the Await.

will always guarantee the cleanup will be called before cont gets executed in normal operation (where a will be produced); in the Interrupted situation, the cleanup will be used to assure resource safety instead of invoking rcv.

We cannot make this guarantee. How do you tell that p is completed but cont hasn't been started? Remember that p may consist of a large number of Append nodes, perhaps even automatically constructed from Cont in the same fashion as what the fusers are doing. There is no visible boundary here.

Last remark: please note that the fusers don't use ++ but +:, which is completely different. So although I understand what you wrote, I think this is not really the problem, since +: should be the same from inside and outside, or at least we must manage to make it so :-)

+: isn't completely different. It's actually almost identical to ++ in terms of its effect on the algebra: namely, the generation of an Append node. The difference is that +: uses the continuation stack from Cont, rather than synthesizing a new singleton stack deriving from End. In this context, that distinction is irrelevant.

Last idea (if we cannot do any better): instead of using recursion, introduce an internal var to hold state for pipe and tee, and introduce a go accepting this var, instead of calling pipe and tee directly in recursion. That way we could possibly pass state between rcv and cont safely.

This is precisely my hold-out idea for fixing the fusers. It will work, but it's ugly as sin and I don't want to do it. It essentially means that, in order to implement something like the fusers, we need to go "outside" the algebra and thread state in a non-local fashion, and I'm not very comfortable with that situation. It implies that the algebra itself has a problem.

pchlupacek commented 9 years ago

@djspiewak I think I wasn't clear with my suggestion. I think the fusers are still implemented a bit differently from the rest of the fuser code, and we can ensure that cleanup always gets called before the outside cont.


// in pipe
case Step(awt @ Await(_, _, _), cont) => awt.extend(p => (p +: cont) pipe nextP1)
// can be implemented like
case Step(awt @ Await(req, rcv, cln), cont) => await(req, cln)(r => rcv(r).map(p => (r.fold(_ => p, a => p onComplete cln(a)) +: cont) pipe nextP1))
// that way you ensure cleanup will be invoked before cont gets attached

Could you perhaps make a test case that will break this?

pchlupacek commented 9 years ago

Can you also please share your suggested Eval signature? Specifically, I am interested in how you would suggest Eval replace await1 et al.

djspiewak commented 9 years ago

Could you perhaps make a test case that will break this?

var count = 0
val cln = eval_(Task delay { count += 1 })
val p = await(req, cln) { r => rcv(r) } pipe await1

p.run.run
count == 1       // => false

The cln finalizer will be invoked twice. Alternatively, you might say that in the above the cln finalizer is only invoked once, in which case I will turn around with the following test case:

var count = 0
val cln = eval_(Task delay { count += 1 })
val p = await(req, cln) { r => rcv(r) }

p.run.run
count == 1       // => false

If the first test passes, then this second test will fail because count will equal 0!

The point is that either the evaluator for Await needs to run cln after it has finished evaluating the body (possible), or the fusers get the ability to move the cln around (as in your proposal), but we can't have both. In either case, we get a finalizer semantic that is undesirable.

Reiterating what I said earlier: the problem is the assumption made by the fusers regarding the nature of the body of Await. The property await(req) { r => p(r) } ++ cont === await(req) { r => p(r) ++ cont } does not hold, and yet this is precisely the property that the pipe and tee implementations assume. We cannot make this property hold in the presence of sound preemption, and so we need to remove it altogether. That is what my Eval proposal does.
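
To make the failing property concrete, here is a deliberately tiny model. It is not the scalaz-stream algebra: Proc, Fin and kill are hypothetical names, and kill assumes the simplest possible preemption rule, namely that a preempted Await is dropped together with its body while anything appended outside it is still visited.

```scala
object PreemptionSketch {
  // Toy algebra (hypothetical, for illustration only).
  sealed trait Proc
  case object Halt extends Proc
  final case class Emit(value: Int) extends Proc
  final case class Fin(run: () => Unit) extends Proc // a finalizer node
  final case class Await(req: () => Int, body: Int => Proc) extends Proc
  final case class Append(left: Proc, right: () => Proc) extends Proc

  // Preempt before any request is evaluated. The Await body is never built,
  // so whatever lives inside it is invisible to the interpreter.
  def kill(p: Proc): Unit = p match {
    case Fin(run)       => run()
    case Append(l, r)   => kill(l); kill(r())
    case Await(_, _)    => ()
    case Halt | Emit(_) => ()
  }

  // Builds a process around a finalizer-carrying `cont`, kills it, and
  // reports whether the finalizer ran.
  def finalizedAfterKill(build: Proc => Proc): Boolean = {
    var finalized = false
    kill(build(Fin(() => { finalized = true })))
    finalized
  }

  // await(req) { r => p(r) } ++ cont
  val outside: Boolean =
    finalizedAfterKill(cont => Append(Await(() => 1, r => Emit(r)), () => cont))

  // await(req) { r => p(r) ++ cont }
  val inside: Boolean =
    finalizedAfterKill(cont => Await(() => 1, r => Append(Emit(r), () => cont)))
}
```

Under that assumed rule, the two shapes are observably different after a kill: the finalizer runs when cont sits outside the Await and is lost when cont has been moved into the body.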

Can you also please share your suggested Eval signature? Specifically, I am interested in how you would suggest Eval replace await1 et al.

case class Eval[F[_], A](fa: F[A]) extends Process[F, A]

def eval[F[_], A](fa: F[A]): Process[F, A] = Eval(fa)

def await[F[_], A, B](fa: F[A])(f: A => Process[F, B]): Process[F, B] = eval(fa) flatMap f

await1 (well, really receive1) and friends are implemented on top of Process.await. As you can see in the above, I am able to implement Process.await without any trouble whatsoever. The only hangup here is that I cannot implement awaitOr as the signature stands. This isn't a problem directly, since awaitOr isn't a very useful primitive, but it does become a problem for receive1Or.

This problem is fixed by slightly extending the Env algebra, allowing the primitives to await a signal which can either be a value or an EarlyCause. Higher level combinators (basically anything anyone would write) would be unchanged. We need to alter Env anyway to support available, so this isn't really a problem.

pchlupacek commented 9 years ago

@djspiewak your proposal is essentially to move the body of await into flatMap, right? I need to think a bit about this.

On the test cases: aha! As you say, this wouldn't work with the test cases I have presented, but I think both will work if we attach cleanup immediately when the await is created. And since the fusers do not create awaits but rather use await.extend, which constructs the await in a different way than await does, we can in that case leave the fusers as they are.

// cleanup is attached at construction, immediately once `rcv` is evaluated.
// that guarantees cleanup runs as soon as the body over which await is closed is evaluated.
def await[F[_], A, O](req: F[A],
    cln: A => Process[F, Nothing] = (a: A) => halt
)(rcv: A => Process[F, O]): Process[F, O] =
  Await(
    req,
    (r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(ec => Halt(ec), a => rcv(a) onComplete cln(a)))),
    (a: A) => Trampoline.delay(cln(a))
  )

// cleanup is not attached to rcv; it is just passed along for the `interrupt` case.
// that ensures cleanup won't hold any resources over pipe/tee combinators.
def extend[F2[x] >: F[x], O2](f: Process[F, O] => Process[F2, O2]): Await[F2, A, O2] =
  Await[F2, A, O2](req, r => Trampoline.suspend(rcv(r)).map(f), cleanup)
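
The intent of the two definitions above can be sketched with a much smaller model. This is only an illustration, not library code: Signal, Value, Interrupted and Killed are hypothetical names standing in for "request produced a value", "value produced but the consumer was preempted", and "preempted before any value existed".

```scala
object CleanupSketch {
  // Hypothetical outcomes of evaluating the request.
  sealed trait Signal[+A]
  final case class Value[A](a: A) extends Signal[A]
  final case class Interrupted[A](a: A) extends Signal[A]
  case object Killed extends Signal[Nothing]

  // `cln` is supplied once by the caller and placed on both paths:
  // after the body in normal operation, and alone on interruption.
  def await[A](cln: A => Unit)(rcv: A => Unit): Signal[A] => Unit = {
    case Value(a)       => try rcv(a) finally cln(a) // body, then cleanup
    case Interrupted(a) => cln(a)                    // body skipped, cleanup only
    case Killed         => ()                        // nothing was acquired
  }
}
```

In this model the caller writes cln in one place, and it runs exactly once whenever a value was actually produced, whether or not the body got to run.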

djspiewak commented 9 years ago

On the test cases: aha! As you say, this wouldn't work with the test cases I have presented, but I think both will work if we attach cleanup immediately when the await is created. And since the fusers do not create awaits but rather use await.extend, which constructs the await in a different way than await does, we can in that case leave the fusers as they are.

Ok, you're envisioning cln as a preemption handler, not as a finalizer. First off, this doesn't at all address the associativity problem. Second, it forces us to define finalizers in two places (cln and onComplete). This is essentially the main disadvantage with the Task.async option, but without any of the atomicity advantages.

In any case, this isn't really what the original proposal for Await was. It's a possibility, but I think it's not as good as the Task.async option even in the current algebra, and it doesn't address the associativity issue in any case.

pchlupacek commented 9 years ago

@djspiewak Sure, the associativity is not addressed (I didn't mean that it was; sorry for not being clear here). This only addresses the issue where the cleanups are not invoked under the current algebra in certain cases, as you perhaps saw in #353. On the associativity, we perhaps need to do more, or, as you propose, change the algebra a bit. I still can't see the full picture of your proposal; do you plan to try to implement or draft it?

If you carefully go through the proposed signature, you actually do not need to define the finalizers in two places. Once you define cln, it will be attached as onCleanup. The only thing you need to be aware of is that if resources have to be cleaned up, you must attach the cleanup to the await. I don't think you have any other option under the current algebra, since we erase A immediately after the Await is evaluated.

If we don't have any better option, I think we have to at least get what we can implement now, with minimal changes, since the current implementation is IMHO broken and we need to fix it.

I actually thought about Task.async a bit more, and it may require a different signature. The reason is that you will need to factor in the strategy, so the signature and implementation may be quite different and perhaps more complicated. You will have to pass a Strategy to Task.async, so perhaps the signature will have to be:

object Task {
  def async[A](f: ((Throwable \/ A) => Boolean) => Unit)(implicit S: Strategy): Task[A] = ???
  // or
  def async[A](f: ((Throwable \/ A) => Boolean) => Strategy): Task[A] = ???
}

djspiewak commented 9 years ago

Sure, the associativity is not addressed (I didn't mean that it was; sorry for not being clear here). This only addresses the issue where the cleanups are not invoked under the current algebra in certain cases, as you perhaps saw in #353. On the associativity, we perhaps need to do more, or, as you propose, change the algebra a bit. I still can't see the full picture of your proposal; do you plan to try to implement or draft it?

The implementation (for the Eval proposal) is actually very straightforward. It's a pretty dramatic simplification of the algebra relative to Await, which is one thing that appeals to me a lot. The cln proposal complicates the algebra. It does have the advantage of not requiring a change to Task, but it misses the opportunity to simplify which gives me pause.

I don't have a draft implementation because I didn't want to do that until we had progressed further in knowing whether or not it would be useful, but I have no doubt that the implementation will be very straightforward. The only real complexity, honestly, is the requisite modifications to Task.

If you carefully go through the proposed signature, you actually do not need to define the finalizers in two places. Once you define cln, it will be attached as onCleanup. The only thing you need to be aware of is that if resources have to be cleaned up, you must attach the cleanup to the await. I don't think you have any other option under the current algebra, since we erase A immediately after the Await is evaluated.

Use case: I have an Await which acquires a resource, a process in the body of the await which uses that resource, and I have a Task which can free that resource. I must guarantee that said Task is invoked exactly once, regardless of whether the body completes or if the Await is preempted. If the body does run, the freeing Task needs to be run as soon as the body completes.

I do not at present see how your proposal addresses this use case without requiring the user to compose in the Task in two places: once as an onComplete and once as a cln (in the Await).

If we don't have any better option, I think we have to at least get what we can implement now, with minimal changes, since the current implementation is IMHO broken and we need to fix it.

We have three options on the table right now, actually:

  1. Add cln to Await and require users to define finalizers separate from preemption cleanup
  2. Remove Await, add Eval, modify Task and require users to define finalizers separate from preemption cleanup, which will be atomic
  3. Reimplement tee and pipe using mutable state to manage the Tee and Process1 state

1 complicates the algebra, 2 simplifies it but modifies Task, and 3 jumps the shark.

I actually thought about Task.async a bit more, and it may require a different signature. The reason is that you will need to factor in the strategy, so the signature and implementation may be quite different and perhaps more complicated. You will have to pass a Strategy to Task.async, so perhaps the signature will have to be:

In practice, basically every use of Task.async has a Strategy in scope already, since if you don't have one in scope, you cannot safely invoke the callbacks. Thus, I don't really see the need to pass the strategy a second time.

djspiewak commented 9 years ago

It just occurred to me that we can implement Eval even without modifying Task since we can play the same trick as with Await: putting the preemption logic on the node as cln. No modifications required to the concurrency primitives and a dramatic simplification of the algebra and fuser implementations. This would be my preferred approach.

djspiewak commented 9 years ago

I did some work on Eval, prototyping on a branch, and I'm actually no longer in favor of this approach. await and awaitOr are pretty trivial, though awaitOr does pick up a Catchable constraint on F (which I always thought it should have in any case). The problem is really in the other combinators. flatMap requires that we introduce a Bind case, similar to Free, since we cannot get "inside" the Eval in the same way that we can get inside of Await (you can think of Await a bit like Eval + Bind). Things really go downhill from there.
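
For readers following along, the shape being described looks roughly like the sketch below. It is a toy reconstruction under loud assumptions, not the prototype branch: the effect type is fixed to Function0, the names Proc and runLog are made up, and the interpreter is eager and not stack-safe.

```scala
object EvalSketch {
  // Toy algebra: Eval has no body, so sequencing needs its own Bind node.
  sealed trait Proc[+O] {
    def flatMap[O2](f: O => Proc[O2]): Proc[O2] = Bind[O, O2](this, f)
    def ++[O2 >: O](that: => Proc[O2]): Proc[O2] = Append[O2](this, () => that)
  }
  case object Halt extends Proc[Nothing]
  final case class Emit[O](o: O) extends Proc[O]
  final case class Eval[O](fa: () => O) extends Proc[O]
  final case class Append[O](left: Proc[O], right: () => Proc[O]) extends Proc[O]
  final case class Bind[A, O](src: Proc[A], f: A => Proc[O]) extends Proc[O]

  def eval[A](fa: () => A): Proc[A] = Eval(fa)

  // await is no longer a primitive: it is Eval followed by Bind.
  def await[A, B](fa: () => A)(f: A => Proc[B]): Proc[B] = eval(fa).flatMap(f)

  // Naive interpreter that collects all outputs.
  def runLog[O](p: Proc[O]): Vector[O] = p match {
    case Halt         => Vector.empty
    case Emit(o)      => Vector(o)
    case Eval(fa)     => Vector(fa())
    case Append(l, r) => runLog(l) ++ runLog(r())
    case Bind(src, f) => runLog(src).flatMap(a => runLog(f(a)))
  }
}
```

Note that Eval's result type is the output type of the process itself, whereas the intermediate type A only exists on the Bind node.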

The fusers are pretty much as simple as I thought they would be. However, they highlight a strength of Await that I hadn't fully realized: it preserves internal chunking due to the existentiality of the A type. Eval cannot do that, since it is constrained to return something of type O. Thus, while Await has the opportunity to work with the new available fusers, Eval cannot. This is a huge, fundamental limitation in my proposal, and one that I feel is a bridge too far.

@pchlupacek I still want to see how your cln proposal (option 1 in my previous list) manages the use-case I described in my prior comment. However, even if it does require defining the finalizer logic twice, I think it's probably still worth implementing.

pchlupacek commented 9 years ago

@djspiewak I'll try to do some prototyping on this over the weekend. I am not sure I understand your worries about the finalizer logic: if you look at the code I suggested before, cln is added automatically in the correct places when using the await constructor. The user will have to specify it only once, when creating the await, and I think that's reasonable.

But as I said, I'll try to implement it in a branch and then we'll see where we go from there. I'll also add the tests you have suggested so we can see whether this works for these cases or not.

I may have a bit of trouble digging into your runAsync code. If that gives me hell, I'll just do something there and perhaps ask you for help on that later.

djspiewak commented 9 years ago

@pchlupacek Oh, ok I understand what's going on now. That's clever! I agree that it is a reasonable approach. It isn't backwards compatible, but I don't think that anything is going to be backwards compatible.

The only concern I have is that it makes infinite await bodies extremely problematic. A good example of this is repeatEval. I think infinite await bodies were pretty crazy to begin with though, and I don't mind making it clear that they aren't supported/idiomatic.

I'll snag your branch and make the stepAsync changes. It's actually going to be fairly trivial, due to the way that the cases are separated. Should just be a one-liner, in fact.