Closed github-actions[bot] closed 4 years ago
https://github.com/arrow-kt/arrow/commit/fec5c82e97f371fbc102977dba8ce9257d5df4e7/checks
diff --cc modules/core/arrow-core-data/src/test/kotlin/arrow/core/ListKTest.kt index 43c322958,5d334df81..000000000 --- a/modules/core/arrow-core-data/src/test/kotlin/arrow/core/ListKTest.kt +++ b/modules/core/arrow-core-data/src/test/kotlin/arrow/core/ListKTest.kt @@@ -23,7 -23,6 +23,10 @@@ import arrow.core.extensions.listk.trav import arrow.core.extensions.listk.unalign.unalign import arrow.core.extensions.listk.unzip.unzip import arrow.core.extensions.show ++<<<<<<< HEAD +import arrow.core.extensions.show ++======= ++>>>>>>> origin/master import arrow.test.UnitSpec import arrow.test.generators.genK import arrow.test.generators.listK diff --cc modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/eithert.kt index 5918c2481,06c4eb7c0..000000000 --- a/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/eithert.kt +++ b/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/eithert.kt @@@ -179,10 -179,11 +179,18 @@@ fun <F, L> EitherT.Companion.concurrent override fun CF(): Concurrent<F> = CF } ++<<<<<<< HEAD +interface EitherTMonadIO<F, L> : MonadIO<EitherTPartialOf<F, L>>, EitherTMonad<F, L> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<Nothing, A>.liftIO(): Kind<EitherTPartialOf<F, L>, A> = FIO().run { ++======= + @extension + interface EitherTMonadIO<F, L> : MonadIO<EitherTPartialOf<F, L>>, EitherTMonad<F, L> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<A>.liftIO(): Kind<EitherTPartialOf<F, L>, A> = FIO().run { ++>>>>>>> origin/master EitherT.liftF(this, liftIO()) } } diff --cc modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/kleisli.kt index 421acf5b5,eadd5ae48..000000000 --- a/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/kleisli.kt +++ b/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/kleisli.kt @@@ -156,10 -156,11 +156,18 @@@ fun <F, R> Kleisli.Companion.concurrent override fun CF(): Concurrent<F> = CF } ++<<<<<<< HEAD +interface KleisliMonadIO<F, R> : 
MonadIO<KleisliPartialOf<F, R>>, KleisliMonad<F, R> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<Nothing, A>.liftIO(): Kind<KleisliPartialOf<F, R>, A> = FIO().run { ++======= + @extension + interface KleisliMonadIO<F, R> : MonadIO<KleisliPartialOf<F, R>>, KleisliMonad<F, R> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<A>.liftIO(): Kind<KleisliPartialOf<F, R>, A> = FIO().run { ++>>>>>>> origin/master Kleisli.liftF(liftIO()) } } diff --cc modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/optiont.kt index 57a0131f8,11aac1dfa..000000000 --- a/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/optiont.kt +++ b/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/optiont.kt @@@ -164,10 -164,11 +164,18 @@@ fun <F> OptionT.Companion.concurrent(CF override fun CF(): Concurrent<F> = CF } ++<<<<<<< HEAD +interface OptionTMonadIO<F> : MonadIO<OptionTPartialOf<F>>, OptionTMonad<F> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<Nothing, A>.liftIO(): Kind<OptionTPartialOf<F>, A> = FIO().run { ++======= + @extension + interface OptionTMonadIO<F> : MonadIO<OptionTPartialOf<F>>, OptionTMonad<F> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<A>.liftIO(): Kind<OptionTPartialOf<F>, A> = FIO().run { ++>>>>>>> origin/master liftIO().liftT(this) } } diff --cc modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/statet.kt index 297999c61,54d52aa16..000000000 --- a/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/statet.kt +++ b/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/statet.kt @@@ -98,10 -98,11 +98,18 @@@ interface StateTAsyncInstane<F, S> : As } } ++<<<<<<< HEAD +interface StateTMonadIO<F, S> : MonadIO<StateTPartialOf<F, S>>, StateTMonad<F, S> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<Nothing, A>.liftIO(): Kind<StateTPartialOf<F, S>, A> = FIO().run { ++======= + 
@extension + interface StateTMonadIO<F, S> : MonadIO<StateTPartialOf<F, S>>, StateTMonad<F, S> { + fun FIO(): MonadIO<F> + override fun MF(): Monad<F> = FIO() + override fun <A> IO<A>.liftIO(): Kind<StateTPartialOf<F, S>, A> = FIO().run { ++>>>>>>> origin/master StateT.liftF(this, liftIO()) } } diff --cc modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/writert.kt index 28540234f,65df7af05..000000000 --- a/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/writert.kt +++ b/modules/fx/arrow-fx-mtl/src/main/kotlin/arrow/fx/mtl/writert.kt @@@ -151,11 -151,12 +151,19 @@@ fun <F, W> WriterT.Companion.concurrent override fun MM(): Monoid<W> = MM } ++<<<<<<< HEAD ++======= + @extension ++>>>>>>> origin/master interface WriterTMonadIO<F, W> : MonadIO<WriterTPartialOf<F, W>>, WriterTMonad<F, W> { fun FIO(): MonadIO<F> override fun MF(): Monad<F> = FIO() override fun MM(): Monoid<W> ++<<<<<<< HEAD + override fun <A> IO<Nothing, A>.liftIO(): Kind<WriterTPartialOf<F, W>, A> = FIO().run { ++======= + override fun <A> IO<A>.liftIO(): Kind<WriterTPartialOf<F, W>, A> = FIO().run { ++>>>>>>> origin/master WriterT.liftF(liftIO(), MM(), this) } } diff --cc modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt index c1a47f667,6cafc9d24..000000000 --- a/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt +++ b/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IOParMap.kt @@@ -2,6 -2,7 +2,10 @@@ package arrow.f import arrow.core.Either import arrow.core.Left ++<<<<<<< HEAD ++======= + import arrow.core.None ++>>>>>>> origin/master import arrow.core.Option import arrow.core.Right import arrow.core.Tuple2 @@@ -20,219 -21,186 +24,391 @@@ import arrow.core.internal.AtomicBoolea /** Mix-in to enable `parMapN` 2-arity on IO's companion directly. 
*/ interface IOParMap { ++<<<<<<< HEAD + fun <EE, A, B, C> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, f: (A, B) -> C): IO<EE, C> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, f) + + fun <EE, A, B, C, D> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, f: (A, B, C) -> D): IO<EE, D> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, f) + + fun <EE, A, B, C, D, E> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: IOOf<EE, D>, f: (A, B, C, D) -> E): IOOf<EE, E> = ++======= + fun <A, B, C> parMapN(fa: IOOf<A>, fb: IOOf<B>, f: (A, B) -> C): IO<C> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, f) + + fun <A, B, C, D> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, f: (A, B, C) -> D): IO<D> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, f) + + fun <A, B, C, D, E> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, f: (A, B, C, D) -> E): IOOf<E> = ++>>>>>>> origin/master IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, f) /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: IOOf<EE, D>, fe: IOOf<EE, E>, f: (A, B, C, D, E) -> G): IO<EE, G> = ++======= + fun <A, B, C, D, E, G> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, f: (A, B, C, D, E) -> G): IO<G> = ++>>>>>>> origin/master IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, f) /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: IOOf<EE, D>, fe: IOOf<EE, E>, fg: IOOf<EE, G>, f: (A, B, C, D, E, G) -> H): IO<EE, H> = ++======= + fun <A, B, C, D, E, G, H> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, f: (A, B, C, D, E, G) -> H): IO<H> = ++>>>>>>> origin/master IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, f) /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: 
IOOf<EE, D>, fe: IOOf<EE, E>, fg: IOOf<EE, G>, fh: IOOf<EE, H>, f: (A, B, C, D, E, G, H) -> I): IO<EE, I> = ++======= + fun <A, B, C, D, E, G, H, I> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, f: (A, B, C, D, E, G, H) -> I): IO<I> = ++>>>>>>> origin/master IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, f) /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I, J> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: IOOf<EE, D>, fe: IOOf<EE, E>, fg: IOOf<EE, G>, fh: IOOf<EE, H>, fi: IOOf<EE, I>, f: (A, B, C, D, E, G, H, I) -> J): IO<EE, J> = ++======= + fun <A, B, C, D, E, G, H, I, J> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, fi: IOOf<I>, f: (A, B, C, D, E, G, H, I) -> J): IO<J> = ++>>>>>>> origin/master IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, fi, f) /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I, J, K> parMapN(fa: IOOf<EE, A>, fb: IOOf<EE, B>, fc: IOOf<EE, C>, fd: IOOf<EE, D>, fe: IOOf<EE, E>, fg: IOOf<EE, G>, fh: IOOf<EE, H>, fi: IOOf<EE, I>, fj: IOOf<EE, J>, f: (A, B, C, D, E, G, H, I, J) -> K): IO<EE, K> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, fi, fj, f) + + fun <E, A, B, C> parMapN(ctx: CoroutineContext, fa: IOOf<E, A>, fb: IOOf<E, B>, f: (A, B) -> C): IO<E, C> = + IO.Async(true) { conn, cb -> + // Used to store Throwable, Either<A, B> or empty (null). 
(No sealed class used for a slightly better performing ParMap2) + val state = AtomicRefW<Any?>(null) + + val connA = IOConnection() + val connB = IOConnection() + + conn.pushPair(connA, connB) + + fun complete(a: A, b: B) { + conn.pop() + cb(try { + IOResult.Success(f(a, b)) + } catch (e: Throwable) { + IOResult.Exception(e.nonFatalOrThrow()) + }) + } + + fun sendException(other: IOConnection, e: Throwable) = when (state.getAndSet(e)) { + is Throwable -> Unit // Do nothing we already finished TODO replace with active field + else -> other.cancel().unsafeRunAsync { r -> + conn.pop() + // TODO if `r` is an exception send it to the asyncErrorHandler + cb(IOResult.Exception(r.fold({ e2 -> Platform.composeErrors(e, e2) }, { e }))) + } + } + + fun sendError(other: IOConnection, e: E) = when (state.getAndSet(e)) { + is Throwable -> Unit + else -> other.cancel().fix().unsafeRunAsync { r -> + conn.pop() + cb(IOResult.Error(e)) + // TODO if `r` is an exception send it to the asyncErrorHandler + } + } + + IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA -> + resultA.fold({ e -> + sendException(connB, e) + }, { e -> + sendError(connB, e) + }, { a -> + when (val oldState = state.getAndSet(Left(a))) { + null -> Unit // Wait for B + is Throwable -> Unit // ParMapN already failed and A was cancelled. + is Either.Left<*> -> Unit // Already state.getAndSet + is Either.Right<*> -> complete(a, (oldState as Either.Right<B>).b) + } + }) + } + + IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB -> + resultB.fold({ e -> + sendException(connA, e) + }, { e -> + sendError(connB, e) + }, { b -> + when (val oldState = state.getAndSet(Right(b))) { + null -> Unit // Wait for A + is Throwable -> Unit // ParMapN already failed and B was cancelled. 
+ is Either.Right<*> -> Unit // IO cannot finish twice + is Either.Left<*> -> complete((oldState as Either.Left<A>).a, b) + } + }) + } + } + + fun <E, A, B, C, D> parMapN(ctx: CoroutineContext, fa: IOOf<E, A>, fb: IOOf<E, B>, fc: IOOf<E, C>, f: (A, B, C) -> D): IO<E, D> = + IO.Async(true) { conn, cb -> + + val state: AtomicRefW<Option<Tuple3<Option<A>, Option<B>, Option<C>>>> = AtomicRefW(none()) + val active = AtomicBooleanW(true) + + val connA = IOConnection() + val connB = IOConnection() + val connC = IOConnection() + + // Composite cancelable that cancels all ops. + // NOTE: conn.pop() called when cb gets called below in complete. + conn.push(connA.cancel(), connB.cancel(), connC.cancel()) + + fun complete(a: A, b: B, c: C) { + conn.pop() + val result: IOResult<E, D> = try { + IOResult.Success(f(a, b, c)) + } catch (e: Throwable) { + IOResult.Exception(e.nonFatalOrThrow()) + } + cb(result) + } + + fun tryComplete(result: Option<Tuple3<Option<A>, Option<B>, Option<C>>>): Unit = + result.fold({ Unit }, { (a, b, c) -> Option.applicative().map(a, b, c) { (a, b, c) -> complete(a, b, c) } }) + + fun sendException(other: IOConnection, other2: IOConnection, e: Throwable) = + if (active.getAndSet(false)) { // We were already cancelled so don't do anything. + other.cancel().unsafeRunAsync { r1 -> + other2.cancel().unsafeRunAsync { r2 -> + conn.pop() + cb(IOResult.Exception(r1.fold({ e2 -> + r2.fold({ e3 -> Platform.composeErrors(e, e2, e3) }, { Platform.composeErrors(e, e2) }) + }, { + r2.fold({ e3 -> Platform.composeErrors(e, e3) }, { e }) + }))) + } + } + } else Unit + + fun sendError(other: IOConnection, other2: IOConnection, e: E) = + if (active.getAndSet(false)) { // We were already cancelled so don't do anything. 
+ other.cancel().fix().unsafeRunAsync { r1 -> + other2.cancel().fix().unsafeRunAsync { r2 -> + conn.pop() + // Send r1 & r2 to asyncErrorHandler if cancelation failed + cb(IOResult.Error(e)) + } + } + } else Unit + + IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA -> + resultA.fold({ e -> + sendException(connB, connC, e) + }, { e -> + sendError(connB, connC, e) + }, { a -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(a = a.some()) } + .handleError { Tuple3(a.some(), none(), none()) } + }) + }) + } + + IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB -> + resultB.fold({ e -> + sendException(connA, connC, e) + }, { e -> + sendError(connB, connC, e) + }, { b -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(b = b.some()) } + .handleError { Tuple3(none(), b.some(), none()) } + }) + }) + } + + IORunLoop.startCancelable(IOForkedStart(fc, ctx), connC) { resultC -> + resultC.fold({ e -> + sendException(connA, connB, e) + }, { e -> + sendError(connB, connC, e) + }, { c -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(c = c.some()) } + .handleError { Tuple3(none(), none(), c.some()) } + }) + }) + } + } ++======= + fun <A, B, C, D, E, G, H, I, J, K> parMapN(fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, fd: IOOf<D>, fe: IOOf<E>, fg: IOOf<G>, fh: IOOf<H>, fi: IOOf<I>, fj: IOOf<J>, f: (A, B, C, D, E, G, H, I, J) -> K): IO<K> = + IO.parMapN(IODispatchers.CommonPool, fa, fb, fc, fd, fe, fg, fh, fi, fj, f) + + fun <A, B, C> parMapN(ctx: CoroutineContext, fa: IOOf<A>, fb: IOOf<B>, f: (A, B) -> C): IO<C> = IO.Async(true) { conn, cb -> + // Used to store Throwable, Either<A, B> or empty (null). 
(No sealed class used for a slightly better performing ParMap2) + val state = AtomicRefW<Any?>(null) + + val connA = IOConnection() + val connB = IOConnection() + + conn.pushPair(connA, connB) + + fun complete(a: A, b: B) { + conn.pop() + cb(try { + Either.Right(f(a, b)) + } catch (e: Throwable) { + Either.Left(e.nonFatalOrThrow()) + }) + } + + fun sendError(other: IOConnection, e: Throwable) = when (state.getAndSet(e)) { + is Throwable -> Unit // Do nothing we already finished + else -> other.cancel().fix().unsafeRunAsync { r -> + conn.pop() + cb(Left(r.fold({ e2 -> Platform.composeErrors(e, e2) }, { e }))) + } + } + + IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA -> + resultA.fold({ e -> + sendError(connB, e) + }, { a -> + when (val oldState = state.getAndSet(Left(a))) { + null -> Unit // Wait for B + is Throwable -> Unit // ParMapN already failed and A was cancelled. + is Either.Left<*> -> Unit // Already state.getAndSet + is Either.Right<*> -> complete(a, (oldState as Either.Right<B>).b) + } + }) + } + + IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB -> + resultB.fold({ e -> + sendError(connA, e) + }, { b -> + when (val oldState = state.getAndSet(Right(b))) { + null -> Unit // Wait for A + is Throwable -> Unit // ParMapN already failed and B was cancelled. + is Either.Right<*> -> Unit // IO cannot finish twice + is Either.Left<*> -> complete((oldState as Either.Left<A>).a, b) + } + }) + } + } + + fun <A, B, C, D> parMapN(ctx: CoroutineContext, fa: IOOf<A>, fb: IOOf<B>, fc: IOOf<C>, f: (A, B, C) -> D): IO<D> = IO.Async(true) { conn, cb -> + + val state: AtomicRefW<Option<Tuple3<Option<A>, Option<B>, Option<C>>>> = AtomicRefW(None) + val active = AtomicBooleanW(true) + + val connA = IOConnection() + val connB = IOConnection() + val connC = IOConnection() + + // Composite cancelable that cancels all ops. + // NOTE: conn.pop() called when cb gets called below in complete. 
+ conn.push(connA.cancel(), connB.cancel(), connC.cancel()) + + fun complete(a: A, b: B, c: C) { + conn.pop() + val result: Either<Throwable, D> = try { + Either.Right(f(a, b, c)) + } catch (e: Throwable) { + Either.Left(e.nonFatalOrThrow()) + } + cb(result) + } + + fun tryComplete(result: Option<Tuple3<Option<A>, Option<B>, Option<C>>>): Unit = + result.fold({ Unit }, { (a, b, c) -> Option.applicative().map(a, b, c) { (a, b, c) -> complete(a, b, c) } }) + + fun sendError(other: IOConnection, other2: IOConnection, e: Throwable) = + if (active.getAndSet(false)) { // We were already cancelled so don't do anything. + other.cancel().fix().unsafeRunAsync { r1 -> + other2.cancel().fix().unsafeRunAsync { r2 -> + conn.pop() + cb(Left(r1.fold({ e2 -> + r2.fold({ e3 -> Platform.composeErrors(e, e2, e3) }, { Platform.composeErrors(e, e2) }) + }, { + r2.fold({ e3 -> Platform.composeErrors(e, e3) }, { e }) + }))) + } + } + } else Unit + + IORunLoop.startCancelable(IOForkedStart(fa, ctx), connA) { resultA -> + resultA.fold({ e -> + sendError(connB, connC, e) + }, { a -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(a = a.some()) } + .handleError { Tuple3(a.some(), none(), none()) } + }) + }) + } + + IORunLoop.startCancelable(IOForkedStart(fb, ctx), connB) { resultB -> + resultB.fold({ e -> + sendError(connA, connC, e) + }, { b -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(b = b.some()) } + .handleError { Tuple3(none(), b.some(), none()) } + }) + }) + } + + IORunLoop.startCancelable(IOForkedStart(fc, ctx), connC) { resultC -> + resultC.fold({ e -> + sendError(connA, connB, e) + }, { c -> + tryComplete(state.updateAndGet { current -> + current + .map { it.copy(c = c.some()) } + .handleError { Tuple3(none(), none(), c.some()) } + }) + }) + } + } ++>>>>>>> origin/master /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: 
IOOf<EE, D>, + f: (A, B, C, D) -> E + ): IOOf<EE, E> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, E> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + f: (A, B, C, D) -> E + ): IOOf<E> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, ::Tuple2), IO.parMapN(ctx, fc, fd, ::Tuple2) ) { (a, b), (c, d) -> @@@ -242,15 -210,15 +418,27 @@@ /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: IOOf<EE, D>, + fe: IOOf<EE, E>, + f: (A, B, C, D, E) -> G + ): IO<EE, G> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, E, G> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + fe: IOOf<E>, + f: (A, B, C, D, E) -> G + ): IO<G> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, fc, ::Tuple3), IO.parMapN(ctx, fd, fe, ::Tuple2) ) { (a, b, c), (d, e) -> @@@ -260,16 -228,16 +448,29 @@@ /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: IOOf<EE, D>, + fe: IOOf<EE, E>, + fg: IOOf<EE, G>, + f: (A, B, C, D, E, G) -> H + ): IO<EE, H> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, E, G, H> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + fe: IOOf<E>, + fg: IOOf<G>, + f: (A, B, C, D, E, G) -> H + ): IO<H> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, fc, ::Tuple3), IO.parMapN(ctx, fd, fe, fg, ::Tuple3) ) { (a, b, c), (d, e, g) -> @@@ -279,17 -247,17 +480,31 @@@ /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: IOOf<EE, D>, + fe: IOOf<EE, E>, + fg: IOOf<EE, G>, + fh: IOOf<EE, H>, + f: (A, B, C, D, E, G, H) -> I + ): IO<EE, I> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, 
E, G, H, I> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + fe: IOOf<E>, + fg: IOOf<G>, + fh: IOOf<H>, + f: (A, B, C, D, E, G, H) -> I + ): IO<I> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, fc, ::Tuple3), IO.parMapN(ctx, fd, fe, ::Tuple2), IO.parMapN(ctx, fg, fh, ::Tuple2)) { (a, b, c), (d, e), (g, h) -> @@@ -299,18 -267,18 +514,33 @@@ /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I, J> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: IOOf<EE, D>, + fe: IOOf<EE, E>, + fg: IOOf<EE, G>, + fh: IOOf<EE, H>, + fi: IOOf<EE, I>, + f: (A, B, C, D, E, G, H, I) -> J + ): IO<EE, J> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, E, G, H, I, J> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + fe: IOOf<E>, + fg: IOOf<G>, + fh: IOOf<H>, + fi: IOOf<I>, + f: (A, B, C, D, E, G, H, I) -> J + ): IO<J> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, fc, ::Tuple3), IO.parMapN(ctx, fd, fe, fg, ::Tuple3), IO.parMapN(ctx, fh, fi, ::Tuple2)) { (a, b, c), (d, e, g), (h, i) -> @@@ -320,19 -288,19 +550,35 @@@ /** * @see parMapN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, G, H, I, J, K> parMapN( + ctx: CoroutineContext, + fa: IOOf<EE, A>, + fb: IOOf<EE, B>, + fc: IOOf<EE, C>, + fd: IOOf<EE, D>, + fe: IOOf<EE, E>, + fg: IOOf<EE, G>, + fh: IOOf<EE, H>, + fi: IOOf<EE, I>, + fj: IOOf<EE, J>, + f: (A, B, C, D, E, G, H, I, J) -> K + ): IO<EE, K> = IO.parMapN(ctx, ++======= + fun <A, B, C, D, E, G, H, I, J, K> parMapN( + ctx: CoroutineContext, + fa: IOOf<A>, + fb: IOOf<B>, + fc: IOOf<C>, + fd: IOOf<D>, + fe: IOOf<E>, + fg: IOOf<G>, + fh: IOOf<H>, + fi: IOOf<I>, + fj: IOOf<J>, + f: (A, B, C, D, E, G, H, I, J) -> K + ): IO<K> = IO.parMapN(ctx, ++>>>>>>> origin/master IO.parMapN(ctx, fa, fb, fc, ::Tuple3), IO.parMapN(ctx, fd, fe, fg, ::Tuple3), IO.parMapN(ctx, fh, fi, fj, ::Tuple3)) { (a, b, c), (d, e, 
g), (h, i, j) -> diff --cc modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IORace.kt index 989c4e24d,ceb74da6c..000000000 --- a/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IORace.kt +++ b/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/IORace.kt @@@ -1,5 -1,6 +1,9 @@@ package arrow.fx ++<<<<<<< HEAD ++======= + import arrow.core.Either ++>>>>>>> origin/master import arrow.core.Left import arrow.core.Right import arrow.core.internal.AtomicBooleanW @@@ -14,28 -15,28 +18,53 @@@ import kotlin.coroutines.CoroutineConte interface IORace { ++<<<<<<< HEAD + fun <EE, A, B> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>): IO<EE, Race2<A, B>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB) + + fun <EE, A, B, C> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>): IO<EE, Race3<out A, out B, out C>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC) + + fun <EE, A, B, C, D> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>): IO<EE, Race4<out A, out B, out C, out D>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD) + + fun <EE, A, B, C, D, E> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>, ioE: IOOf<EE, E>): IO<EE, Race5<out A, out B, out C, out D, out E>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE) + + fun <EE, A, B, C, D, E, F> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>, ioE: IOOf<EE, E>, ioF: IOOf<EE, F>): IO<EE, Race6<out A, out B, out C, out D, out E, out F>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF) + + fun <EE, A, B, C, D, E, F, G> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>, ioE: IOOf<EE, E>, ioF: IOOf<EE, F>, ioG: IOOf<EE, G>): IO<EE, Race7<out A, out B, out C, out D, out E, out F, out G>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF, ioG) + + fun <EE, A, B, C, D, E, F, G, H> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>, ioE: IOOf<EE, 
E>, ioF: IOOf<EE, F>, ioG: IOOf<EE, G>, ioH: IOOf<EE, H>): IO<EE, Race8<out A, out B, out C, out D, out E, out F, out G, out H>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF, ioG, ioH) + + fun <EE, A, B, C, D, E, F, G, H, I> raceN(ioA: IOOf<EE, A>, ioB: IOOf<EE, B>, ioC: IOOf<EE, C>, ioD: IOOf<EE, D>, ioE: IOOf<EE, E>, ioF: IOOf<EE, F>, ioG: IOOf<EE, G>, ioH: IOOf<EE, H>, ioI: IOOf<EE, I>): IO<EE, Race9<out A, out B, out C, out D, out E, out F, out G, out H, out I>> = ++======= + fun <A, B> raceN(ioA: IOOf<A>, ioB: IOOf<B>): IO<Race2<A, B>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB) + + fun <A, B, C> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>): IO<Race3<out A, out B, out C>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC) + + fun <A, B, C, D> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>): IO<Race4<out A, out B, out C, out D>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD) + + fun <A, B, C, D, E> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>, ioE: IOOf<E>): IO<Race5<out A, out B, out C, out D, out E>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE) + + fun <A, B, C, D, E, F> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>, ioE: IOOf<E>, ioF: IOOf<F>): IO<Race6<out A, out B, out C, out D, out E, out F>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF) + + fun <A, B, C, D, E, F, G> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>, ioE: IOOf<E>, ioF: IOOf<F>, ioG: IOOf<G>): IO<Race7<out A, out B, out C, out D, out E, out F, out G>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF, ioG) + + fun <A, B, C, D, E, F, G, H> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>, ioE: IOOf<E>, ioF: IOOf<F>, ioG: IOOf<G>, ioH: IOOf<H>): IO<Race8<out A, out B, out C, out D, out E, out F, out G, out H>> = + IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF, ioG, ioH) + + fun <A, B, C, D, E, F, G, H, 
I> raceN(ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>, ioD: IOOf<D>, ioE: IOOf<E>, ioF: IOOf<F>, ioG: IOOf<G>, ioH: IOOf<H>, ioI: IOOf<I>): IO<Race9<out A, out B, out C, out D, out E, out F, out G, out H, out I>> = ++>>>>>>> origin/master IO.raceN(IODispatchers.CommonPool, ioA, ioB, ioC, ioD, ioE, ioF, ioG, ioH, ioI) /** @@@ -48,13 -49,13 +77,23 @@@ * import kotlinx.coroutines.Dispatchers * * fun main(args: Array<String>) { ++<<<<<<< HEAD + * //sampleStart + * val result = IO.fx { + * val racePair = !IO.racePair(Dispatchers.Default, never<Int>(), just("Hello World!")) + * racePair.fold( + * { _, _ -> "never cannot win race" }, + * { _, winner -> winner } + * ) ++======= + * //sampleStart + * val result = IO.fx { + * val racePair = !IO.racePair(Dispatchers.Default, never<Int>(), just("Hello World!")) + * racePair.fold( + * { _, _ -> "never cannot win race" }, + * { _, winner -> winner } + * ) ++>>>>>>> origin/master * } * //sampleEnd * @@@ -71,7 -72,7 +110,11 @@@ * * @see [arrow.fx.typeclasses.Concurrent.raceN] for a simpler version that cancels loser. 
*/ ++<<<<<<< HEAD + fun <E, A, B> racePair(ctx: CoroutineContext, ioA: IOOf<E, A>, ioB: IOOf<E, B>): IO<E, RacePair<IOPartialOf<E>, A, B>> = ++======= + fun <A, B> racePair(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<B>): IO<RacePair<ForIO, A, B>> = ++>>>>>>> origin/master IO.Async(true) { conn, cb -> val active = AtomicBooleanW(true) @@@ -80,71 -81,51 +123,115 @@@ // Cancelable connection for the left value val connA = IOConnection() connA.push(upstreamCancelToken) ++<<<<<<< HEAD + val promiseA = UnsafePromise<E, A>() ++======= + val promiseA = UnsafePromise<A>() ++>>>>>>> origin/master // Cancelable connection for the right value val connB = IOConnection() connB.push(upstreamCancelToken) ++<<<<<<< HEAD + val promiseB = UnsafePromise<E, B>() + + conn.pushPair(connA, connB) + + IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: IOResult<E, A> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connB.cancel().unsafeRunAsync { r2 -> + conn.pop() + cb(IOResult.Exception(r2.fold({ Platform.composeErrors(error, it) }, { error }))) + } + } else { + promiseA.complete(IOResult.Exception(error)) + } + }, { e -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connB.cancel().unsafeRunAsync { r2 -> + conn.pop() + // TODO asyncErrorHandler r2 + cb(IOResult.Error(e)) + } + } else { + promiseA.complete(IOResult.Error(e)) ++======= + val promiseB = UnsafePromise<B>() + + conn.pushPair(connA, connB) + + IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: Either<Throwable, A> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connB.cancel().fix().unsafeRunAsync { r2 -> + conn.pop() + cb(Left(r2.fold({ Platform.composeErrors(error, it) }, { error }))) + } + } else { + promiseA.complete(Left(error)) ++>>>>>>> origin/master } }, { a -> if (active.getAndSet(false)) { conn.pop() ++<<<<<<< HEAD + cb(IOResult.Success(RacePair.First(a, IOFiber(promiseB, connB)))) + } else { + promiseA.complete(IOResult.Success(a)) ++======= + cb(Right(RacePair.First(a, IOFiber(promiseB, connB)))) + } else { + promiseA.complete(Right(a)) ++>>>>>>> origin/master } }) } ++<<<<<<< HEAD + IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: IOResult<E, B> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connA.cancel().unsafeRunAsync { r2 -> + conn.pop() + cb(IOResult.Exception(r2.fold({ Platform.composeErrors(error, it) }, { error }))) + } + } else { + promiseB.complete(IOResult.Exception(error)) + } + }, { e -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connB.cancel().unsafeRunAsync { r2 -> + conn.pop() + // TODO asyncErrorHandler r2 + cb(IOResult.Error(e)) + } + } else { + promiseB.complete(IOResult.Error(e)) ++======= + IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: Either<Throwable, B> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connA.cancel().fix().unsafeRunAsync { r2 -> + conn.pop() + cb(Left(r2.fold({ Platform.composeErrors(error, it) }, { error }))) + } + } else { + promiseB.complete(Left(error)) ++>>>>>>> origin/master } }, { b -> if (active.getAndSet(false)) { conn.pop() ++<<<<<<< HEAD + cb(IOResult.Success(RacePair.Second(IOFiber(promiseA, connA), b))) + } else { + promiseB.complete(IOResult.Success(b)) ++======= + cb(Right(RacePair.Second(IOFiber(promiseA, connA), b))) + } else { + promiseB.complete(Right(b)) ++>>>>>>> origin/master } }) } @@@ -184,7 -165,7 +271,11 @@@ * * @see [arrow.fx.typeclasses.Concurrent.raceN] for a simpler version that cancels losers. */ ++<<<<<<< HEAD + fun <E, A, B, C> raceTriple(ctx: CoroutineContext, ioA: IOOf<E, A>, ioB: IOOf<E, B>, ioC: IOOf<E, C>): IO<E, RaceTriple<IOPartialOf<E>, A, B, C>> = ++======= + fun <A, B, C> raceTriple(ctx: CoroutineContext, ioA: IOOf<A>, ioB: IOOf<B>, ioC: IOOf<C>): IO<RaceTriple<ForIO, A, B, C>> = ++>>>>>>> origin/master IO.Async(true) { conn, cb -> val active = AtomicBooleanW(true) @@@ -192,129 -173,95 +283,207 @@@ val connA = IOConnection() connA.push(upstreamCancelToken) ++<<<<<<< HEAD + val promiseA = UnsafePromise<E, A>() + + val connB = IOConnection() + connB.push(upstreamCancelToken) + val promiseB = UnsafePromise<E, B>() + + val connC = IOConnection() + connC.push(upstreamCancelToken) + val promiseC = UnsafePromise<E, C>() + + conn.push(connA.cancel(), connB.cancel(), connC.cancel()) + + IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: IOResult<E, A> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connB.cancel().unsafeRunAsync { r2 -> + connC.cancel().unsafeRunAsync { r3 -> ++======= + val promiseA = UnsafePromise<A>() + + val connB = IOConnection() + connB.push(upstreamCancelToken) + val promiseB = UnsafePromise<B>() + + val connC = IOConnection() + connC.push(upstreamCancelToken) + val promiseC = UnsafePromise<C>() + + conn.push(connA.cancel(), connB.cancel(), connC.cancel()) + + IORunLoop.startCancelable(IOForkedStart(ioA, ctx), connA) { either: Either<Throwable, A> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connB.cancel().fix().unsafeRunAsync { r2 -> + connC.cancel().fix().unsafeRunAsync { r3 -> ++>>>>>>> origin/master conn.pop() val errorResult = r2.fold({ e2 -> r3.fold({ e3 -> Platform.composeErrors(error, e2, e3) }, { Platform.composeErrors(error, e2) }) }, { r3.fold({ e3 -> Platform.composeErrors(error, e3) }, { error }) }) ++<<<<<<< HEAD + cb(IOResult.Exception(errorResult)) + } + } + } else { + promiseA.complete(IOResult.Exception(error)) + } + }, { e -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connB.cancel().unsafeRunAsync { r2 -> + connC.cancel().unsafeRunAsync { r3 -> + conn.pop() + cb(IOResult.Error(e)) + } + } + } else { + promiseA.complete(IOResult.Error(e)) ++======= + cb(Left(errorResult)) + } + } + } else { + promiseA.complete(Left(error)) ++>>>>>>> origin/master } }, { a -> if (active.getAndSet(false)) { conn.pop() ++<<<<<<< HEAD + cb(IOResult.Success(RaceTriple.First(a, IOFiber(promiseB, connB), IOFiber(promiseC, connC)))) + } else { + promiseA.complete(IOResult.Success(a)) ++======= + cb(Right(RaceTriple.First(a, IOFiber(promiseB, connB), IOFiber(promiseC, connC)))) + } else { + promiseA.complete(Right(a)) ++>>>>>>> origin/master } }) } ++<<<<<<< HEAD + IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: IOResult<E, B> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connA.cancel().unsafeRunAsync { r2 -> + connC.cancel().unsafeRunAsync { r3 -> ++======= + IORunLoop.startCancelable(IOForkedStart(ioB, ctx), connB) { either: Either<Throwable, B> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connA.cancel().fix().unsafeRunAsync { r2 -> + connC.cancel().fix().unsafeRunAsync { r3 -> ++>>>>>>> origin/master conn.pop() val errorResult = r2.fold({ e2 -> r3.fold({ e3 -> Platform.composeErrors(error, e2, e3) }, { Platform.composeErrors(error, e2) }) }, { r3.fold({ e3 -> Platform.composeErrors(error, e3) }, { error }) }) ++<<<<<<< HEAD + cb(IOResult.Exception(errorResult)) + } + } + } else { + promiseB.complete(IOResult.Exception(error)) + } + }, { e -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connA.cancel().unsafeRunAsync { r2 -> + connC.cancel().unsafeRunAsync { r3 -> + conn.pop() + cb(IOResult.Error(e)) + } + } + } else { + promiseB.complete(IOResult.Error(e)) ++======= + cb(Left(errorResult)) + } + } + } else { + promiseB.complete(Left(error)) ++>>>>>>> origin/master } }, { b -> if (active.getAndSet(false)) { conn.pop() ++<<<<<<< HEAD + cb(IOResult.Success(RaceTriple.Second(IOFiber(promiseA, connA), b, IOFiber(promiseC, connC)))) + } else { + promiseB.complete(IOResult.Success(b)) ++======= + cb(Right(RaceTriple.Second(IOFiber(promiseA, connA), b, IOFiber(promiseC, connC)))) + } else { + promiseB.complete(Right(b)) ++>>>>>>> origin/master } }) } ++<<<<<<< HEAD + IORunLoop.startCancelable(IOForkedStart(ioC, ctx), connC) { either: IOResult<E, C> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connA.cancel().unsafeRunAsync { r2 -> + connB.cancel().unsafeRunAsync { r3 -> ++======= + IORunLoop.startCancelable(IOForkedStart(ioC, ctx), connC) { either: Either<Throwable, C> -> + either.fold({ error -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. + connA.cancel().fix().unsafeRunAsync { r2 -> + connB.cancel().fix().unsafeRunAsync { r3 -> ++>>>>>>> origin/master conn.pop() val errorResult = r2.fold({ e2 -> r3.fold({ e3 -> Platform.composeErrors(error, e2, e3) }, { Platform.composeErrors(error, e2) }) }, { r3.fold({ e3 -> Platform.composeErrors(error, e3) }, { error }) }) ++<<<<<<< HEAD + cb(IOResult.Exception(errorResult)) + } + } + } else { + promiseC.complete(IOResult.Exception(error)) + } + }, { e -> + if (active.getAndSet(false)) { // if an error finishes first, stop the race. 
+ connA.cancel().unsafeRunAsync { r2 -> + connB.cancel().unsafeRunAsync { r3 -> + conn.pop() + // + cb(IOResult.Error(e)) + } + } + } else { + promiseC.complete(IOResult.Error(e)) ++======= + cb(Left(errorResult)) + } + } + } else { + promiseC.complete(Left(error)) ++>>>>>>> origin/master } }, { c -> if (active.getAndSet(false)) { conn.pop() ++<<<<<<< HEAD + cb(IOResult.Success(RaceTriple.Third(IOFiber(promiseA, connA), IOFiber(promiseB, connB), c))) + } else { + promiseC.complete(IOResult.Success(c)) ++======= + cb(Right(RaceTriple.Third(IOFiber(promiseA, connA), IOFiber(promiseB, connB), c))) + } else { + promiseC.complete(Right(c)) ++>>>>>>> origin/master } }) } @@@ -359,11 -306,11 +528,19 @@@ * * @see racePair for a version that does not automatically cancel the loser. */ ++<<<<<<< HEAD + fun <EE, A, B> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B> + ): IO<EE, Race2<A, B>> = ++======= + fun <A, B> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B> + ): IO<Race2<A, B>> = ++>>>>>>> origin/master racePair(ctx, ioA, ioB) .flatMap { it.fold( @@@ -375,12 -322,12 +552,21 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C> + ): IO<EE, Race3<out A, out B, out C>> = ++======= + fun <A, B, C> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C> + ): IO<Race3<out A, out B, out C>> = ++>>>>>>> origin/master raceTriple(ctx, ioA, ioB, ioC) .flatMap { it.fold( @@@ -393,13 -340,13 +579,23 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D> + ): IO<EE, Race4<out A, out B, out C, out D>> = ++======= + fun <A, B, C, D> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D> + ): IO<Race4<out A, out B, out C, out D>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, 
ioA, ioB), raceN(ctx, ioC, ioD) @@@ -413,14 -360,14 +609,25 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D>, + ioE: IOOf<EE, E> + ): IO<EE, Race5<out A, out B, out C, out D, out E>> = ++======= + fun <A, B, C, D, E> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D>, + ioE: IOOf<E> + ): IO<Race5<out A, out B, out C, out D, out E>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, ioA, ioB, ioC), raceN(ctx, ioD, ioE) @@@ -434,15 -381,15 +641,27 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, F> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D>, + ioE: IOOf<EE, E>, + ioF: IOOf<EE, F> + ): IO<EE, Race6<out A, out B, out C, out D, out E, out F>> = ++======= + fun <A, B, C, D, E, F> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D>, + ioE: IOOf<E>, + ioF: IOOf<F> + ): IO<Race6<out A, out B, out C, out D, out E, out F>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, ioA, ioB, ioC), raceN(ctx, ioD, ioE, ioF) @@@ -456,16 -403,16 +675,29 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, F, G> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D>, + ioE: IOOf<EE, E>, + ioF: IOOf<EE, F>, + ioG: IOOf<EE, G> + ): IO<EE, Race7<out A, out B, out C, out D, out E, out F, out G>> = ++======= + fun <A, B, C, D, E, F, G> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D>, + ioE: IOOf<E>, + ioF: IOOf<F>, + ioG: IOOf<G> + ): IO<Race7<out A, out B, out C, out D, out E, out F, out G>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, ioA, ioB, ioC), raceN(ctx, ioD, ioE), @@@ -481,17 -428,17 +713,31 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, F, G, H> raceN( + ctx: 
CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D>, + ioE: IOOf<EE, E>, + ioF: IOOf<EE, F>, + ioG: IOOf<EE, G>, + ioH: IOOf<EE, H> + ): IO<EE, Race8<out A, out B, out C, out D, out E, out F, out G, out H>> = ++======= + fun <A, B, C, D, E, F, G, H> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D>, + ioE: IOOf<E>, + ioF: IOOf<F>, + ioG: IOOf<G>, + ioH: IOOf<H> + ): IO<Race8<out A, out B, out C, out D, out E, out F, out G, out H>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, ioA, ioB, ioC), raceN(ctx, ioD, ioE, ioF), @@@ -507,18 -454,18 +753,33 @@@ /** * @see raceN */ ++<<<<<<< HEAD + fun <EE, A, B, C, D, E, F, G, H, I> raceN( + ctx: CoroutineContext, + ioA: IOOf<EE, A>, + ioB: IOOf<EE, B>, + ioC: IOOf<EE, C>, + ioD: IOOf<EE, D>, + ioE: IOOf<EE, E>, + ioF: IOOf<EE, F>, + ioG: IOOf<EE, G>, + ioH: IOOf<EE, H>, + ioI: IOOf<EE, I> + ): IO<EE, Race9<out A, out B, out C, out D, out E, out F, out G, out H, out I>> = ++======= + fun <A, B, C, D, E, F, G, H, I> raceN( + ctx: CoroutineContext, + ioA: IOOf<A>, + ioB: IOOf<B>, + ioC: IOOf<C>, + ioD: IOOf<D>, + ioE: IOOf<E>, + ioF: IOOf<F>, + ioG: IOOf<G>, + ioH: IOOf<H>, + ioI: IOOf<I> + ): IO<Race9<out A, out B, out C, out D, out E, out F, out G, out H, out I>> = ++>>>>>>> origin/master raceN(ctx, raceN(ctx, ioA, ioB, ioC), raceN(ctx, ioD, ioE, ioF), diff --cc modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/Resource.kt index 8852ca489,914e8cf15..000000000 --- a/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/Resource.kt +++ b/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/Resource.kt @@@ -79,10 -79,11 +79,18 @@@ interface ResourceMonoid<F, E, A> : Mon override fun empty(): Resource<F, E, A> = Resource.empty(MR(), BR()) } ++<<<<<<< HEAD +interface ResourceMonadIO<F, E> : MonadIO<ResourcePartialOf<F, E>>, ResourceMonad<F, E> { + fun FIO(): MonadIO<F> + override fun BR(): Bracket<F, E> + override fun <A> 
IO<Nothing, A>.liftIO(): Kind<ResourcePartialOf<F, E>, A> = FIO().run { ++======= + @extension + interface ResourceMonadIO<F, E> : MonadIO<ResourcePartialOf<F, E>>, ResourceMonad<F, E> { + fun FIO(): MonadIO<F> + override fun BR(): Bracket<F, E> + override fun <A> IO<A>.liftIO(): Kind<ResourcePartialOf<F, E>, A> = FIO().run { ++>>>>>>> origin/master Resource.run { liftIO().liftF(BR()) } } } diff --cc modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt index 7029b996c,de236893c..000000000 --- a/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt +++ b/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt @@@ -268,62 -252,36 +268,72 @@@ interface IOMonoid<E, A> : Monoid<IO<E fun SM(): Monoid<A> - override fun empty(): IO<A> = IO.just(SM().empty()) + override fun empty(): IO<E, A> = IO.just(SM().empty()) } ++<<<<<<< HEAD +interface IOUnsafeRun : UnsafeRun<IOPartialOf<Nothing>> { ++======= + @extension + interface IOMonadIO : MonadIO<ForIO>, IOMonad { + override fun <A> IO<A>.liftIO(): Kind<ForIO, A> = this + } + + @extension + interface IOUnsafeRun : UnsafeRun<ForIO> { ++>>>>>>> origin/master - override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<ForIO, A>): A = fa().fix().unsafeRunSync() + override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>): A = + fa().unsafeRunSync() - override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit): Unit = - fa().fix().unsafeRunAsync(cb) + override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Unit = + fa().unsafeRunAsync(cb) } -@extension -interface IOUnsafeCancellableRun : UnsafeCancellableRun<ForIO> { - override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<ForIO, A>): A = fa().fix().unsafeRunSync() +interface IOMonadIO : MonadIO<IOPartialOf<Nothing>>, IOMonad<Nothing> { + override fun <A> IO<Nothing, A>.liftIO(): IO<Nothing, A> = this +} - 
override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit) = - fa().fix().unsafeRunAsync(cb) +private val MonadIO: MonadIO<IOPartialOf<Nothing>> = + object : IOMonadIO {} - override suspend fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = - fa().fix().unsafeRunAsyncCancellable(onCancel, cb) +fun IO.Companion.monadIO(): MonadIO<IOPartialOf<Nothing>> = + MonadIO + +private val UnsafeRun: IOUnsafeRun = + object : IOUnsafeRun {} + +fun IO.Companion.unsafeRun(): UnsafeRun<IOPartialOf<Nothing>> = + UnsafeRun + +fun <A> unsafe.runBlocking(fa: () -> IOOf<Nothing, A>): A = invoke { + UnsafeRun.run { runBlocking(fa) } } +fun <A> unsafe.runNonBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Unit = invoke { + UnsafeRun.run { runNonBlocking(fa, cb) } +} + +interface IOUnsafeCancellableRun : UnsafeCancellableRun<IOPartialOf<Nothing>>, IOUnsafeRun { + override suspend fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = + fa().unsafeRunAsyncCancellable(onCancel, cb) +} + +private val UnsafeCancellableRun: IOUnsafeCancellableRun = + object : IOUnsafeCancellableRun {} + +fun IO.Companion.unsafeCancellableRun(): UnsafeCancellableRun<IOPartialOf<Nothing>> = + UnsafeCancellableRun + +fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = + invoke { + UnsafeCancellableRun.run { + runNonBlockingCancellable(onCancel, fa, cb) + } + } + @extension -interface IODispatchers : Dispatchers<ForIO> { +interface IODispatchers<E> : Dispatchers<IOPartialOf<E>> { override fun default(): CoroutineContext = IODispatchers.CommonPool diff --cc modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/MonadIO.kt index 
44888ba63,cfe83f5a6..000000000 --- a/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/MonadIO.kt +++ b/modules/fx/arrow-fx/src/main/kotlin/arrow/fx/typeclasses/MonadIO.kt @@@ -13,5 -13,5 +13,9 @@@ import arrow.typeclasses.Mona * If that is not enough use the fx-mtl package for better instances of the effect hierarchy. **/ interface MonadIO<M> : Monad<M> { ++<<<<<<< HEAD + fun <A> IO<Nothing, A>.liftIO(): Kind<M, A> ++======= + fun <A> IO<A>.liftIO(): Kind<M, A> ++>>>>>>> origin/master } diff --cc modules/fx/arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt index bda075c4c,a1741f9ee..000000000 --- a/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt +++ b/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt @@@ -114,7 -113,7 +114,11 @@@ class EffectsSuspendDSLTests : UnitSpec "suspend () -> A ≅ Kind<F, A> isomorphism" { fxTest { ++<<<<<<< HEAD + IO.fx<Nothing, Boolean> { ++======= + IO.fx { ++>>>>>>> origin/master val suspendedValue = !effect { suspend { 1 }() } val ioValue = !IO.just(1) suspendedValue == ioValue @@@ -244,7 -243,7 +248,11 @@@ return done } fxTest { ++<<<<<<< HEAD + IO.fx<Nothing, String> { ++======= + IO.fx { ++>>>>>>> origin/master val appliedPureEffect1: String = !effect { sideEffect() } val appliedPureEffect2: String = !effect { sideEffect() } appliedPureEffect1 diff --cc modules/fx/arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt index 8481b82a2,cba28ddc8..000000000 --- a/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt +++ b/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt @@@ -479,7 -469,7 +479,11 @@@ class IOTest : UnitSpec() } "IO.binding should for comprehend over IO" { ++<<<<<<< HEAD + val result = IO.fx<Nothing, Int> { ++======= + val result = IO.fx { ++>>>>>>> origin/master val x = !IO.just(1) val y = !IO { x + 1 } y @@@ -577,7 -567,7 +581,11 @@@ "IORacePair should be stack safe" { val size = 5000 ++<<<<<<< HEAD + fun ioRacePair(i: Int): IO<Nothing, Int> = ++======= + fun 
ioRacePair(i: Int): IO<Int> = ++>>>>>>> origin/master IO.racePair(IODispatchers.CommonPool, IO.never, if (i < size) ioRacePair(i + 1) else just(i)) .map { it.fold({ a, _ -> a }, { _, b -> b }) } @@@ -587,7 -577,7 +595,11 @@@ "IORaceTriple should be stack safe" { val size = 5000 ++<<<<<<< HEAD + fun ioRaceTriple(i: Int): IO<Nothing, Int> = ++======= + fun ioRaceTriple(i: Int): IO<Int> = ++>>>>>>> origin/master IO.raceTriple(IODispatchers.CommonPool, IO.never, IO.never, if (i < size) ioRaceTriple(i + 1) else just(i)) .map { it.fold({ a, _, _ -> a }, { _, b, _ -> b }, { _, _, c -> c }) } diff --cc modules/fx/arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt index 09afdec23,bec0e1c30..000000000 --- a/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt +++ b/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt @@@ -1,352 -1,326 +1,681 @@@ ++<<<<<<< HEAD +// package arrow.fx +// +// import arrow.Kind +// import arrow.core.None +// import arrow.core.Some +// import arrow.core.Tuple2 +// import arrow.core.Tuple3 +// import arrow.core.Left +// import arrow.core.extensions.list.traverse.traverse +// import arrow.core.fix +// import arrow.fx.extensions.io.concurrent.concurrent +// import arrow.fx.extensions.io.dispatchers.dispatchers +// import arrow.fx.typeclasses.Concurrent +// import arrow.fx.rx2.MaybeK +// import arrow.fx.rx2.MaybeKOf +// import arrow.fx.rx2.extensions.concurrent +// import arrow.fx.rx2.value +// import arrow.fx.typeclasses.milliseconds +// import arrow.test.UnitSpec +// import arrow.test.generators.nonEmptyList +// import arrow.test.generators.tuple2 +// import arrow.test.generators.tuple3 +// import arrow.test.generators.unit +// import io.kotlintest.fail +// import io.kotlintest.matchers.types.shouldBeInstanceOf +// import arrow.test.laws.equalUnderTheLaw +// import arrow.test.laws.forFew +// import arrow.typeclasses.Eq +// import io.kotlintest.properties.Gen +// import io.kotlintest.properties.forAll +// import io.kotlintest.shouldBe +// 
import kotlin.coroutines.CoroutineContext +// import kotlin.time.ExperimentalTime +// import kotlin.time.measureTimedValue +// +// class QueueTest : UnitSpec() { +// +// init { +// +// fun <F> Concurrent<F>.tests( +// label: String, +// ctx: CoroutineContext = IO.dispatchers<Nothing>().default(), +// factory: QueueFactory<F>, +// EQ: Eq<Kind<F, Unit>> +// ) { +// +// fun Kind<F, Unit>.test(): Boolean = +// equalUnderTheLaw(unit(), EQ) +// +// "$label - offer and take a number of values in the same order" { +// forFew(6, Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> +// fx.concurrent { +// val q = !factory.unbounded<Int>() +// !q.offer(t.a) +// !q.offer(t.b) +// !q.offer(t.c) +// val first = !q.take() +// val second = !q.take() +// val third = !q.take() +// !effect { Tuple3(first, second, third) shouldBe t } +// }.test() +// } +// } +// +// "$label - time out taking from an empty queue" { +// fx.concurrent { +// val wontComplete = factory.unbounded<Int>().flatMap(Queue<F, Int>::take) +// val start = !effect { System.currentTimeMillis() } +// val received = !wontComplete.map { Some(it) } +// .waitFor(100.milliseconds, default = just(None)) +// val elapsed = !effect { System.currentTimeMillis() - start } +// !effect { received shouldBe None } +// !effect { (elapsed >= 100) shouldBe true } +// }.test() +// } +// +// "$label - suspended take calls on an empty queue complete when offer calls made to queue" { +// forFew(10, Gen.int()) { i -> +// fx.concurrent { +// val q = !factory.unbounded<Int>() +// val first = !q.take().fork(ctx) +// !q.offer(i) +// val res = !first.join() +// !effect { res shouldBe i } +// }.test() +// } +// } +// +// "$label - multiple take calls on an empty queue complete when until as many offer calls made to queue" { +// forFew(6, Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> +// fx.concurrent { +// val q = !factory.unbounded<Int>() +// val first = !q.take().fork(ctx) +// val second = !q.take().fork(ctx) +// val third = 
!q.take().fork(ctx) +// !q.offer(t.a) +// !q.offer(t.b) +// !q.offer(t.c) +// val firstValue = !first.join() +// val secondValue = !second.join() +// val thirdValue = !third.join() +// !effect { setOf(firstValue, secondValue, thirdValue) shouldBe setOf(t.a, t.b, t.c) } +// }.test() +// } +// } +// +// "$label - taking from a shutdown queue creates a QueueShutdown error" { +// forAll(Gen.int()) { i -> +// fx.concurrent { +// val res = !fx.concurrent { +// val q = !factory.unbounded<Int>() +// !q.offer(i) +// !q.shutdown() +// !q.take() +// }.attempt() +// +// !effect { res shouldBe Left(QueueShutdown) } +// }.test() +// } +// } +// +// "$label - offering to a shutdown queue creates a QueueShutdown error" { +// forAll(Gen.int()) { i -> +// fx.concurrent { +// val res = !fx.concurrent { +// val q = !factory.unbounded<Int>() +// !q.shutdown() +// !q.offer(i) +// }.attempt() +// +// !effect { res shouldBe Left(QueueShutdown) } +// }.test() +// } +// } +// +// "$label - joining a forked, incomplete take call on a shutdown queue creates a QueueShutdown error" { +// fx.concurrent { +// val res = !fx.concurrent { +// val q = !factory.unbounded<Int>() +// val t = !q.take().fork(ctx) +// !q.shutdown() +// !t.join() +// }.attempt() +// !effect { res shouldBe Left(QueueShutdown) } +// }.test() +// } +// +// "$label - create a shutdown hook completing a promise, then shutdown the queue, the promise should be completed" { +// fx.concurrent { +// val q = !factory.unbounded<Int>() +// val p = !Promise<F, Boolean>(this@tests) +// !(q.awaitShutdown().followedBy(p.complete(true))).fork() +// !q.shutdown() +// val res = !p.get() +// !effect { res shouldBe true } +// }.test() +// } +// +// "$label - shut it down, create a shutdown hook completing a promise, the promise should be completed immediately" { +// fx.concurrent { +// val q = !factory.unbounded<Int>() +// !q.shutdown() +// val p = !Promise<F, Boolean>(this@tests) +// !(q.awaitShutdown().followedBy(p.complete(true))).fork() +// 
!effect { p.get() shouldBe true } +// }.test() +// } +// +// "$label - drops elements offered to a queue at capacity" { +// forFew(6, Gen.int(), Gen.int(), Gen.int()) { x, x2, x3 -> +// fx.concurrent { +// val q = !factory.dropping<Int>(1) +// !q.offer(x) +// !q.offer(x2) // this `x2` should be dropped +// val taken = !q.take() +// !q.offer(x3) +// val taken2 = !q.take() +// val res = Tuple2(taken, taken2) +// !effect { res shouldBe Tuple2(x, x3) } +// }.test() +// } +// } +// +// "$label - offering to a zero capacity queue with a pending taker" { +// forFew(3, Gen.int()) { x -> +// fx.concurrent { +// val q = !factory.dropping<Int>(0) +// val taker = !q.take().fork(ctx) +// // Wait for the forked `take` to complete by checking the queue `size`, +// // otherwise the test will suspend indefinitely if `take` occurs after `offer`. +// !q.size().repeat<F, Int, Int>(this, Schedule.doUntil(this) { it == -1 }) +// !q.offer(x) +// val res = !taker.join() +// !effect { res shouldBe x } +// }.test() +// } +// } +// +// +// "$label - time out offering to a queue at capacity" { +// fx.concurrent { +// val q = !factory.bounded<Int>(1) +// !q.offer(1) +// val start = !effect { System.currentTimeMillis() } +// val wontComplete = q.offer(2) +// val received = !wontComplete.map { Some(it) } +// .waitFor(100.milliseconds, default = just(None)) +// val elapsed = !effect { System.currentTimeMillis() - start } +// !effect { received shouldBe None } +// !effect { (elapsed >= 100) shouldBe true } +// }.test() +// } +// +// "$label - offering to a 0 capacity queue in deficit honours blocking strategy" { +// fx.concurrent { +// val q = !factory.bounded<Int>(0) +// // flip from initial Surplus state to Deficit +// val first = !q.take().fork(ctx) +// // then clear previous taker while staying in Deficit +// !q.offer(1) +// !first.join() +// val start = !effect { System.currentTimeMillis() } +// val wontComplete = q.offer(2) +// val received = !wontComplete.map { Some(it) } +// 
.waitFor(100.milliseconds, default = just(None)) +// val elapsed = !effect { System.currentTimeMillis() - start } +// !effect { received shouldBe None } +// !effect { (elapsed >= 100) shouldBe true } +// }.test() +// } +// +// "$label - suspended offers called on an full queue complete when take calls made to queue" { +// forFew(3, Gen.tuple2(Gen.int(), Gen.int())) { t -> +// fx.concurrent { +// val q = !factory.bounded<Int>(1) +// !q.offer(t.a) +// !q.offer(t.b).fork(ctx) +// val first = !q.take() +// val second = !q.take() +// !effect { Tuple2(first, second) shouldBe t } +// }.test() +// } +// } +// +// "$label - multiple offer calls on an full queue complete when as many take calls are made to queue" { +// forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> +// fx.concurrent { +// val q = !factory.bounded<Int>(1) +// !q.offer(t.a) +// !q.offer(t.b).fork() +// !q.offer(t.c).fork() +// +// val first = !q.take() +// val second = !q.take() +// val third = !q.take() +// +// val took = Tuple3(first, second, third) +// val res = setOf(first, second, third) +// +// val expected = setOf(t.a, t.b, t.c) +// !effect { println("$took shouldBe $expected") } +// !effect { res shouldBe expected } +// }.test() +// } +// } +// +// "$label - capacity must be a positive integer" { +// factory.sliding<Int>(0).attempt().flatMap { res -> +// effect { +// res.fold( +// { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, +// { fail("Expected Left<IllegalArgumentException>") } +// ) +// } +// }.test() +// } +// +// "$label - removes first element after offering to a queue at capacity" { +// forFew(3, Gen.int(), Gen.nonEmptyList(Gen.int())) { x, xs -> +// fx.concurrent { +// val q = !factory.sliding<Int>(xs.size) +// !q.offer(x) +// !xs.traverse(this, q::offer) +// val taken = !(1..xs.size).toList().traverse(this) { q.take() } +// !effect { taken shouldBe xs.toList() } +// }.test() +// } +// } +// } +// +// IO.concurrent<Nothing>().run { +// // tests( +// // label = "IO 
Queue test", +// // factory = Queue.factory(IO.concurrent()), +// // EQ = IO_EQ() +// // ) +// // boundedStrategyTests(queue = { capacity -> Queue.bounded(capacity, this) }, EQ = IO_EQ()) +// // slidingStrategyTests(queue = { capacity -> Queue.sliding(capacity, this) }, EQ = IO_EQ()) +// // droppingStrategyTests(queue = { capacity -> Queue.dropping(capacity, this) }, EQ = IO_EQ()) +// // unboundedStrategyTests(queue = { Queue.unbounded(this) }, EQ = IO_EQ()) +// } +// +// MaybeK.concurrent().run { +// tests( +// label = "MaybeK Queue test", +// factory = Queue.factory(MaybeK.concurrent()), +// EQ = MaybeK.eq() +// ) +// // boundedStrategyTests(queue = { capacity -> Queue.bounded(capacity, this) }, EQ = MaybeK.eq()) +// // slidingStrategyTests(queue = { capacity -> Queue.sliding(capacity, this) }, EQ = MaybeK.eq()) +// // droppingStrategyTests(queue = { capacity -> Queue.dropping(capacity, this) }, EQ = MaybeK.eq()) +// // unboundedStrategyTests(queue = { Queue.unbounded(this) }, EQ = IO_EQ()) +// } +// } +// } +// +// fun <F> Queue.Companion.factory(CF: Concurrent<F>): QueueFactory<F> = +// object : QueueFactory<F> { +// override fun CF(): Concurrent<F> = CF +// } +// +// interface QueueFactory<F> { +// fun CF(): Concurrent<F> +// +// fun <A> bounded(capacity: Int): Kind<F, Queue<F, A>> = +// Queue.bounded(capacity, CF()) +// +// fun <A> sliding(capacity: Int): Kind<F, Queue<F, A>> = +// Queue.sliding(capacity, CF()) +// +// fun <A> dropping(capacity: Int): Kind<F, Queue<F, A>> = +// Queue.dropping(capacity, CF()) +// +// fun <A> unbounded(): Kind<F, Queue<F, A>> = +// Queue.unbounded(CF()) +// } +// +// private fun <T> MaybeK.Companion.eq(): Eq<MaybeKOf<T>> = object : Eq<MaybeKOf<T>> { +// override fun MaybeKOf<T>.eqv(b: MaybeKOf<T>): Boolean { +// val res1 = arrow.core.Try { value().timeout(5, java.util.concurrent.TimeUnit.SECONDS).blockingGet() } +// val res2 = arrow.core.Try { b.value().timeout(5, java.util.concurrent.TimeUnit.SECONDS).blockingGet() } +// 
return res1.fold({ t1 -> +// res2.fold({ t2 -> +// (t1::class.java == t2::class.java) +// }, { false }) +// }, { v1 -> +// res2.fold({ false }, { +// v1 == it +// }) +// }) +// } +// } ++======= + package arrow.fx + + import arrow.core.None + import arrow.core.Some + import arrow.core.Tuple2 + import arrow.core.Tuple3 + import arrow.core.Left + import arrow.core.extensions.list.traverse.traverse + import arrow.core.fix + import arrow.fx.extensions.fx + import arrow.fx.extensions.io.applicative.applicative + import arrow.fx.extensions.io.concurrent.concurrent + import arrow.fx.extensions.io.dispatchers.dispatchers + import arrow.fx.extensions.io.monad.monad + import arrow.fx.typeclasses.milliseconds + import arrow.test.UnitSpec + import arrow.test.generators.nonEmptyList + import arrow.test.generators.tuple2 + import arrow.test.generators.tuple3 + import io.kotlintest.fail + import io.kotlintest.matchers.types.shouldBeInstanceOf + import io.kotlintest.properties.Gen + import io.kotlintest.properties.forAll + import io.kotlintest.shouldBe + import kotlin.coroutines.CoroutineContext + + class QueueTest : UnitSpec() { + + init { + + fun allStrategyTests( + label: String, + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + + "$label - make a queue the add values then retrieve in the same order" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + !l.traverse(IO.applicative(), q::offer) + val nl = !(1..l.size).toList().traverse(IO.applicative()) { q.take() } + nl.fix() + }.unsafeRunSync() == l.toList() + } + } + + "$label - offer and take a number of values in the same order" { + forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> + IO.fx { + val q = !queue(3) + !q.offer(t.a) + !q.offer(t.b) + !q.offer(t.c) + val first = !q.take() + val second = !q.take() + val third = !q.take() + Tuple3(first, second, third) + }.unsafeRunSync() == t + } + } + + "$label - time out taking from an empty 
queue" { + IO.fx { + val wontComplete = queue(10).flatMap(Queue<ForIO, Int>::take) + val start = !effect { System.currentTimeMillis() } + val received = !wontComplete.map { Some(it) } + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + !effect { received shouldBe None } + !effect { (elapsed >= 100) shouldBe true } + }.unsafeRunSync() + } + + "$label - suspended take calls on an empty queue complete when offer calls made to queue" { + forAll(Gen.int()) { i -> + IO.fx { + val q = !queue(3) + val first = !q.take().fork(ctx) + !q.offer(i) + !first.join() + }.unsafeRunSync() == i + } + } + + "$label - multiple take calls on an empty queue complete when until as many offer calls made to queue" { + forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> + IO.fx { + val q = !queue(3) + val first = !q.take().fork(ctx) + val second = !q.take().fork(ctx) + val third = !q.take().fork(ctx) + !q.offer(t.a) + !q.offer(t.b) + !q.offer(t.c) + val firstValue = !first.join() + val secondValue = !second.join() + val thirdValue = !third.join() + setOf(firstValue, secondValue, thirdValue) + }.unsafeRunSync() == setOf(t.a, t.b, t.c) + } + } + + "$label - taking from a shutdown queue creates a QueueShutdown error" { + forAll(Gen.int()) { i -> + IO.fx { + val q = !queue(10) + !q.offer(i) + !q.shutdown() + !q.take() + }.attempt().unsafeRunSync() == Left(QueueShutdown) + } + } + + "$label - offering to a shutdown queue creates a QueueShutdown error" { + forAll(Gen.int()) { i -> + IO.fx { + val q = !queue(10) + !q.shutdown() + !q.offer(i) + }.attempt().unsafeRunSync() == Left(QueueShutdown) + } + } + + "$label - joining a forked, incomplete take call on a shutdown queue creates a QueueShutdown error" { + IO.fx { + val q = !queue(10) + val t = !q.take().fork(ctx) + !q.shutdown() + !t.join() + }.attempt().unsafeRunSync() shouldBe Left(QueueShutdown) + } + + "$label - create a shutdown hook completing a promise, then shutdown the 
queue, the promise should be completed" { + IO.fx { + val q = !queue(10) + val p = !Promise<ForIO, Boolean>(IO.concurrent()) + !(q.awaitShutdown().followedBy(p.complete(true))).fork() + !q.shutdown() + !p.get() + }.unsafeRunSync() + } + + "$label - create a shutdown hook completing a promise twice, then shutdown the queue, both promises should be completed" { + IO.fx { + val q = !queue(10) + val p1 = !Promise<ForIO, Boolean>(IO.concurrent()) + val p2 = !Promise<ForIO, Boolean>(IO.concurrent()) + !(q.awaitShutdown().followedBy(p1.complete(true))).fork() + !(q.awaitShutdown().followedBy(p2.complete(true))).fork() + !q.shutdown() + !mapN(p1.get(), p2.get()) { (p1, p2) -> p1 && p2 } + }.unsafeRunSync() + } + + "$label - shut it down, create a shutdown hook completing a promise, the promise should be completed immediately" { + IO.fx { + val q = !queue(10) + !q.shutdown() + val p = !Promise<ForIO, Boolean>(IO.concurrent()) + !(q.awaitShutdown().followedBy(p.complete(true))).fork() + !p.get() + }.unsafeRunSync() + } + } + + fun boundedStrategyTests( + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + val label = "BoundedQueue" + allStrategyTests(label, ctx, queue) + + "$label - time out offering to a queue at capacity" { + IO.fx { + val q = !queue(1) + !q.offer(1) + val start = !effect { System.currentTimeMillis() } + val wontComplete = q.offer(2) + val received = !wontComplete.map { Some(it) } + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + !effect { received shouldBe None } + !effect { (elapsed >= 100) shouldBe true } + }.unsafeRunSync() + } + + "$label - offering to a 0 capacity queue in deficit honours blocking strategy" { + IO.fx { + val q = !queue(0) + // flip from initial Surplus state to Deficit + val first = !q.take().fork(ctx) + // then clear previous taker while staying in Deficit + !q.offer(1) + !first.join() + val start = !effect { 
System.currentTimeMillis() } + val wontComplete = q.offer(2) + val received = !wontComplete.map { Some(it) } + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + !effect { received shouldBe None } + !effect { (elapsed >= 100) shouldBe true } + }.unsafeRunSync() + } + + "$label - suspended offers called on an full queue complete when take calls made to queue" { + forAll(Gen.tuple2(Gen.int(), Gen.int())) { t -> + IO.fx { + val q = !queue(1) + !q.offer(t.a) + !q.offer(t.b).fork(ctx) + val first = !q.take() + val second = !q.take() + Tuple2(first, second) + }.unsafeRunSync() == t + } + } + + "$label - multiple offer calls on an full queue complete when as many take calls are made to queue" { + forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> + IO.fx { + val q = !queue(1) + !q.offer(t.a) + !q.offer(t.b).fork(ctx) + !q.offer(t.c).fork(ctx) + val first = !q.take() + val second = !q.take() + val third = !q.take() + setOf(first, second, third) + }.unsafeRunSync() == setOf(t.a, t.b, t.c) + } + } + + "$label - joining a forked offer call made to a shut down queue creates a QueueShutdown error" { + forAll(Gen.int()) { i -> + IO.fx { + val q = !queue(1) + !q.offer(i) + val o = !q.offer(i).fork(ctx) + !q.shutdown() + !o.join() + }.attempt().unsafeRunSync() == Left(QueueShutdown) + } + } + } + + fun slidingStrategyTests( + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + val label = "SlidingQueue" + allStrategyTests(label, ctx, queue) + + "$label - capacity must be a positive integer" { + queue(0).attempt().unsafeRunSync().fold( + { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, + { fail("Expected Left<IllegalArgumentException>") } + ) + } + + "$label - removes first element after offering to a queue at capacity" { + forAll(Gen.int(), Gen.nonEmptyList(Gen.int())) { x, xs -> + IO.fx { + val q = !queue(xs.size) + !q.offer(x) + 
!xs.traverse(IO.applicative(), q::offer) + val taken = !(1..xs.size).toList().traverse(IO.applicative()) { q.take() } + taken.fix() + }.unsafeRunSync() == xs.toList() + } + } + } + + fun droppingStrategyTests( + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + val label = "DroppingQueue" + + allStrategyTests(label, ctx, queue) + + "$label - offering to a zero capacity queue with a pending taker" { + forAll(Gen.int()) { x -> + IO.fx { + val q = !queue(0) + val taker = !q.take().fork(ctx) + // Wait for the forked `take` to complete by checking the queue `size`, + // otherwise the test will suspend indefinitely if `take` occurs after `offer`. + !q.size().repeat<ForIO, Int, Int>(IO.concurrent(), Schedule.doUntil(IO.monad()) { it == -1 }) + !q.offer(x) + !taker.join() + }.unsafeRunSync() == x + } + } + + "$label - drops elements offered to a queue at capacity" { + forAll(Gen.int(), Gen.int(), Gen.nonEmptyList(Gen.int())) { x, x2, xs -> + IO.fx { + val q = !queue(xs.size) + !xs.traverse(IO.applicative(), q::offer) + !q.offer(x) // this `x` should be dropped + val taken = !(1..xs.size).toList().traverse(IO.applicative()) { q.take() } + !q.offer(x2) + val taken2 = !q.take() + taken.fix() + taken2 + }.unsafeRunSync() == xs.toList() + x2 + } + } + } + + fun unboundedStrategyTests( + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + allStrategyTests("UnboundedQueue", ctx, queue) + } + + boundedStrategyTests { capacity -> Queue.bounded<ForIO, Int>(capacity, IO.concurrent()).fix() } + + slidingStrategyTests { capacity -> Queue.sliding<ForIO, Int>(capacity, IO.concurrent()).fix() } + + droppingStrategyTests { capacity -> Queue.dropping<ForIO, Int>(capacity, IO.concurrent()).fix() } + + unboundedStrategyTests { Queue.unbounded<ForIO, Int>(IO.concurrent()).fix() } + } + } ++>>>>>>> origin/master diff --cc modules/fx/arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt index 
bba54799d,ac94b98dc..000000000 --- a/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt +++ b/modules/fx/arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt @@@ -25,11 -24,11 +25,16 @@@ import io.kotlintest.properties.Ge class ResourceTest : UnitSpec() { init { - val EQ = Eq<Kind<ResourcePartialOf<ForIO, Throwable>, Int>> { a, b -> - val tested: IO<Int> = a.fix().invoke { IO.just(1) }.fix() + val EQ = Eq<Kind<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>, Int>> { a, b -> + val tested: IO<Nothing, Int> = a.fix().invoke { IO.just(1) }.fix() val expected = b.fix().invoke { IO.just(1) }.fix() ++<<<<<<< HEAD + val compare = IO.applicative<Nothing>().mapN(tested, expected) { (t, e) -> t == e }.fix() + compare.unsafeRunTimed(5.seconds) == Some(Right(true)) ++======= + val compare = IO.applicative().mapN(tested, expected) { (t, e) -> t == e }.fix() + compare.unsafeRunTimed(5.seconds) == Some(true) ++>>>>>>> origin/master } testLaws( @@@ -62,9 -61,9 +67,13 @@@ private fun Resource.Companion.eqK() = (this.fix() to other.fix()).let { val ls = it.first.invoke { IO.just(1) }.fix() val rs = it.second.invoke { IO.just(1) }.fix() ++<<<<<<< HEAD + val compare = IO.applicative<Nothing>().mapN(ls, rs) { (l, r) -> l == r }.fix() ++======= + val compare = IO.applicative().mapN(ls, rs) { (l, r) -> l == r }.fix() ++>>>>>>> origin/master - compare.unsafeRunTimed(5.seconds) == Some(true) + compare.unsafeRunTimed(5.seconds) == Some(Right(true)) } }
Working on this problem! I made a mistake in the way I did the sync :woman_facepalming:
Solved via #1987
Error log
https://github.com/arrow-kt/arrow/commit/fec5c82e97f371fbc102977dba8ce9257d5df4e7/checks
Conflicts