Closed LukaJCB closed 6 years ago
I was thinking something along the lines of:
@typeclass trait Finalizable[F[_]] extends Async[F] {
def bracket[A, B](acquire: F[A])(use: A => F[B])(release: Either[Throwable, B] => F[Unit]): F[B]
}
@LukaJCB s/IO/F/ right?
Ah yes, of course, sorry! Edited.
It doesn't need to be Async, unless I'm missing something plain Sync is fine.
Yeah Sync is probably fine, I'd just like it to work with async exceptions as well 😄
Sure, but there's no difference for our purposes, Async means the async builder and I don't see a use for it here.
If such a Finalizable type were to exist, then Monix's Coeval should be able to implement it, as it implements Sync. Or you can think of the current Scalaz 7 IO.
Yeah, I really can't tell on where it best fits within the hierarchy, I was just trying to find a good type signature to exemplify the use case :)
Minimally, you could introduce a feature specifying the ability to add a finalizer:
trait Finalizable[F[_]] extends Monad[F] {
  def ensuring[A](fa: F[A])(finalizer: F[Unit]): F[A]
}
which has a trivial implementation for cats-effect IO:
class IO[A] {
  ...
  def ensuring(finalizer: IO[Unit]): IO[A] =
    for {
      e <- attempt                          // capture the outcome instead of failing
      _ <- finalizer                        // always run the finalizer
      a <- e.fold(IO.raiseError, IO.pure)   // IO.pure, since the by-name IO.apply is not an A => IO[A]
    } yield a
  ...
}
The safety guarantees of this implementation break down in the presence of async actions that never terminate, but that's not something that can be solved with this implementation of IO (nor can you know the difference between an action that never terminates, and one which is just taking a really long time to return, for some definition of really long).
While ensuring may seem too low-level, it's actually all that you need for IO. When combined with an MVar or equivalent, you can implement more elaborate control structures on top.
More comprehensively, you can introduce a MonadBracket that extends MonadError and whose laws relate to it. This abstraction would have a bracket primitive along these lines:
def bracket[A, B](acquire: F[A])(release: (A, Either[E, B]) => F[Unit])(use: A => F[B]): F[B]
This too has a trivial implementation for IO:
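class IO[A] {
  ...
  def bracket[B](release: (A, Either[Throwable, B]) => IO[Unit])(use: A => IO[B]): IO[B] =
    for {
      e1 <- attempt                                      // outcome of acquire (this IO)
      e2 <- e1.fold(IO.raiseError, use).attempt          // outcome of use, if acquire succeeded
      _  <- e1.fold(_ => IO.unit, a => release(a, e2))   // release only what was acquired
      b  <- e2.fold(IO.raiseError, IO.pure)              // rethrow or return the result of use
    } yield b
  ...
}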
subject to the preceding safety caveat on async actions, plus a new one, which arises from the possibility (if you allow for it) of use raising an exception before it returns an IO[B] (it would be consistent with Cats IO philosophy to catch this exception and pass it along to the finalizer); and possibly further, from release potentially raising an exception before returning IO[Unit].
Personally, I'd recommend MonadBracket, because while it may have fewer instances, it's trivial to provide ensuring atop MonadBracket, while going the other way requires much more machinery.
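To make the "ensuring atop MonadBracket" direction concrete, here is a minimal sketch. It does not use cats-effect: Eval is a toy synchronous effect invented for this example, and the MonadBracket trait below is a hypothetical rendering of the signature proposed above with E fixed to Throwable.

```scala
import scala.util.control.NonFatal

// Toy synchronous effect used only for this sketch (an assumption, not cats-effect IO).
final class Eval[A](val run: () => A) {
  def map[B](f: A => B): Eval[B] = new Eval[B](() => f(run()))
  def flatMap[B](f: A => Eval[B]): Eval[B] = new Eval[B](() => f(run()).run())
  // Capture a thrown exception as a value instead of letting it propagate.
  def attempt: Eval[Either[Throwable, A]] =
    new Eval[Either[Throwable, A]](() => try Right(run()) catch { case NonFatal(e) => Left(e) })
}

object Eval {
  def apply[A](a: => A): Eval[A] = new Eval[A](() => a)
  def raiseError[A](e: Throwable): Eval[A] = new Eval[A](() => throw e)
}

// Hypothetical type class with the bracket signature proposed in the thread.
trait MonadBracket[F[_]] {
  def unit: F[Unit]
  def bracket[A, B](acquire: F[A])(release: (A, Either[Throwable, B]) => F[Unit])(use: A => F[B]): F[B]

  // ensuring is derived: acquire nothing, ignore the outcome, always run the finalizer.
  def ensuring[A](fa: F[A])(finalizer: F[Unit]): F[A] =
    bracket[Unit, A](unit)((_, _) => finalizer)(_ => fa)
}

object MonadBracket {
  val forEval: MonadBracket[Eval] = new MonadBracket[Eval] {
    def unit: Eval[Unit] = Eval(())
    def bracket[A, B](acquire: Eval[A])(release: (A, Either[Throwable, B]) => Eval[Unit])(use: A => Eval[B]): Eval[B] =
      for {
        a      <- acquire
        // Wrapping use(a) also catches an exception thrown before use returns its Eval[B].
        result <- Eval(use(a)).flatMap(x => x).attempt
        _      <- release(a, result)
        value  <- result.fold(e => Eval.raiseError[B](e), v => Eval(v))
      } yield value
  }
}
```

With this, MonadBracket.forEval.ensuring(fa)(finalizer) runs the finalizer whether fa succeeds or throws. The same caveat as above applies: this toy effect is synchronous, so non-terminating async actions are out of scope.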
The safety guarantees of this implementation break down in the presence of async actions that never terminate, but that's not something that can be solved with this implementation of IO (nor can you know the difference between an action that never terminates, and one which is just taking a really long time to return, for some definition of really long).
This is a valid point, however given that cancellation is natural to pop in such a topic, I have to point out that in the presence of async actions you have no definitive guarantees, regardless of IO implementation.
There are two problems with IO actions that can be cancelled:
1. bracket takes a use: A => IO[B] action, a use action that needs to cooperate in order to allow cancellation, but it may not cooperate
2. asynchrony comes down to "register: (A => Unit) => Unit" and anything that you add on top of this for safety is not enforceable in all cases, or in other words some IO actions are naturally not cancellable, because this depends on having control of some cancellation process and "async" means giving up control by definition; there have been other abstractions trying to improve asynchrony, to make it deterministic, but those that ignore this fundamental nature of asynchrony are fundamentally flawed
But to my first point, which is really important, suppose that we have an:
val acquire = IO {
  new BufferedReader(...)
}
And then let's do some classic blocking I/O for the JVM:
def use(in: BufferedReader) = IO {
  val buffer = new StringBuilder()
  var line: String = null
  do {
    line = in.readLine()
    if (line != null) buffer.append(line)
  } while (line != null)
  buffer.toString()
}
Now how would cancellation work in this case?
You could trigger an in.close(), forcefully closing the file/socket handle, but that would be concurrent with the while loop and you can't control what happens next: maybe the InputStream implementation is not thread safe, maybe closing that connection forcefully like that will make the stream emit junk on the consumer's side. For instance, maybe instead of throwing an exception, readLine will emit null, making that consumer think it's a valid EOF and emit incomplete results as the next step. There are lots of maybes here. You can also try a Thread.interrupt of course, but that's extremely unreliable and also happens to tie your implementation to JVM threads, which isn't a good idea.
There are valid strategies of course to making this cancellable, such as:
1. using synchronized blocks, along with a boolean isCancelled value checked on each cycle of that loop
2. turning the loop into a flatMap chain, again, with an isCancelled boolean being checked on each cycle
However as can be seen, any valid strategy for cancellation requires cooperation either from the A resource being used, or from the use phase, both of which being the responsibility of the user (with help being possible from the underlying IO / Task implementation, as is the case of Monix's Task and I think Scalaz 8's IO as well).
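As a rough illustration of the cooperative approach, the loop can check a shared flag on every cycle. This is only a sketch: the object and method names are hypothetical, and the Either result is my own way of letting the caller tell a cancelled read apart from a genuine EOF.

```scala
import java.io.BufferedReader
import java.util.concurrent.atomic.AtomicBoolean

object CooperativeRead {
  // Returns Right(everything) on a real EOF, or Left(partial) if the loop
  // observed the cancellation flag before reaching the end of the input.
  def readAll(in: BufferedReader, isCancelled: AtomicBoolean): Either[String, String] = {
    val buffer = new StringBuilder
    var line = in.readLine()
    // The flag is only checked between reads; a blocked readLine() is not interrupted.
    while (line != null && !isCancelled.get) {
      buffer.append(line)
      line = in.readLine()
    }
    if (line == null) Right(buffer.toString) else Left(buffer.toString)
  }
}
```

Note the limitation stated in the comment: cooperation happens only at loop boundaries, so whoever sets the flag still cannot safely close the reader until the loop has actually returned.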
And I missed the most important detail of all: this implementation happens to run in the same JVM process and we have full control of the asynchronous task being executed, but you can't have such control if the task is executed on some other node in the network, by code written by other people. Again, I'm going back to (A => Unit) => Unit which is the only signature you can actually count on in all cases.
Therefore the possibility of non-termination in case of asynchronous actions is a fact arising from what asynchrony is: tasks being executed or events arising independently of the main program flow, concurrently with the program's execution, with no ordering guarantees and hence cancellation and safe handling of resources can be out of your control.
Of course, it's awesome when you control the code, the connection, the network machine executing the code, the communication protocol and can describe tasks that can be cancelled. That's awesome, but it doesn't always work, in which case back-pressuring on a really long and possibly non-terminating task is actually acceptable.
This is a valid point, however given that cancellation is natural to pop in such a topic, I have to point out that in the presence of async actions you have no definitive guarantees, regardless of IO implementation.
That's not true. In Scalaz 8, you can interrupt a pending async action, even if it's infinite. This results in (a) disconnection of the handler, such that if the async action resumes at some point in the future, its value will be ignored; (b) termination of the fiber, with attendant guarantees around finalization; (c) optional execution of an interruptor to actually cancel the pending async action.
In many cases, you can trivially cancel pending async effects. For this reason, I'd argue (separately) the signature of async accepting (A => Unit) => Unit is fundamentally broken, because it provides no possibility to interrupt the pending action (e.g. shut down a network connection, unschedule a sleep, or cancel a computation running on a distributed node).
Instead, async should copy Scalaz 8 IO, which provides something analogous to (A => Unit) => (Throwable => Unit) (i.e. registering a callback provides an interrupter, which can be used by some async actions to safely interrupt or cancel the pending action).
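For illustration, the two registration shapes can be written down as plain Scala function types, with "unschedule a sleep" as the cancellable action. This is a sketch on top of java.util.concurrent, not the Scalaz 8 or Monix API; the names CancelableAsync, Register, CancelableRegister and sleepThen are made up for the example.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object CancelableAsync {
  // Registering a callback returns nothing, so the pending action cannot be interrupted.
  type Register[A] = (A => Unit) => Unit
  // Registering a callback returns an interrupter for the pending action.
  type CancelableRegister[A] = (A => Unit) => (Throwable => Unit)

  // Delivers `value` to the callback after `millis`, unless interrupted first.
  def sleepThen[A](scheduler: ScheduledExecutorService, millis: Long)(value: => A): CancelableRegister[A] =
    callback => {
      // Guards against the callback firing after (or racing with) an interruption.
      val settled = new AtomicBoolean(false)
      val task = scheduler.schedule(
        new Runnable { def run(): Unit = if (settled.compareAndSet(false, true)) callback(value) },
        millis,
        TimeUnit.MILLISECONDS
      )
      // The interruption reason is ignored in this sketch; we just unschedule the sleep.
      (reason: Throwable) => { if (settled.compareAndSet(false, true)) task.cancel(false); () }
    }
}
```

Calling the returned function before the delay elapses unschedules the task and guarantees the callback is never invoked, which is the "disconnection of the handler" plus "optional interruptor" combination described above.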
Interruption points in Scalaz 8 IO are (a) between operations processed by the interpreter (i.e. very fine-grained interruption, but no attempt is made to interrupt synchronous effects); (b) whenever an async action is currently in-flight and not yet completed.
What this design allows you to do is ensure that finalizers are eventually run by timing an action out (which is implemented using racing, which in turn is implemented using interruption of the loser). So for actions that have an upper bound on time-to-execute, you can provide stronger resource guarantees than a design without interruption.
But all of this is somewhat orthogonal to the topic of this issue, which is whether or not it makes sense to abstract around finalization. My recommendation would be to:
1. Introduce MonadBracket, as defined above, or similarly.
2. Implement bracket as sketched above, because it's a legitimate implementation that can be made safe using only existing machinery in cats-effect (whose only "loophole" is a long-running / infinite async action, but I'd argue users already have that problem even with try/finally, so the potential existence of a long-running / infinite async action is not sufficient cause to deny them bracket).
Then Monix Task, Scalaz 8 IO, and other effect systems are free to implement it as they choose.
Separately, I'd also recommend:
1. Changing async as suggested above, because the current definition is broken and will always lead to leaky, non-composable applications. Scalaz 8 IO and Aff (in PureScript) have the same design for async and it's proven itself in many real world applications.
2. Introducing MonadFork with weak guarantees and a trivial implementation for cats-effect (e.g. interruption does not actually do anything, it just resumes on completion of the forked action), allowing Monix, Scalaz 8 IO, and others to leverage their additional machinery to provide stronger guarantees.
The whole idea of encapsulating over effects requires encapsulating over the strongest guarantees of any effect system, because failure to do so ensures libraries will be tied to the lowest common denominator. It's better IMO to provide the full feature set in the encapsulation, and provide weaker guarantees for lightweight implementations. This way everyone can leverage the richest features of the most powerful libraries, yet they will degrade gracefully for lightweight implementations.
Instead, async should copy Scalaz 8 IO, which provides something analogous to (A => Unit) => (Throwable => Unit) (i.e. registering a callback provides an interrupter, which can be used by some async actions to safely interrupt or cancel the pending action).
That is also how Monix works, although you've got some interesting design choices around flatMap chains being cancelable by default (opt-out instead of opt-in) with which I disagree, but that's unrelated to this issue.
Related to this issue I would like a MonadBracket and a Finalizable; I actually felt the need for it recently. Not sure about the MonadFork, would like to see a proposal first.
@alexandru What would be the difference between MonadBracket and Finalizable?
It's a good time to restart the discussion on this type-class.
@LukaJCB your initial proposal was this ...
@typeclass trait Finalizable[F[_]] extends Sync[F] {
  def bracket[A, B](acquire: F[A])(use: A => F[B])(release: Either[Throwable, B] => F[Unit]): F[B]
}
1. release needs A, since A is probably some file handle that needs to be closed and you need it
2. why would release need the B value?
I need this going forward, however I was thinking something like this:
@typeclass trait Finalizable[F[_]] extends Sync[F] {
  def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B]
}
Or let's say that sometimes we might need an error condition if it happened in F[B], because it might change the release logic, so we could do this:
@typeclass trait Finalizable[F[_]] extends Sync[F] {
  def bracket[A, B](acquire: F[A])(use: A => F[B])(release: (A, Option[Throwable]) => F[Unit]): F[B]
}
Anything I'm missing?
Maybe we should go with the full version, but the signature seems overkill:
@typeclass trait Finalizable[F[_]] extends Sync[F] {
  def bracket[A, B](acquire: F[A])(use: A => F[B])
    (release: (A, Either[Throwable, B]) => F[Unit]): F[B]
}
I think having the full signature seem like overkill can be a good sign, as it can potentially mean that we've covered all the use cases or have something akin to a primitive. From there it's super trivial to define some more specific and easy to use derived functions, no? :)
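A small sketch of that derivation, assuming the full signature is the only primitive. The Bracket trait, the method names bracketFull and bracketE, and the Thunk toy effect are all hypothetical and exist only to make the example self-contained; none of them is a cats-effect API.

```scala
import scala.util.control.NonFatal

// Hypothetical type class exposing only the "full" signature as a primitive.
trait Bracket[F[_]] {
  def bracketFull[A, B](acquire: F[A])(use: A => F[B])(release: (A, Either[Throwable, B]) => F[Unit]): F[B]

  // Derived: release only learns whether an error happened.
  def bracketE[A, B](acquire: F[A])(use: A => F[B])(release: (A, Option[Throwable]) => F[Unit]): F[B] =
    bracketFull[A, B](acquire)(use)((a, result) => release(a, result.swap.toOption))

  // Derived: release only needs the resource.
  def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B] =
    bracketFull[A, B](acquire)(use)((a, _) => release(a))
}

// Toy synchronous effect, used only to exercise the derived functions.
final case class Thunk[A](run: () => A)

object Thunk {
  val bracketInstance: Bracket[Thunk] = new Bracket[Thunk] {
    def bracketFull[A, B](acquire: Thunk[A])(use: A => Thunk[B])(release: (A, Either[Throwable, B]) => Thunk[Unit]): Thunk[B] =
      Thunk[B](() => {
        val a = acquire.run()
        val result: Either[Throwable, B] =
          try Right(use(a).run()) catch { case NonFatal(e) => Left(e) }
        release(a, result).run() // runs on success and on failure of use
        result match {
          case Right(b) => b
          case Left(e)  => throw e
        }
      })
  }
}
```

Both derived variants are one-liners, which supports the point that the larger signature works as the primitive while the friendlier ones can live alongside it.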
Sure, I was also thinking that requiring Either[Throwable, B] to show up in there is a not very obvious restriction that Sync does not have, the restriction being that F[_] generates exactly one value as the result of its evaluation. Such a restriction wouldn't happen with an Option[Throwable].
On the other hand we have the same restriction by the combination of acquire and use; if F[_] would be a datatype that streams multiple events, then due to the signature use would end up being called multiple times and that doesn't make sense.
I would name this MonadBracket or just Bracket, feels a little overreaching because we have bracket with a different signature for streaming types. But then cats.effect is about IO-like data types, so that might not be a problem and it's less general as a name compared with Finalizable.
I see in that fpcomplete.com article that Haskell has MonadBracket and that it's similar to what's described above, correct? I think we should keep the name to make it easier for people migrating over to recognize it.
Another question: should MonadBracket depend on Sync or should Sync depend on MonadBracket? Or maybe neither.
Some notes on that:
1. Sync is about tail-safe flatMap, whereas imo MonadBracket can just depend on MonadError[Throwable, ?].
2. Having Sync require MonadBracket would introduce at least the restriction I mentioned above (i.e. F[_] being required to generate exactly one result), not necessarily a bad thing, but it changes Sync
3. Effect can require MonadBracket if we don't want to change what F[_] data types can support these typeclasses
So thinking about it, I think we can have MonadBracket[F] <: MonadError[Throwable, F] and Effect[F] <: Async[F] with MonadBracket[F].
But then if MonadBracket only depends on MonadError, maybe we should parametrize the error type, so maybe it should be MonadBracket[E, F]?
OMG, I'm getting lost in details 😀
I like the last solution, i.e. MonadBracket[E, F] <: MonadError[E, F] and Effect[F] <: Async[F] with MonadBracket[Throwable, F] 👍
We should also keep in mind where we'd want to place concurrency in such a hierarchy imo.
@SystemFw what concurrency, any operations you have in mind?
Well, I don't want to over constrain concurrency schemes for cats-effect implementations, so they would need to be very primitive (and expressive) ones.
Perhaps fork: F[A] => F[Unit] and raceWith?
We can currently express fork for any Effect, but not in a resource safe way.
If we bring in concurrency, we would probably need to bring in cancellation as well, no? I'm not at all opposed to having cancellation as a primitive, but it also probably means a lot more drastic changes to IO as far as I can tell.
I see interruption, concurrency and resource safety as inextricably linked. I don't have a full design in mind, just pointing out that we should keep it in mind. What I want to avoid is having a hierarchy that's capable of delivering operations, but not in a safe/composable fashion.
Yeah, maybe we should have Bracket <: Cancellable <: Forkable
We can currently express fork for any Effect, but not in a resource safe way.
This is an interesting discussion to be had. I'm not very fond of fork or start; they seem to be useful primitives, but if you really think about it, it's very hard to express anything in a resource safe way with it. E.g.
for {
  fa <- ta.start
  fb <- tb.start
  a <- fa
  b <- fb
} yield (a, b)
If you squint, this looks like parMap2 at least to beginners. The problem is that it isn't at all equivalent in regards to error handling.
If fa fails, the process is short-circuited and it won't back-pressure on fb due to flatMap's contract. You can fix it, but it requires extra logic. If the underlying datatype supports cancellation (e.g. Monix Task) you can specify that in case fa fails then it should cancel fb, but it's not obvious to beginners that they need to do this.
Which means that people can easily end up with a connection leak. And consider that a safe parMap2 can be specified for IO as well, but instead of cancellation you just back-pressure until both are finished.
If fb fails, this piece of logic won't see it until fa is finished. Which really means that you can't cancel fa if fb fails because you can't specify this cancellation logic until after successful a got generated, again due to flatMap's contract.
So basically the problem is that we have a concurrency issue, fa being concurrent with fb, but fa is not linked to fb in any way and we can't link them easily because flatMap by its nature is sequential.
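The two failure modes can be reproduced with scala.concurrent.Future standing in for already-started tasks (an analogy only: Future is eager and not cancellable, so it models the "back-pressure until both are finished" fix rather than cancellation). The object and method names are hypothetical.

```scala
import scala.concurrent.{ExecutionContext, Future}

object StartThenJoin {
  // Mirrors the for-comprehension above: both computations are already running when joined.
  // If fa fails, the result fails immediately and nobody waits for fb.
  // If fb fails first, the failure is not observed until fa has finished.
  def joinBoth[A, B](fa: Future[A], fb: Future[B])(implicit ec: ExecutionContext): Future[(A, B)] =
    for {
      a <- fa
      b <- fb
    } yield (a, b)

  // The "extra logic": wait for both sides to settle before reporting any failure,
  // so a failure on one side never leaves the other one running unobserved.
  def joinSettled[A, B](fa: Future[A], fb: Future[B])(implicit ec: ExecutionContext): Future[(A, B)] =
    fa.transformWith { ta =>
      fb.transformWith { tb =>
        Future.fromTry(for { a <- ta; b <- tb } yield (a, b))
      }
    }
}
```

With a cancellable data type the second variant would instead cancel the surviving side, but that linking logic has to be written explicitly in either case, which is the point being made about flatMap.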
Also in the context of Monix's new Task.start, you get a Task[Task[A]] back, but the cancellation of the inner Task generated is divorced from the cancellation of the outer task. Basically whenever the inner Task[A] actually gets generated, the outer task is done with processing, being out of the loop, which means that you can't cancel it anymore and the user becomes responsible for back-pressuring or cancelling the inner Task, or otherwise risk ending up with a resource leak.
Now I'm sure there are possible solutions to this. If you have some underlying execution context that handles execution (fiber?), maybe you could have a global registry of cancellation callbacks that need to get called and whose lifetime exceeds that of the current Task value being evaluated (Monix's Task currently assumes that the lifetime of cancellation callbacks does not exceed the lifetime of the Task being currently evaluated, but this can be changed if compelling enough). In such a case however the design gets very complicated and it's not at all intuitive that this happens, again, because of flatMap and its contract.
I like start, but it's an operation meant for big boys 😉 So the question is: should we provide fork and/or start in our type class hierarchy?
I don't know, maybe, they seem useful, but adding MonadCancelable won't fix their inherent unsafety.
On having a MonadCancelable / MonadKill, the problem is that multiple designs are possible and introducing it in cats-effect basically forces a design above others.
For example Monix's cancellation does not request an error to be provided as the reason for why cancellation happens. Cancellation is meant as an "unsubscribe" (using Rx's terminology), or in other words it's a way of saying to the producer that the consumer is no longer interested in any further data and that the connection can be closed, forcefully. Cancellation isn't meant to kill threads or fibers, or to do any consumer driven reporting, or whatever.
Another design choice is that in Monix cancellation does not back-pressure on any specified cancellation actions to be finished. The new pure Task.cancel back-pressures only until the cancellation signal is sent, but the actual closing of the file handles (or whatever) can come a little later.
This is a conscious design choice, because in terms of asynchronous producer/consumer protocols having to back-pressure on acknowledgement from the producer that cancellation is finished is a really bad idea.
I'm 👍 for a MonadCancelable, but not if this limits the design choices of competing implementations, since that defeats the purpose of having type classes in the first place. It's pretty bad to have some type classes that work only for data types that are exactly as powerful as the reference IO.
On cats.effect.IO becoming cancellable, I personally disagree, for the same considerations mentioned for MonadCancelable.
This IO represents the baseline, a simple implementation that does its job and that doesn't handle concurrency.
If you increase its capabilities to include cancellation, this will make it more unstable, since we can be sure that we won't pick the best design and at the same time discourage alternative implementations. Stability and simplicity were original design goals when the project was started and we'd need pretty good arguments to change them.
Well, I have only mentioned fork, not start (which is a higher level operation if you have Promise), but maybe you don't even need fork as a primitive, since you can implement it already for any effect (even though that requires a runAsync, so it's likely going to be less efficient).
I definitely agree that's an operation for expert users, but I see it as a primitive that you build stuff on top of: I don't mind if an operation can be unsafe (if it's low level/primitive), but I definitely mind that it can't be made safe (as it's currently the case).
I guess my priorities for the cats effect typeclasses include:
This is from the viewpoint of both a user and an implementor of fs2. The two main solutions seem to be:
1. Use a more powerful effect type, one with bracket and/or interruption
2. Use Stream
So I'll try to rebut both.
Using a more powerful effect type is a no-go for me. The minor argument against it is that I want cats IO to be a fully fledged implementation, but that's mostly preference. The real reason is that having wildly different capabilities in different implementations kinda makes having typeclasses for effects moot in the first place. This becomes particularly evident in fs2, where we abstract over the effect type. Since we are abstract, we have to program to the lowest common denominator, i.e. the non resource safe IO, so even if you want to use the resource safe Task, we as implementors can't exploit that knowledge.
You can have a taste of that in the implementation of fs2.async.Promise.cancellableGet. First, the fact that this operation exists in the first place is a red flag: ideally you want interruption on top of any primitive, or you'd have a combinatorial explosion in the api, as you need cancellable and non cancellable version of everything (e.g. look at fs2.async.mutable.queue).
Second, the type of cancellableGet is F[(F[A], F[Unit])], where the F[Unit] is used to interrupt the semantic blocking on the Promise. This type seems reasonable at first, but note that now you have to worry about the inner F[A] being called multiple times, and about the cancel action being called before the action it's meant to cancel: matter of fact, we had to deal with this and make the implementation way more complex. Compare with the simple get: F[A] we could provide if we had bracket on any effect, which could then be cancelled compositionally if we had interruption as well (with the guarantee that bracket respects that, ofc).
Use Stream. Obviously I love Stream, and I think it is the best way to structure the flow of programs built on top of composable semantic actions (built in IO/Effect), but Stream is an abstraction that operates on top of an Effect, not a replacement for it.
On one hand, you still need resource safety and interruption at the stream level (which are a more complex problem to solve than just doing it at the effect level). On the other hand, a Stream represents a coinductive process, you shouldn't be forced to use it just because you need bracket, as that forces onto your users the concept of "singleton streams", which is really not what a stream is good for, since it forces you to keep track of the cardinality of the stream.
It's fine to produce a singleton stream at call site (by evalling a bracketed IO, if that existed), since locally you can clearly see that the Stream has one element, but exposing a resource as Stream[F, A] with a comment that says "this stream emits only one element" is definitely a pain point.
@SystemFw these are some good arguments. Will sleep over it 😀
Compare with the simple get: F[A] we could provide if we had bracket on any effect, which could then be cancelled compositionally if we had interruption as well (with the guarantee that bracket respects that, ofc).
Btw, I was under the impression that even with an F[A] that is cancellable, a bracket needs to execute as an atomic unit, either all or nothing and shouldn't be cancellable. Is this not the case? Am I misunderstanding bracket?
I really strongly believe that any interruption/resource safety in lightweight effect type will be hard to implement in any reasonable lawful way. So far, the experience is that when you want to implement both of these, you end up with very complex logic, that in certain cases wildly blows complexity of effect full type implementation.
As @SystemFw pointed out, concurrency and asynchronous execution is one problem, whereas the other is certainly nested resource allocation and nested interruption (both implemented stack-free). If you think about how complex it is to express these in code and then in the implementation, I can hardly imagine being able to implement that with only an F similar to IO.
Btw, in absence of a general consensus on a grand design I'm a believer in making small steps in order to move things forward. For now I'm interested in MonadBracket since I believe this is badly needed.
So can we agree on these two points?
1. MonadBracket[E, F] <: MonadError[E, F] and Effect[F] <: Async[F] with MonadBracket[Throwable, F]
2. the behavior on cancellation of bracket, since termination with an error is not cancellation, so we need consensus
For me on this Monday of January it makes sense for Effect <: Async with MonadBracket and for bracket to behave like an atomic, but please raise concerns.
My thoughts:
1) Sync should extend MonadBracket
2) We should support cancellation in the type class hierarchy
@pchlupacek
I really strongly believe that any interruption/resource safety in lightweight effect type will be hard to implement in any reasonable lawful way.
The thing is that we now have a counter example (apart from haskell ofc), in scalaz IO.
So far, the experience is that when you want to implement both of these, you end up with very complex logic, that in certain cases wildly blows complexity of effect full type implementation.
Well, the experience we have is with doing that for Stream, which is (significantly, imo) harder.
@alexandru
Btw, I was under the impression that even with an F[A] that is cancellable, a bracket needs to execute as an atomic unit, either all or nothing and shouldn't be cancellable. Is this not the case? Am I misunderstanding bracket?
That's not a prerequisite. In haskell IO everything can be interrupted. In scalaz IO the acquire and release are non-interruptible (I like that as a default), but you can definitely interrupt the use action. In fact, that's a great use for bracket (or its close cousin finally/ensuring/pickAName): you define how to dispose of any resources in case of interruption, and this is tracked for you across all operations (similarly to how Stream.bracket does it, but for IO).
In the example with Promise, interrupting the async action of waiting on a Promise would have an ensuring that deletes the stored callback from the internal Promise state (I'm ofc talking implementation details here, it's just an example)
@SystemFw I am eager to see first implementation shot and laws. I cannot speak for scalaz 8 IO, due to lack of understanding it, but what I was following and able to understand, there are tradeoffs as well.
I am not saying this is not possible, I am just saying that it may bring into scope instrumentation that may not be for free.
Of course if the cost will be negligible would be great to have it. :-)
I really strongly believe that any interruption/resource safety in lightweight effect type will be hard to implement in any reasonable lawful way.
The thing is that we now have a counter example (apart from haskell ofc), in scalaz IO.
Out of curiosity, do you have any Scalaz 8 examples that for Monix's Task do not work OK or aren't lawful?
That's not a prerequisite. In haskell IO everything can be interrupted. In scalaz IO the acquire and release are non-interruptible (I like that as a default), but you can definitely interrupt the use action. In fact, that's a great use for bracket (or its close cousin finally/ensuring/pickAName ) : you define how to dispose of any resources in case of interruption, and this is tracked for you across all operations (similarly to how Stream.bracket does it, but for IO).
I have yet to play with Scalaz's new IO to my shame, but I'm pretty sure this makes it a leaky abstraction. Consider your own example:
In the example with Promise, interrupting the async action of waiting on a Promise would have an ensuring that deletes the stored callback from the internal Promise state (I'm ofc talking implementation details here, it's just an example)
When you're operating on an AtomicReference, it's all good and fine because you get synchronization for free. The primary use-case for bracket however won't be thread-safe primitives like AtomicReference, but rather very thread unsafe file handles or network sockets, whose access will have to be guarded by synchronized blocks (intrinsic locks), otherwise that bracket code can be inherently unsafe.
We talked about it on the Gitter channel before. I'm very negative on having an ensuring that conflates termination with cancellation, as the two events are distinctly different. This is why in Monix we have a doOnFinish, along with a separate distinct doOnCancel. Because cancellation is always concurrent with whatever happens in use, whereas normal termination is sequential.
We often like to think of this:
try { use stuff }
finally { release }
That finally clause in Java will execute for a use that terminates normally or for an InterruptedException, doesn't matter. However there's actually a big difference between this and cancellation of asynchronous actions.
The problem with asynchronous actions is that when your finally block executes, your use stuff logic can still be running and there's no way to work around that, other than to do manual synchronization, possibly with plain locks / monitors / semaphores, which we all know are leaky and don't compose.
I also gave the following example above:
def use(in: BufferedReader) = IO {
  val buffer = new StringBuilder()
  var line: String = null
  do {
    line = in.readLine()
    if (line != null) buffer.append(line)
  } while (line != null)
  buffer.toString()
}
There's no way to cancel this safely without extra thread synchronization or without cooperation, so there are two choices actually:
in and check if some boolean was cancelled
In turn John avoids the problem and replies:
That's not true. In Scalaz 8, you can interrupt a pending async action, even if it's infinite. This results in (a) disconnection of the handler, such that if the async action resumes at some point in the future, its value will be ignored; (b) termination of the fiber, with attendant guarantees around finalization; (c) optional execution of an interruptor to actually cancel the pending async action.
This is actually how cancellation in Monix works, well mostly, because in Monix cancelable flatMap loops are opt-in, rather than opt-out, plus there are multiple ways to do "disconnection of the handler", some more problematic than others.
But here's the big problem: if you don't actually stop that loop, even if you block the handler from being triggered (like John is saying), with timeouts or whatever, it will mean that the loop ends up reading from the same file handle that you're going to attempt closing in release.
And because access to our file handle is not thread safe, you can end up with corrupted data. Not in this sample, but you can easily imagine for example network sockets lingering on, or being closed with a broken TCP handshake that leaves them in a CLOSE_WAIT or TIME_WAIT state, waiting to be timed out by the kernel itself, because the bits that were sent were concurrent with whatever that use task is doing. I actually had this problem in the past, with the result being a crashed cluster of nodes on AWS in a chain reaction.
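To make the cooperative option concrete, here is a minimal sketch of the read loop checking a cancellation flag, with the reads and the close guarded by the same monitor so they can never overlap. The names `CooperativeRead`, `readAll` and `cancel` are made up for illustration and are not part of any library discussed here.

```scala
import java.io.BufferedReader
import java.util.concurrent.atomic.AtomicBoolean

object CooperativeRead {
  // Hypothetical helper: reads lines until EOF or until `cancelled` is set.
  // Every read happens while holding the reader's monitor.
  def readAll(in: BufferedReader, cancelled: AtomicBoolean): String = {
    val buffer = new StringBuilder()
    var running = true
    while (running) {
      val line = in.synchronized {
        // Skip the read entirely once cancellation was requested
        if (cancelled.get()) null else in.readLine()
      }
      if (line != null) buffer.append(line) else running = false
    }
    buffer.toString()
  }

  // Hypothetical cancel action: flips the flag, then closes under the same
  // monitor, so the close cannot interleave with a read in progress.
  def cancel(in: BufferedReader, cancelled: AtomicBoolean): Unit =
    in.synchronized {
      cancelled.set(true)
      in.close()
    }
}
```

This is exactly the manual synchronization the comment complains about: it works, but the locking discipline has to be repeated everywhere the handle is touched.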
Also a second problem is that on cancellation, as I hinted previously, you can't expect the producer to acknowledge your cancellation. This has been a really bad design in TCP-related protocols, as seen over the years. You can't rely on it, so at least for over-the-network communications you just send the cancel signal and then close / ignore the connection.
What this means is that when you cancel, it's a bad idea to wait for the task specified by release (or by ensuring) to finish. So you either do that and ignore dozens of years of bad experiences when dealing with network communications, or you end up with concurrency issues. Pick your poison.
So if you want to release for example some network transaction which should COMMIT in that release, there's actually a difference between normal termination and cancellation. Because on normal termination you want to COMMIT, on error you probably want to ROLLBACK and on cancellation you probably want to immediately close the connection without any kind of acknowledgement.
In other words, expecting the user to synchronize access to that A resource because cancellation is possible and thus the release can happen concurrently means that bracket or the corresponding ensuring are leaky abstractions. Or to put it differently: if there's any possibility that release can happen concurrently with whatever goes on in use, without distinguishing between cancellation and normal termination, then it's broken.
TL;DR: I'm not a fan of release / ensuring happening as a result of cancelling a task 😎
IIUC, your argument is twofold (sorry if I'm misrepresenting!):
1. bracket + interruption + IO(...) might require extra effort to synchronise access to the stuff being modified in the IO
2. if bracket handles both termination and cancellation, there needs to be a way of running actions conditional on which of the two has caused release to be called.
The first point is definitely true, but I don't think it's a blocker. First, we would be able to build our own things that won't have this problem (like fs2 already does). Second, once you have wrapped the mutable crap in IO.bracket (doing the extra work for synchronisation if required), what you have is composable now. I agree it's non-optimal (ideally bracket would just magically work with any IO you throw at it), but it's still a net win. But since this is not Haskell and stuff hasn't been built (yet) with IO in mind, we'd have to do the dirty work in the "FFI".
Note that we do a similar thing already when reimplementing java blocking stuff in a non-blocking fashion.
I don't disagree with the second point either, but I don't see how it's a problem, can we not say that release will take an argument that will signal the reason why release has been called (interruption, orderly completion, completion with an exception)?
I don't disagree with the second point either, but I don't see how it's a problem, can we not say that release will take an argument that will signal the reason why release has been called (interruption, orderly completion, completion with an exception)?
Yes, that's what I was trying to say: if we make release run on cancellation, then that release needs to know that it was called as a result of a cancel action.
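As a rough sketch of what "release takes an argument signalling the reason" could look like, the following uses a small ADT and a synchronous bracket over plain thunks. Everything here (`ExitCase`, `Failed`, `Canceled`, `Connection`, `bracketCase`) is a hypothetical name chosen for illustration, not an API proposed in this thread.

```scala
object ExitCaseSketch {
  // Hypothetical ADT describing why `release` is being invoked.
  sealed trait ExitCase
  case object Completed extends ExitCase
  final case class Failed(error: Throwable) extends ExitCase
  case object Canceled extends ExitCase

  // Stand-in for a transactional connection; it only records what was asked of it.
  final class Connection {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    def commit(): Unit = log += "COMMIT"
    def rollback(): Unit = log += "ROLLBACK"
    def closeNow(): Unit = log += "CLOSE"
  }

  // The release logic can pick a different action per exit reason.
  def release(conn: Connection, exit: ExitCase): Unit = exit match {
    case Completed => conn.commit()
    case Failed(_) => conn.rollback()
    case Canceled  => conn.closeNow() // no acknowledgement expected
  }

  // A synchronous bracket over plain thunks. It can only ever report
  // Completed or Failed, since nothing here can cancel a running thunk.
  def bracketCase[A, B](acquire: () => A)(use: A => B)(release: (A, ExitCase) => Unit): B = {
    val a = acquire()
    val result =
      try use(a)
      catch { case e: Throwable => release(a, Failed(e)); throw e }
    release(a, Completed)
    result
  }
}
```

A cancelable data type would be the one responsible for passing `Canceled`; the sketch only shows that the three cases from the COMMIT / ROLLBACK / close example can be told apart inside release.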
FWIW, here's the full Haskell version (which is the only one I could find): https://hackage.haskell.org/package/foundation-0.0.6/docs/src/Foundation-Monad-Exception.html#generalBracket
Unfortunately, I wasn't able to find any prior art with regards to laws for such a type class, so I thought about it a bit and I came up with this:
forAll { (fa: F[A], f: A => B, funit: F[Unit]) =>
  bracket(fa)(f.andThen(_.pure[F]))((_, _) => funit) <-> fa.map(f).flatTap(_ => funit)
}
forAll { (fa: F[A], e: E, funit: F[Unit], f: E => B) =>
  bracket(fa)(_ => F.raiseError[B](e))((_, _) => funit).handleError(f) <-> fa *> funit *> f(e).pure[F]
}
Thoughts?
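One way to get a feel for these two laws is to check them against a toy instance. The sketch below uses scala.util.Try as the effect, with an assumed bracket semantics (release always runs once acquire succeeded; a failing release only surfaces when use succeeded). The names `BracketLawsSketch`, `law1` and `law2` are illustrative only.

```scala
import scala.util.{Failure, Success, Try}

object BracketLawsSketch {
  // Assumed semantics, not taken from any library: release runs whenever
  // acquire succeeded, and its failure is reported only if `use` succeeded.
  def bracket[A, B](acquire: Try[A])(use: A => Try[B])(
      release: (A, Either[Throwable, B]) => Try[Unit]): Try[B] =
    acquire.flatMap { a =>
      val result = use(a)
      val released = release(a, result.toEither)
      result.flatMap(b => released.map(_ => b))
    }

  // Law 1: a pure `use` behaves like map followed by the finalizer.
  def law1[A, B](fa: Try[A], f: A => B, funit: Try[Unit]): Boolean = {
    val lhs = bracket[A, B](fa)(a => Success(f(a)))((_, _) => funit)
    val rhs = fa.map(f).flatMap(b => funit.map(_ => b))
    lhs == rhs
  }

  // Law 2: a failing `use` still sequences the finalizer before the handler.
  def law2[A, B](fa: Try[A], e: Throwable, funit: Try[Unit], f: Throwable => B): Boolean = {
    val lhs = bracket[A, B](fa)(_ => Failure(e))((_, _) => funit).recover { case t => f(t) }
    val rhs = fa.flatMap(_ => funit).flatMap(_ => Success(f(e)))
    lhs == rhs
  }
}
```

With this instance the first law holds for successful and failed inputs alike. The second one holds when fa and funit both succeed, but not when funit fails: the left side recovers via the handler while the right side stays failed. So the second law probably needs to be restricted or reworded for failing finalizers.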
If we extend Bracket from something like Cancellable we could probably write some more laws with that.
On naming, I think we can't have a type class named Cancellable, because that name is in use in Akka, Monix and RxJava / RxScala. MonadCancel sounds better.
And personally I don't want Bracket to extend a MonadCancel because this will require implementing data types to also implement MonadCancel.
Coeval and the current IO can have a bracket, but Coeval and the current IO cannot be cancelled. If we provided a MonadCancel for Coeval or the current IO, it would be a dummy that would do ... source.flatMap(_ => Task.unit). I don't find that useful at all, because if we add a MonadCancel, I see myself wanting to use it in a streaming type like Iterant[F, A] in cases where F[_] actually needs to be cancelable.
So if we end up preparing for MonadCancel, my proposal is to find ways to not pollute the entire type class hierarchy with it. Again, because a type class hierarchy that describes types that are exactly as powerful as Monix's Task or Scalaz's IO is not a very useful type class hierarchy.
And just to be clear, my preferences are:
MonadCancel
MonadBracket
IO becoming cancelable
Yeah that makes perfect sense to me 👍
We could have a class that extends both Cancel and Bracket that could add these laws :)
If we have MonadCancel but IO is not cancellable, everything that's polymorphic on F is not cancellable. That includes fs2, Iterant and http4s (unless you say that IO is not fit for them).
Also, runAsync returns IO as the concrete type (and we use runAsync to implement concurrency primitives), so that will also be non-cancellable.
Not sure why you're saying that, but you can build Iterant[Task, A] instances that are cancelable, even if Iterant does not have any knowledge of Task's cancelation capabilities.
A node in Iterant is defined as ...
case class Next[F[_], A](elem: A, rest: F[Iterant[F, A]], earlyStop: F[Unit])
  extends Iterant[F, A]
When you're looking at this generic definition, it's hard to see, but on actual usage at the call-site that F is going to be a Task for things that should be cancellable. Consider this:
def readLine(in: BufferedReader): Task[Option[String]] =
  Task.create { (ctx, cb) =>
    val conn = ctx.connection
    // Forking, to make it look interesting
    ctx.scheduler.executeAsync { () =>
      in.synchronized {
        if (!conn.isCanceled)
          try { cb.onSuccess(Option(in.readLine())) }
          catch { case NonFatal(e) => cb.onError(e) }
      }
    }
    // Cancellation logic; has to be synchronized!
    Cancelable { () =>
      in.synchronized { in.close() }
    }
  }
And then let's read a file, line by line, as a stream:
def readFile(file: File): Iterant[Task, String] =
  Iterant.suspend {
    val in = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"))
    // We don't actually need synchronization here, because ordering is guaranteed, but
    // it's best practice for when you do synchronization to do it everywhere
    val stop = Task.eval(in.synchronized(in.close()))
    def loop(in: BufferedReader): Iterant[Task, String] =
      readLine(in).attempt.flatMap {
        case Right(Some(line)) =>
          Next(line, Task.eval(loop(in)), stop)
        case Right(None) =>
          // Hitting EOF, normal termination
          Suspend(stop.map(_ => Halt(None)))
        case Left(error) =>
          // BufferedReader triggered error
          Suspend(stop.map(_ => Halt(Some(error))))
      }
    // go, go, go
    loop(in)
  }
Is this cancelable?
If you use Iterant (aka operations that return Iterant values, like take), then it's going to go through Iterant's very cooperative earlyStop mechanism
If you get a Task out of it (e.g. with foldLeft, foldRight, etc), then yes, it yields a Task that's very much cancellable
Do we have redundancy in that logic for closing the file handle? Yes, but that's inevitable, as many times the logic for normal termination will be slightly different.
Can this function be described for any F[_]? Not right now, because MonadCancel is missing, but what this sample shows is that Task's cancellation mechanism is an entirely different concern than what Iterant does 😉
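To illustrate the cooperative earlyStop idea in isolation, here is a heavily simplified stand-in for Iterant where F is fixed to a plain thunk. `Iter`, `take` and `toList` below are hypothetical and only mirror the shape of the Next / Halt nodes shown above, not Monix's actual implementation.

```scala
object EarlyStopSketch {
  // Simplified stand-in for Iterant, with the effect fixed to a plain thunk.
  sealed trait Iter[+A]
  final case class Next[+A](elem: A, rest: () => Iter[A], earlyStop: () => Unit) extends Iter[A]
  final case class Halt(error: Option[Throwable]) extends Iter[Nothing]

  // Once n elements have been emitted, a remaining Next node triggers its
  // earlyStop so the producer can release resources; a source that halts
  // on its own is passed through untouched.
  def take[A](s: Iter[A], n: Int): Iter[A] = s match {
    case Next(elem, rest, stop) =>
      if (n <= 0) { stop(); Halt(None) }
      else Next(elem, () => take(rest(), n - 1), stop)
    case h: Halt => h
  }

  // Consumes the whole stream, rethrowing a terminal error if there is one.
  def toList[A](s: Iter[A]): List[A] = s match {
    case Next(elem, rest, _) => elem :: toList(rest())
    case Halt(Some(e))       => throw e
    case Halt(None)          => Nil
  }
}
```

Nothing in this sketch knows about cancellation of the underlying effect. The consumer simply asks the producer to stop, which is the sense in which early termination and task cancellation are separate concerns.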
@SystemFw I wrote the above because I feel that you have unrealistic expectations of a cancellable data type.
Maybe I'm wrong or maybe I misunderstood you, but in such a case crafting a sample to bring us down to earth helps 😉
I should have picked something I know, not Iterant :P
Can this function be described for any F[_]? Not right now, because MonadCancel is missing
but if MonadCancel is there, and IO is not cancellable, then I can't use it with IO. And if I happen to need that function in the implementation of http4s (or any other library that wants to expose a polymorphic api), I can't use IO with it.
This is what would happen with fs2 Promise and Queue.
Maybe we should just have Task as the new reference IO with the new hierarchy 😜
What I'm advocating for is for us to not jump to conclusions. Right now I have plenty of experience with Iterant because I designed it such that its early termination is orthogonal to Task's cancellation. Maybe I did that because I had Task around (literally in the same repository :)).
But right now with my limited experience, I feel that Task's cancellation is orthogonal to what polymorphic data types need, except for when you need concurrency, e.g. parMap2 (but you can describe a safe one for current IO, it will just back-pressure) or race. A race operation for the current IO is indeed unsafe. But do you really need it everywhere?
One reason why I liked working with type classes is that you can take an incremental approach, by adding the restrictions only on the operations that need them.
This is why I'd like an example-driven approach.
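A small sketch of that incremental approach, under the assumption that the two capabilities live in unrelated type classes: each operation asks only for what it needs, and a non-cancelable type (Try here, standing in for something like Coeval) gets a bracket instance without having to fake cancellation. The traits are reduced to a single method each and, like `withResource` and `withCancelHook`, are hypothetical.

```scala
import scala.util.Try

object HierarchySketch {
  // Hypothetical, stripped-down type classes; neither extends the other.
  trait MonadBracket[F[_]] {
    def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B]
  }
  trait MonadCancel[F[_]] {
    def onCancel[A](fa: F[A])(hook: F[Unit]): F[A]
  }

  // Needs only MonadBracket, so it works for non-cancelable types too.
  def withResource[F[_], A, B](acquire: F[A])(use: A => F[B])(close: A => F[Unit])(
      implicit F: MonadBracket[F]): F[B] =
    F.bracket(acquire)(use)(close)

  // Needs MonadCancel; only types that provide an instance qualify.
  def withCancelHook[F[_], A](fa: F[A], hook: F[Unit])(implicit F: MonadCancel[F]): F[A] =
    F.onCancel(fa)(hook)

  // Try can bracket but cannot be cancelled, so it only gets this instance.
  // Errors raised by release are ignored in this sketch.
  implicit val tryBracket: MonadBracket[Try] = new MonadBracket[Try] {
    def bracket[A, B](acquire: Try[A])(use: A => Try[B])(release: A => Try[Unit]): Try[B] =
      acquire.flatMap { a =>
        val result = Try(use(a)).flatten
        release(a)
        result
      }
  }
}
```

Calling `withResource[Try, Int, Int](...)` compiles with the instance above, while `withCancelHook[Try, Int](...)` does not, because no `MonadCancel[Try]` is in scope. That is the restriction showing up only on the operation that needs it.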
I have a pile of Monix issues opened currently that I need to focus on for 3.0.0. After that I promise to look at your Promise implementation, its usage and purpose and draw some conclusions from that.
If indeed Http4s and FS2 need MonadCancel to be the supertype of MonadBracket we'll see what we'll do, maybe make IO cancellable, but I'd like us to be sure that we can't separate these concerns first.
Does that sound good?
I don't have the bandwidth right now to engage fully in these design discussions so I like @alexandru's suggestion. My open source time is focused completely on getting FS2 0.10 released. After that's done, I'll have more time to participate. :)
I think what @SystemFw suggested was not that MonadBracket and MonadCancel shouldn't be separate concerns (I also agree here), but that the reference type should implement all the type classes in the hierarchy, so that libraries like Http4s and fs2 can fully make use of them.
I think there's room for a Coeval with a Sync + MonadBracket instance, a less powerful IOlite with MonadBracket + Async but no MonadCancel and finally a full blown IO that has all the instances.
OK, we'll sleep a little on it, free ourselves of current commitments and regroup. I'm especially interested in @mpilquist's opinions here.
If there's consensus (and I'd like Daniel to be on board), I can help out with the cancelable IO, but I also liked the current IO, so in that context IOlite is OK, but that's going to be a naming problem, oh well...
Nobody said we have to deliver cats-effect 1.0 in January 😜
Agreed! I also think we can go at it incrementally as you mentioned earlier.
I think we have consensus that MonadBracket/Finalizable is a good idea, so it probably makes sense to start with that one :)
Yeah, @LukaJCB summarised my position perfectly :) I also agree with @alexandru on the immediate way forward :)
So I made a first move and put something small together at #113. I think this should be a good idea regardless of how we move forward in terms of MonadCancel or MonadFork.
I raised this issue on the gitter channel and got a lot of positive feedback. We removed IO#ensuring some time ago and realized MonadError is not powerful enough to implement it (See here). I'd like to propose something in the vein of MonadBracket described in this article. It might also help with providing a resource safe Parallel experience.
Relevant gitter discussion: