Closed github-actions[bot] closed 4 years ago
https://github.com/arrow-kt/arrow-fx/commit/07677ccd5a2cb3127af9cb5d9e29e105e19a7529/checks
diff --cc arrow-benchmarks-fx/src/jmh/kotlin/arrow/benchmarks/Queue.kt index 4ab38a8,c7c74a6..0000000 --- a/arrow-benchmarks-fx/src/jmh/kotlin/arrow/benchmarks/Queue.kt +++ b/arrow-benchmarks-fx/src/jmh/kotlin/arrow/benchmarks/Queue.kt @@@ -1,16 -1,13 +1,23 @@@ package arrow.benchmarks ++<<<<<<< HEAD +import arrow.fx.ConcurrentQueue ++======= + import arrow.fx.ForIO ++>>>>>>> origin/master import arrow.fx.IO import arrow.fx.IOOf +import arrow.fx.IOPartialOf import arrow.fx.Queue import arrow.fx.extensions.io.concurrent.concurrent -import arrow.fx.extensions.io.functor.unit import arrow.fx.extensions.io.monad.flatMap import arrow.fx.fix ++<<<<<<< HEAD +import arrow.fx.internal.CancellableQueue +import arrow.fx.unsafeRunSync +import arrow.fx.void ++======= ++>>>>>>> origin/master import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.CompilerControl import org.openjdk.jmh.annotations.Fork @@@ -34,21 -31,19 +41,29 @@@ open class Queue @Param("1000") var size: Int = 0 ++<<<<<<< HEAD + var ConcurQueue by Delegates.notNull<Queue<IOPartialOf<Nothing>, Int>>() + var CancelQueue by Delegates.notNull<Queue<IOPartialOf<Nothing>, Int>>() + + @Setup(Level.Trial) + fun createQueues() { + ConcurQueue = ConcurrentQueue.empty<IOPartialOf<Nothing>, Int>(IO.concurrent()).unsafeRunSync() + CancelQueue = CancellableQueue.empty<IOPartialOf<Nothing>, Int>(IO.concurrent()).unsafeRunSync() ++======= + var ConcurQueue by Delegates.notNull<Queue<ForIO, Int>>() + + @Setup(Level.Trial) + fun createQueues(): Unit { + ConcurQueue = Queue.unbounded<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync() ++>>>>>>> origin/master } - fun <A> IOOf<A>.repeat(n: Int): IO<A> = + fun <A> IOOf<Nothing, A>.repeat(n: Int): IO<Nothing, A> = if (n < 1) fix() else flatMap { repeat(n - 1) } - fun loop(q: Queue<ForIO, Int>): Unit = - q.offer(0).unit().repeat(size).flatMap { - q.take().unit().repeat(size) + fun loop(q: Queue<IOPartialOf<Nothing>, Int>): Unit = + q.offer(0).void().repeat(size).flatMap { + q.take().void().repeat(size) }.unsafeRunSync() @Benchmark diff --cc arrow-fx-kotlinx-coroutines/src/test/kotlin/arrow/integrations/kotlinx/CoroutinesIntegrationTest.kt index 2b2f72e,92e84a4..0000000 --- a/arrow-fx-kotlinx-coroutines/src/test/kotlin/arrow/integrations/kotlinx/CoroutinesIntegrationTest.kt +++ b/arrow-fx-kotlinx-coroutines/src/test/kotlin/arrow/integrations/kotlinx/CoroutinesIntegrationTest.kt @@@ -1,35 -1,29 +1,56 @@@ package arrow.integrations.kotlinx +import arrow.core.Right +import arrow.core.Some import arrow.Kind -import arrow.core.Either -import arrow.core.extensions.either.eq.eq import arrow.core.extensions.eq import arrow.core.internal.AtomicRefW ++<<<<<<< HEAD ++======= + import arrow.core.right + import arrow.core.test.UnitSpec + import arrow.core.test.generators.throwable + import arrow.core.test.laws.equalUnderTheLaw + import arrow.fx.ForIO ++>>>>>>> origin/master import arrow.fx.IO -import arrow.fx.extensions.exitcase.eq.eq +import arrow.fx.IOPartialOf +import arrow.fx.IOResult +import arrow.fx.bracketCase +import arrow.fx.extensions.exitcase2.eq.eq import arrow.fx.extensions.fx ++<<<<<<< HEAD +import arrow.fx.extensions.io.async.effectMap +import arrow.fx.extensions.io.applicative.applicative +import arrow.fx.extensions.io.monad.followedBy +import arrow.fx.extensions.io.concurrent.waitFor +import arrow.fx.fix +import arrow.fx.handleErrorWith +import arrow.fx.flatMap +import arrow.fx.onCancel ++======= + import arrow.fx.extensions.io.applicative.applicative + import arrow.fx.extensions.io.applicativeError.attempt + import arrow.fx.extensions.io.bracket.onCancel + import arrow.fx.extensions.io.concurrent.waitFor + import arrow.fx.fix + import arrow.fx.handleErrorWith ++>>>>>>> origin/master import arrow.fx.typeclasses.Duration -import arrow.fx.typeclasses.ExitCase +import arrow.fx.typeclasses.ExitCase2 import arrow.fx.typeclasses.milliseconds import arrow.fx.typeclasses.seconds ++<<<<<<< HEAD +import arrow.fx.unsafeRunAsync +import arrow.fx.unsafeRunSync +import arrow.test.UnitSpec +import arrow.test.eq +import arrow.test.generators.throwable +import arrow.test.laws.equalUnderTheLaw +import arrow.test.laws.shouldBeEq ++======= + import arrow.fx.test.laws.shouldBeEq ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.fail diff --cc arrow-fx-reactor/src/test/kotlin/arrow/fx/FluxKTest.kt index 9d2c29c,918ef45..0000000 --- a/arrow-fx-reactor/src/test/kotlin/arrow/fx/FluxKTest.kt +++ b/arrow-fx-reactor/src/test/kotlin/arrow/fx/FluxKTest.kt @@@ -11,9 -18,8 +14,14 @@@ import arrow.fx.reactor.fi import arrow.fx.reactor.k import arrow.fx.reactor.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.UnitSpec +import arrow.test.generators.GenK +import arrow.test.generators.throwable ++======= + import arrow.fx.test.laws.AsyncLaws + import arrow.fx.test.laws.TimerLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.matchers.startWith @@@ -145,6 -152,19 +153,22 @@@ class FluxKTest : UnitSpec() ec shouldBe ExitCase.Cancelled } ++<<<<<<< HEAD ++======= + "FluxK should cancel KindConnection on dispose" { + Promise.uncancellable<ForFluxK, Unit>(FluxK.async()).flatMap { latch -> + FluxK { + FluxK.async<Unit> { conn, _ -> + conn.push(latch.complete(Unit)) + }.flux.subscribe().dispose() + }.flatMap { latch.get() } + }.value() + .test() + .expectNext(Unit) + .expectComplete() + } + ++>>>>>>> origin/master "FluxK async should be cancellable" { Promise.uncancellable<ForFluxK, Unit>(FluxK.async()) .flatMap { latch -> @@@ -160,6 -180,14 +184,17 @@@ .expectNext(Unit) .expectComplete() } ++<<<<<<< HEAD ++======= + + "KindConnection can cancel upstream" { + FluxK.async<Unit> { connection, _ -> + connection.cancel().value().subscribe() + }.value() + .test() + .expectError(ConnectionCancellationException::class) + } ++>>>>>>> origin/master } } diff --cc arrow-fx-reactor/src/test/kotlin/arrow/fx/MonoKTest.kt index a2a3d0b,3973397..0000000 --- a/arrow-fx-reactor/src/test/kotlin/arrow/fx/MonoKTest.kt +++ b/arrow-fx-reactor/src/test/kotlin/arrow/fx/MonoKTest.kt @@@ -13,8 -17,10 +13,15 @@@ import arrow.fx.reactor. import arrow.fx.reactor.unsafeRunSync import arrow.fx.reactor.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.UnitSpec +import arrow.test.generators.GenK ++======= + import arrow.core.test.UnitSpec + import arrow.core.test.generators.GenK + import arrow.fx.test.laws.AsyncLaws + import arrow.fx.test.laws.TimerLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.matchers.startWith diff --cc arrow-fx-rx2/src/test/kotlin/arrow/fx/FlowableKTests.kt index d0b8881,45ed133..0000000 --- a/arrow-fx-rx2/src/test/kotlin/arrow/fx/FlowableKTests.kt +++ b/arrow-fx-rx2/src/test/kotlin/arrow/fx/FlowableKTests.kt @@@ -13,7 -26,8 +16,12 @@@ import arrow.fx.rx2.fi import arrow.fx.rx2.k import arrow.fx.rx2.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.generators.GenK ++======= + import arrow.fx.test.laws.AsyncLaws + import arrow.fx.test.laws.ConcurrentLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.properties.Gen @@@ -137,14 -151,14 +145,14 @@@ class FlowableKTests : RxJavaSpec() ec shouldBe ExitCase.Cancelled } - "FlowableK should cancel KindConnection on dispose" { + "FlowableK.cancellable should cancel CancelToken on dispose" { Promise.uncancellable<ForFlowableK, Unit>(FlowableK.async()).flatMap { latch -> - FlowableK { - FlowableK.cancellable<Unit>(fa = { - latch.complete(Unit) - }).flowable.subscribe().dispose() - }.flatMap { latch.get() } - }.value() + FlowableK { + FlowableK.cancellable<Unit>(fa = { + latch.complete(Unit) + }).flowable.subscribe().dispose() + }.flatMap { latch.get() } + }.value() .test() .assertValue(Unit) .awaitTerminalEvent(100, TimeUnit.MILLISECONDS) diff --cc arrow-fx-rx2/src/test/kotlin/arrow/fx/MaybeKTests.kt index 82c1534,df8ffc2..0000000 --- a/arrow-fx-rx2/src/test/kotlin/arrow/fx/MaybeKTests.kt +++ b/arrow-fx-rx2/src/test/kotlin/arrow/fx/MaybeKTests.kt @@@ -14,8 -18,9 +14,14 @@@ import arrow.fx.rx2. import arrow.fx.rx2.unsafeRunSync import arrow.fx.rx2.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.generators.GenK +import arrow.test.generators.throwable ++======= + import arrow.core.test.generators.GenK + import arrow.core.test.generators.throwable + import arrow.fx.test.laws.ConcurrentLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.properties.Gen diff --cc arrow-fx-rx2/src/test/kotlin/arrow/fx/ObservableKTests.kt index 0daf526,8473e83..0000000 --- a/arrow-fx-rx2/src/test/kotlin/arrow/fx/ObservableKTests.kt +++ b/arrow-fx-rx2/src/test/kotlin/arrow/fx/ObservableKTests.kt @@@ -13,8 -17,9 +13,14 @@@ import arrow.fx.rx2.fi import arrow.fx.rx2.k import arrow.fx.rx2.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.generators.GenK +import arrow.test.generators.throwable ++======= + import arrow.core.test.generators.GenK + import arrow.core.test.generators.throwable + import arrow.fx.test.laws.ConcurrentLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.properties.Gen diff --cc arrow-fx-rx2/src/test/kotlin/arrow/fx/SingleKTests.kt index c48d983,907735c..0000000 --- a/arrow-fx-rx2/src/test/kotlin/arrow/fx/SingleKTests.kt +++ b/arrow-fx-rx2/src/test/kotlin/arrow/fx/SingleKTests.kt @@@ -15,9 -19,10 +15,16 @@@ import arrow.fx.rx2. import arrow.fx.rx2.unsafeRunSync import arrow.fx.rx2.value import arrow.fx.typeclasses.ExitCase ++<<<<<<< HEAD +import arrow.test.generators.GenK +import arrow.test.generators.throwable +import arrow.test.laws.forFew ++======= + import arrow.core.test.generators.GenK + import arrow.core.test.generators.throwable + import arrow.fx.test.laws.ConcurrentLaws + import arrow.fx.test.laws.forFew ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.properties.Gen diff --cc arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt index 2ef33c1,466ce57..0000000 --- a/arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt +++ b/arrow-fx/src/main/kotlin/arrow/fx/extensions/io.kt @@@ -328,74 -231,65 +328,90 @@@ fun <EE> IO.Companion.timer(CF: Concurr Timer(CF) @extension -interface IOEffect : Effect<ForIO>, IOAsync { - override fun <A> IOOf<A>.runAsync(cb: (Either<Throwable, A>) -> IOOf<Unit>): IO<Unit> = - fix().runAsync(cb) -} +interface IOSemigroup<E, A> : Semigroup<IO<E, A>> { -// FIXME default @extension are temporarily declared in arrow-effects-io-extensions due to multiplatform needs -interface IOConcurrentEffect : ConcurrentEffect<ForIO>, IOEffect, IOConcurrent { + fun AI(): Semigroup<A> - override fun <A> IOOf<A>.runAsyncCancellable(cb: (Either<Throwable, A>) -> IOOf<Unit>): IO<Disposable> = - fix().runAsyncCancellable(OnCancel.ThrowCancellationException, cb) + override fun IO<E, A>.combine(b: IO<E, A>): IO<E, A> = + FlatMap { a1: A -> b.map { a2: A -> AI().run { a1.combine(a2) } } } } -fun IO.Companion.concurrentEffect(dispatchers: Dispatchers<ForIO>): ConcurrentEffect<ForIO> = object : IOConcurrentEffect { - override fun dispatchers(): Dispatchers<ForIO> = dispatchers +@extension ++<<<<<<< HEAD +interface IOMonoid<E, A> : Monoid<IO<E, A>>, IOSemigroup<E, A> { + override fun AI(): Monoid<A> + + override fun empty(): IO<E, A> = IO.just(AI().empty()) } -@extension +interface IOUnsafeRun : UnsafeRun<IOPartialOf<Nothing>> { + + override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>): A = + fa().unsafeRunSync() + + override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Unit = + fa().unsafeRunAsync(cb) ++======= + interface IOSemigroup<A> : Semigroup<IO<A>> { + + fun SG(): Semigroup<A> + + override fun IO<A>.combine(b: IO<A>): IO<A> = + flatMap { a1: A -> b.map { a2: A -> SG().run { a1.combine(a2) } } } + } + + @extension + interface IOMonoid<A> : Monoid<IO<A>>, IOSemigroup<A> { + override fun SG(): Monoid<A> + + override fun empty(): IO<A> = IO.just(SG().empty()) ++>>>>>>> origin/master } -@extension -interface IOMonadIO : MonadIO<ForIO>, IOMonad { - override fun <A> IO<A>.liftIO(): Kind<ForIO, A> = this +interface IOMonadIO : MonadIO<IOPartialOf<Nothing>>, IOMonad<Nothing> { + override fun <A> IO<Nothing, A>.liftIO(): IO<Nothing, A> = this } -@extension -interface IOUnsafeRun : UnsafeRun<ForIO> { +private val MonadIO: MonadIO<IOPartialOf<Nothing>> = + object : IOMonadIO {} - override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<ForIO, A>): A = fa().fix().unsafeRunSync() +fun IO.Companion.monadIO(): MonadIO<IOPartialOf<Nothing>> = + MonadIO - override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit): Unit = - fa().fix().unsafeRunAsync(cb) -} +private val UnsafeRun: IOUnsafeRun = + object : IOUnsafeRun {} -@extension -interface IOUnsafeCancellableRun : UnsafeCancellableRun<ForIO> { - override suspend fun <A> unsafe.runBlocking(fa: () -> Kind<ForIO, A>): A = fa().fix().unsafeRunSync() +fun IO.Companion.unsafeRun(): UnsafeRun<IOPartialOf<Nothing>> = + UnsafeRun + +fun <A> unsafe.runBlocking(fa: () -> IOOf<Nothing, A>): A = invoke { + UnsafeRun.run { runBlocking(fa) } +} - override suspend fun <A> unsafe.runNonBlocking(fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit) = - fa().fix().unsafeRunAsync(cb) +fun <A> unsafe.runNonBlocking(fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Unit = invoke { + UnsafeRun.run { runNonBlocking(fa, cb) } +} - override suspend fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<ForIO, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = - fa().fix().unsafeRunAsyncCancellable(onCancel, cb) +interface IOUnsafeCancellableRun : UnsafeCancellableRun<IOPartialOf<Nothing>>, IOUnsafeRun { + override suspend fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = + fa().unsafeRunAsyncCancellable(onCancel, cb) } +private val UnsafeCancellableRun: IOUnsafeCancellableRun = + object : IOUnsafeCancellableRun {} + +fun IO.Companion.unsafeCancellableRun(): UnsafeCancellableRun<IOPartialOf<Nothing>> = + UnsafeCancellableRun + +fun <A> unsafe.runNonBlockingCancellable(onCancel: OnCancel, fa: () -> Kind<IOPartialOf<Nothing>, A>, cb: (Either<Throwable, A>) -> Unit): Disposable = + invoke { + UnsafeCancellableRun.run { + runNonBlockingCancellable(onCancel, fa, cb) + } + } + @extension -interface IODispatchers : Dispatchers<ForIO> { +interface IODispatchers<E> : Dispatchers<IOPartialOf<E>> { override fun default(): CoroutineContext = IODispatchers.CommonPool diff --cc arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt index 676ce1c,d2d72c0..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/EffectsSuspendDSLTests.kt @@@ -5,17 -4,15 +5,20 @@@ import arrow.core.Eithe import arrow.core.Left import arrow.core.Right import arrow.core.Tuple2 - import arrow.core.internal.AtomicIntW import arrow.core.identity ++<<<<<<< HEAD +import arrow.fx.IO.Companion.effect ++======= + import arrow.core.internal.AtomicIntW + import arrow.core.test.UnitSpec ++>>>>>>> origin/master import arrow.fx.extensions.fx import arrow.fx.extensions.io.concurrent.concurrent -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.io.unsafeRun.unsafeRun +import arrow.fx.extensions.io.dispatchers.dispatchers +import arrow.fx.extensions.runBlocking +import arrow.fx.extensions.unsafeRun import arrow.fx.typeclasses.Concurrent import arrow.fx.typeclasses.UnsafeRun - import arrow.test.UnitSpec import arrow.unsafe import io.kotlintest.shouldBe import io.kotlintest.shouldThrow diff --cc arrow-fx/src/test/kotlin/arrow/fx/FiberTest.kt index 0d5702c,3ff04ce..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/FiberTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/FiberTest.kt @@@ -23,30 -18,11 +18,36 @@@ import io.kotlintest.properties.Ge class FiberTest : UnitSpec() { init { ++<<<<<<< HEAD + fun EQ(): Eq<FiberOf<IOPartialOf<Nothing>, Int>> = object : Eq<FiberOf<IOPartialOf<Nothing>, Int>> { + override fun FiberOf<IOPartialOf<Nothing>, Int>.eqv(b: FiberOf<IOPartialOf<Nothing>, Int>): Boolean = EQ<Nothing, Int>().run { + fix().join().eqv(b.fix().join()) + } + } + + fun EQK() = object : EqK<FiberPartialOf<IOPartialOf<Nothing>>> { + override fun <A> Kind<FiberPartialOf<IOPartialOf<Nothing>>, A>.eqK(other: Kind<FiberPartialOf<IOPartialOf<Nothing>>, A>, EQ: Eq<A>): Boolean = + EQ<Nothing, A>(EQ).run { + this@eqK.fix().join().eqv(other.fix().join()) + } + } + + fun <F> GENK(A: Applicative<F>) = object : GenK<FiberPartialOf<F>> { + override fun <A> genK(gen: Gen<A>): Gen<Kind<FiberPartialOf<F>, A>> = gen.map { + Fiber(A.just(it), A.just(Unit)) + } + } + + testLaws( + ApplicativeLaws.laws<FiberPartialOf<IOPartialOf<Nothing>>>(Fiber.applicative(IO.concurrent()), Fiber.functor(IO.concurrent()), GENK(IO.applicative()), EQK()), + MonoidLaws.laws(Fiber.monoid(IO.concurrent<Nothing>(), Int.monoid()), Gen.int().map { i -> ++======= + testLaws( + ApplicativeLaws.laws(Fiber.applicative(IO.concurrent()), Fiber.functor(IO.concurrent()), Fiber.genK(IO.applicative()), Fiber.eqK()), + MonoidLaws.laws(Fiber.monoid(IO.concurrent(), Int.monoid()), Gen.int().map { i -> ++>>>>>>> origin/master Fiber(IO.just(i), IO.unit) - }, EQ()) + }, Fiber.eq(IO.eq())) ) } } diff --cc arrow-fx/src/test/kotlin/arrow/fx/IOConnectionTests.kt index a28172c,420893f..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/IOConnectionTests.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/IOConnectionTests.kt @@@ -1,15 -1,13 +1,18 @@@ package arrow.fx - import arrow.test.UnitSpec - import arrow.test.laws.shouldBeEq - import arrow.typeclasses.Eq + import arrow.core.test.UnitSpec + import arrow.fx.test.eq.eq + import arrow.fx.test.laws.shouldBeEq import io.kotlintest.shouldBe -class KindConnectionTests : UnitSpec() { +class IOConnectionTests : UnitSpec() { init { ++<<<<<<< HEAD:arrow-fx/src/test/kotlin/arrow/fx/IOConnectionTests.kt + val EQ = IO.eqK<Nothing>().liftEq(Eq.any()) + ++======= ++>>>>>>> origin/master:arrow-fx/src/test/kotlin/arrow/fx/KindConnectionTests.kt "cancellation is only executed once" { var effect = 0 val initial = IO { effect += 1 } diff --cc arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt index 4bc5f19,d9e45ea..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/IOTest.kt @@@ -9,14 -8,16 +8,21 @@@ import arrow.core.Som import arrow.core.Tuple2 import arrow.core.Tuple3 import arrow.core.Tuple4 +import arrow.core.identity import arrow.core.right ++<<<<<<< HEAD ++======= + import arrow.core.some + import arrow.core.test.UnitSpec + import arrow.core.test.concurrency.SideEffect + import arrow.core.test.laws.SemigroupKLaws ++>>>>>>> origin/master import arrow.fx.IO.Companion.just import arrow.fx.extensions.fx -import arrow.fx.extensions.io.applicative.applicative import arrow.fx.extensions.io.async.async import arrow.fx.extensions.io.concurrent.concurrent +import arrow.fx.extensions.io.concurrent.raceN +import arrow.fx.extensions.io.applicative.applicative import arrow.fx.extensions.io.dispatchers.dispatchers import arrow.fx.extensions.io.functor.functor import arrow.fx.extensions.io.monad.flatMap @@@ -25,20 -26,14 +31,15 @@@ import arrow.fx.extensions.io.monad.mon import arrow.fx.extensions.io.semigroupK.semigroupK import arrow.fx.extensions.timer import arrow.fx.extensions.toIO +import arrow.fx.extensions.toIOException import arrow.fx.internal.parMap2 import arrow.fx.internal.parMap3 -import arrow.fx.typeclasses.ExitCase +import arrow.fx.typeclasses.ExitCase2 import arrow.fx.typeclasses.milliseconds import arrow.fx.typeclasses.seconds - import arrow.test.UnitSpec - import arrow.test.concurrency.SideEffect - import arrow.test.generators.GenK - import arrow.test.generators.throwable - import arrow.test.laws.ConcurrentLaws - import arrow.test.laws.SemigroupKLaws - import arrow.typeclasses.Eq - import arrow.typeclasses.EqK + import arrow.fx.test.eq.eqK + import arrow.fx.test.generators.genK + import arrow.fx.test.laws.ConcurrentLaws import io.kotlintest.fail import io.kotlintest.properties.Gen import io.kotlintest.properties.forAll @@@ -461,7 -436,7 +462,11 @@@ class IOTest : UnitSpec() IO.parTupledN(all, IO { Thread.currentThread().name }, IO.defer { just(Thread.currentThread().name) }, ++<<<<<<< HEAD + IO.async<Nothing, String> { cb -> cb(IOResult.Success(Thread.currentThread().name)) }, ++======= + IO.async<String> { cb -> cb(Thread.currentThread().name.right()) }, ++>>>>>>> origin/master IO(other) { Thread.currentThread().name }) .unsafeRunSync() @@@ -546,33 -519,33 +551,53 @@@ } "Cancellable should run CancelToken" { ++<<<<<<< HEAD + IO.fx<Unit> { + val p = !Promise<Unit>() + IO.cancellable<Nothing, Unit> { + p.complete(Unit) + }.unsafeRunAsyncCancellable { } ++======= + Promise.uncancellable<ForIO, Unit>(IO.async()).flatMap { p -> + IO.concurrent().cancellable<Unit> { + p.complete(Unit) + }.fix() + .unsafeRunAsyncCancellable { } ++>>>>>>> origin/master .invoke() - p.get() + !p.get() }.unsafeRunSync() shouldBe Unit } "CancellableF should run CancelToken" { ++<<<<<<< HEAD + IO.fx<Unit> { + val p = !Promise<Unit>() + IO.cancellableF<Nothing, Unit> { + IO { p.complete(Unit) } + }.unsafeRunAsyncCancellable { } ++======= + Promise.uncancellable<ForIO, Unit>(IO.async()).flatMap { p -> + IO.concurrent().cancellableF<Unit> { + IO { p.complete(Unit) } + }.fix() + .unsafeRunAsyncCancellable { } ++>>>>>>> origin/master .invoke() - p.get() + !p.get() }.unsafeRunSync() shouldBe Unit } "IO should cancel cancellable on dispose" { - Promise.uncancellable<ForIO, Unit>(IO.async()).flatMap { latch -> + Promise.uncancellable<IOPartialOf<Nothing>, Unit>(IO.async()).flatMap { latch -> IO { ++<<<<<<< HEAD + IO.cancellable<Nothing, Unit> { ++======= + IO.cancellable<Unit> { ++>>>>>>> origin/master latch.complete(Unit) }.unsafeRunAsyncCancellable { } .invoke() @@@ -730,17 -703,3 +755,20 @@@ internal class TestContext : AbstractCo override fun toString(): String = "TestContext(${Integer.toHexString(hashCode())})" } ++<<<<<<< HEAD + +internal fun <E> IO.Companion.eqK() = object : EqK<IOPartialOf<E>> { + override fun <A> Kind<IOPartialOf<E>, A>.eqK(other: Kind<IOPartialOf<E>, A>, EQ: Eq<A>): Boolean = EQ<E, A>(EQ).run { + fix().eqv(other.fix()) + } +} + +internal fun IO.Companion.genK() = object : GenK<IOPartialOf<Nothing>> { + override fun <A> genK(gen: Gen<A>): Gen<Kind<IOPartialOf<Nothing>, A>> = + Gen.oneOf( + gen.map(IO.Companion::just), + Gen.throwable().map { raiseException<A>(it) } + ) +} ++======= ++>>>>>>> origin/master diff --cc arrow-fx/src/test/kotlin/arrow/fx/MVarTest.kt index 1c849d5,117f2ea..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/MVarTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/MVarTest.kt @@@ -8,17 -8,18 +8,18 @@@ import arrow.core.Tuple import arrow.core.Tuple4 import arrow.core.Tuple7 import arrow.core.extensions.eq + import arrow.core.test.UnitSpec + import arrow.core.test.laws.equalUnderTheLaw import arrow.core.toT +import arrow.fx.IO.Companion.effect import arrow.fx.extensions.fx +import arrow.fx.extensions.io.apply.map2 import arrow.fx.extensions.io.async.async import arrow.fx.extensions.io.concurrent.concurrent import arrow.fx.extensions.io.concurrent.parSequence -import arrow.fx.extensions.io.monad.flatMap -import arrow.fx.extensions.io.monad.followedBy import arrow.fx.typeclasses.milliseconds - import arrow.test.UnitSpec - import arrow.test.laws.equalUnderTheLaw - import arrow.test.laws.shouldBeEq + import arrow.fx.test.eq.eq + import arrow.fx.test.laws.shouldBeEq import io.kotlintest.properties.Gen import io.kotlintest.properties.forAll import io.kotlintest.shouldBe @@@ -41,8 -39,8 +42,13 @@@ class MVarTest : UnitSpec() val r1 = av.take().bind() av.put(b).bind() val r2 = av.take().bind() ++<<<<<<< HEAD + !IO.effect { Tuple4(isEmpty, isNotEmpty, r1, r2) shouldBe Tuple4(true, true, a, b) } + }.test() ++======= + Tuple4(isEmpty, isNotEmpty, r1, r2) + }.equalUnderTheLaw(IO.just(Tuple4(true, true, a, b)), IO.eq()) ++>>>>>>> origin/master } } @@@ -58,11 -56,8 +64,16 @@@ val r2 = av.tryTake().bind() av.put(c).bind() val r3 = av.take().bind() ++<<<<<<< HEAD + !IO.effect { + Tuple7(isEmpty, p1, p2, isNotEmpty, r1, r2, r3) shouldBe + Tuple7(true, true, false, true, Some(a), None, c) + } + }.test() ++======= + Tuple7(isEmpty, p1, p2, isNotEmpty, r1, r2, r3) + }.equalUnderTheLaw(IO.just(Tuple7(true, true, false, true, Some(a), None, c)), IO.eq()) ++>>>>>>> origin/master } } @@@ -79,8 -74,8 +90,13 @@@ val aa = f1.join().bind() val bb = f2.join().bind() ++<<<<<<< HEAD + !IO.effect { setOf(aa, bb) shouldBe setOf(10, 20) } + }.test() ++======= + setOf(aa, bb) + }.shouldBeEq(IO.just(setOf(10, 20)), IO.eq()) ++>>>>>>> origin/master } "$label - empty; put; put; put; take; take; take" { @@@ -99,8 -94,8 +115,13 @@@ f2.join().bind() f3.join().bind() ++<<<<<<< HEAD + !IO.effect { setOf(aa, bb, cc) shouldBe setOf(10, 20, 30) } + }.test() ++======= + setOf(aa, bb, cc) + }.shouldBeEq(IO.just(setOf(10, 20, 30)), IO.eq()) ++>>>>>>> origin/master } "$label - empty; take; take; take; put; put; put" { @@@ -119,8 -114,8 +140,13 @@@ val bb = f2.join().bind() val cc = f3.join().bind() ++<<<<<<< HEAD + !IO.effect { setOf(aa, bb, cc) shouldBe setOf(10, 20, 30) } + }.test() ++======= + setOf(aa, bb, cc) + }.shouldBeEq(IO.just(setOf(10, 20, 30)), IO.eq()) ++>>>>>>> origin/master } "$label - initial; isNotEmpty; take; put; take" { @@@ -132,8 -127,8 +158,13 @@@ av.put(b).bind() val r2 = av.take().bind() ++<<<<<<< HEAD + !IO.effect { Tuple3(isNotEmpty, r1, r2) shouldBe Tuple3(true, a, b) } + }.test() ++======= + Tuple3(isNotEmpty, r1, r2) + }.equalUnderTheLaw(IO.just(Tuple3(true, a, b)), IO.eq()) ++>>>>>>> origin/master } } @@@ -156,8 -151,8 +187,13 @@@ val av = mvar.just(i).bind() val read = av.read().bind() val take = av.take().bind() ++<<<<<<< HEAD + !IO.effect { (read toT take) shouldBe (i toT i) } + }.test() ++======= + read toT take + }.equalUnderTheLaw(IO.just(i toT i), IO.eq()) ++>>>>>>> origin/master } } @@@ -245,13 -240,13 +281,13 @@@ val consumerFiber = !consumer(channel, 0L).fork() !producerFiber.join() !consumerFiber.join() - }.shouldBeEq(IO.just(count * (count - 1) / 2), EQ(Long.eq())) + }.shouldBeEq(IO.just(count * (count - 1) / 2), IO.eq(Long.eq())) } - fun testStackSequential(channel: MVar<ForIO, Int>): Tuple3<Int, IO<Int>, IO<Unit>> { + fun testStackSequential(channel: MVar<IOPartialOf<Nothing>, Int>): Tuple3<Int, IO<Nothing, Int>, IO<Nothing, Unit>> { val count = 10000 - fun readLoop(n: Int, acc: Int): IO<Int> = + fun readLoop(n: Int, acc: Int): IO<Nothing, Int> = if (n > 0) channel.read().followedBy(channel.take().flatMap { readLoop(n - 1, acc + 1) }) else IO.just(acc) @@@ -341,9 -336,10 +377,16 @@@ !IO.sleep(100.milliseconds) // Give read callback a chance to register !fiber.cancel() !mVar.put(10) ++<<<<<<< HEAD + val fallback = IO.sleep(200.milliseconds).followedBy(IO.just(0)) + !IO.raceN(finished.get(), fallback) + }.shouldBeEq(IO.just(Right(0)), EQ()) ++======= + val fallback = sleep(200.milliseconds).followedBy(IO.just(0)) + val res = !IO.raceN(finished.get(), fallback) + !effect { res shouldBe Right(0) } + }.shouldBeEq(IO.unit, IO.eq()) ++>>>>>>> origin/master } } diff --cc arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt index 5797c3f,94626e6..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/QueueTest.kt @@@ -4,21 -5,20 +5,30 @@@ import arrow.core.Non import arrow.core.Some import arrow.core.Tuple2 import arrow.core.Tuple3 - import arrow.core.Left + import arrow.core.Right import arrow.core.extensions.list.traverse.traverse + import arrow.core.extensions.nonemptylist.traverse.traverse import arrow.core.fix + import arrow.core.test.UnitSpec + import arrow.core.test.generators.nonEmptyList + import arrow.core.test.generators.tuple2 + import arrow.core.test.generators.tuple3 + import arrow.fx.test.laws.equalUnderTheLaw import arrow.fx.extensions.fx +import arrow.fx.extensions.io.apply.mapN import arrow.fx.extensions.io.applicative.applicative import arrow.fx.extensions.io.concurrent.concurrent import arrow.fx.extensions.io.dispatchers.dispatchers +import arrow.fx.extensions.io.monad.map import arrow.fx.typeclasses.milliseconds ++<<<<<<< HEAD +import arrow.test.UnitSpec +import arrow.test.generators.nonEmptyList +import arrow.test.generators.tuple2 +import arrow.test.generators.tuple3 +import arrow.test.laws.equalUnderTheLaw ++======= ++>>>>>>> origin/master import io.kotlintest.fail import io.kotlintest.matchers.types.shouldBeInstanceOf import io.kotlintest.properties.Gen @@@ -41,40 -37,105 +50,135 @@@ class QueueTest : UnitSpec() "$label - make a queue the add values then retrieve in the same order" { forAll(Gen.nonEmptyList(Gen.int())) { l -> - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(l.size) ++<<<<<<< HEAD + !l.traverse(IO.applicative<Nothing>()) { q.offer(it) } + val nl = !(1..l.size).toList().traverse(IO.applicative<Nothing>()) { q.take() } + !IO.effect { nl == l.toList() } + }.test() + } + } + + "$label - offer and take a number of values in the same order" { + forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> + IO.fx<Nothing, Unit> { + val q = !queue(3) + !q.offer(t.a) + !q.offer(t.b) + !q.offer(t.c) + val first = !q.take() + val second = !q.take() + val third = !q.take() + !IO.effect { Tuple3(first, second, third) shouldBe t } + }.test() ++======= + !l.traverse(IO.applicative(), q::offer) + !(1..l.size).toList().traverse(IO.applicative()) { q.take() } + }.equalUnderTheLaw(IO.just(l.toList())) + } + } + + "$label - queue can be filled at once with enough capacity" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + val succeed = !q.tryOfferAll(l.toList()) + val res = !q.takeAll() + Tuple2(succeed, res) + }.equalUnderTheLaw(IO.just(Tuple2(true, l.toList()))) + } + } + + "$label - queue can be filled at once over capacity with takers" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + val (join, _) = !q.take().fork() + !IO.sleep(50.milliseconds) // Registered first, should receive first element of `tryOfferAll` + + val succeed = !q.tryOfferAll(listOf(500) + l.toList()) + val res = !q.takeAll() + val head = !join + Tuple3(succeed, res, head) + }.equalUnderTheLaw(IO.just(Tuple3(true, l.toList(), 500))) + } + } + + "$label - tryOfferAll under capacity" { + forAll( + Gen.list(Gen.int()).filter { it.size <= 100 }, + Gen.int().filter { it > 100 } + ) { l, capacity -> + IO.fx { + val q = !queue(capacity) + val succeed = !q.tryOfferAll(l) + val all = !q.takeAll() + Tuple2(succeed, all) + }.equalUnderTheLaw(IO.just(Tuple2(true, l))) + } + } + + "$label - takeAll takes all values from a Queue" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + !l.traverse(IO.applicative(), q::offer) + val res = !q.takeAll() + val after = !q.takeAll() + Tuple2(res, after) + }.equalUnderTheLaw(IO.just(Tuple2(l.toList(), emptyList()))) + } + } + + "$label - peekAll reads all values from a Queue without removing them" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + !l.traverse(IO.applicative(), q::offer) + val res = !q.peekAll() + val after = !q.peekAll() + Tuple2(res, after) + }.equalUnderTheLaw(IO.just(Tuple2(l.toList(), l.toList()))) + } + } + + "$label - empty queue takeAll is empty" { + forAll(Gen.positiveIntegers()) { capacity -> + IO.fx { + val q = !queue(capacity) + !q.takeAll() + }.equalUnderTheLaw(IO.just(emptyList())) + } + } + + "$label - empty queue peekAll is empty" { + forAll(Gen.positiveIntegers()) { capacity -> + IO.fx { + val q = !queue(capacity) + !q.peekAll() + }.equalUnderTheLaw(IO.just(emptyList())) ++>>>>>>> origin/master } } "$label - time out taking from an empty queue" { - IO.fx { - val wontComplete = queue(10).flatMap(Queue<ForIO, Int>::take) - val start = !effect { System.currentTimeMillis() } + IO.fx<Nothing, Unit> { + val wontComplete = queue(10).flatMap(Queue<IOPartialOf<Nothing>, Int>::take) + val start = !IO.effect { System.currentTimeMillis() } val received = !wontComplete.map { Some(it) } ++<<<<<<< HEAD + .waitFor(100.milliseconds, default = IO.just(None)) + val elapsed = !IO.effect { System.currentTimeMillis() - start } + !IO.effect { received shouldBe None } + !IO.effect { (elapsed >= 100) shouldBe true } + }.test() ++======= + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + Tuple2(received, (elapsed >= 100)) + }.equalUnderTheLaw(IO.just(Tuple2(None, true))) ++>>>>>>> origin/master } "$label - suspended take calls on an empty queue complete when offer calls made to queue" { @@@ -83,9 -144,8 +187,14 @@@ val q = !queue(3) val first = !q.take().fork(ctx) !q.offer(i) ++<<<<<<< HEAD + val res = !first.join() + !IO.effect { res shouldBe i } + }.test() ++======= + !first.join() + }.equalUnderTheLaw(IO.just(i)) ++>>>>>>> origin/master } } @@@ -102,73 -162,186 +211,236 @@@ val firstValue = !first.join() val secondValue = !second.join() val thirdValue = !third.join() ++<<<<<<< HEAD + !IO.effect { + setOf(firstValue, secondValue, thirdValue) shouldBe setOf(t.a, t.b, t.c) + } + }.test() ++======= + setOf(firstValue, secondValue, thirdValue) + }.equalUnderTheLaw(IO.just(setOf(t.a, t.b, t.c))) } } - "$label - taking from a shutdown queue creates a QueueShutdown error" { + "$label - time out peeking from an empty queue" { + IO.fx { + val wontComplete = queue(10).flatMap(Queue<ForIO, Int>::peek) + val start = !effect { System.currentTimeMillis() } + val received = !wontComplete.map { Some(it) } + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + Tuple2(received, (elapsed >= 100)) + }.equalUnderTheLaw(IO.just(Tuple2(None, true))) + } + + "$label - suspended peek calls on an empty queue complete when offer calls made to queue" { forAll(Gen.int()) { i -> + IO.fx { + val q = !queue(3) + val first = !q.peek().fork(ctx) + !q.offer(i) + !first.join() + }.equalUnderTheLaw(IO.just(i)) ++>>>>>>> origin/master + } + } + + "$label - multiple peek calls offerAll is cancelable an empty queue all complete with the first value is received" { + forAll(Gen.int()) { i -> ++<<<<<<< HEAD + IO.fx<Nothing, Unit> { + val q = !queue(10) ++======= + IO.fx { + val q = !queue(1) + val first = !q.peek().fork(ctx) + val second = !q.peek().fork(ctx) + val third = !q.peek().fork(ctx) ++>>>>>>> origin/master !q.offer(i) - !q.shutdown() - !q.take() - }.attempt().unsafeRunSync() == Left(QueueShutdown) + val firstValue = !first.join() + val secondValue = !second.join() + val thirdValue = !third.join() + setOf(firstValue, secondValue, thirdValue) + }.equalUnderTheLaw(IO.just(setOf(i, i, i))) } } - "$label - offering to a shutdown queue creates a QueueShutdown error" { + "$label - peek does not remove value from Queue" { forAll(Gen.int()) { i -> - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(10) - !q.shutdown() !q.offer(i) - }.attempt().unsafeRunSync() == Left(QueueShutdown) + val peeked = !q.peek() + val took = !q.takeAll() + Tuple2(peeked, took) + }.equalUnderTheLaw(IO.just(Tuple2(i, listOf(i)))) } } ++<<<<<<< HEAD + "$label - joining a forked, incomplete take call on a shutdown queue creates a QueueShutdown error" { + IO.fx<Nothing, Unit> { ++======= + "$label - tryTake on an empty Queue returns None" { + IO.fx { ++>>>>>>> origin/master val q = !queue(10) - val t = !q.take().fork(ctx) - !q.shutdown() - !t.join() - }.attempt().unsafeRunSync() shouldBe Left(QueueShutdown) + !q.tryTake() + }.equalUnderTheLaw(IO.just(None)) } ++<<<<<<< HEAD + "$label - create a shutdown hook completing a promise, then shutdown the queue, the promise should be completed" { + IO.fx<Nothing, Unit> { + val q = !queue(10) + val p = !Promise<IOPartialOf<Nothing>, Boolean>(IO.concurrent<Nothing>()) + !(q.awaitShutdown().followedBy(p.complete(true))).fork() + !q.shutdown() + !p.get() + }.test() + } + + "$label - create a shutdown hook completing a promise twice, then shutdown the queue, both promises should be completed" { + IO.fx<Nothing, Unit> { + val q = !queue(10) + val p1 = !Promise<IOPartialOf<Nothing>, Boolean>(IO.concurrent<Nothing>()) + val p2 = !Promise<IOPartialOf<Nothing>, Boolean>(IO.concurrent<Nothing>()) + !(q.awaitShutdown().followedBy(p1.complete(true))).fork() + !(q.awaitShutdown().followedBy(p2.complete(true))).fork() + !q.shutdown() + !mapN(p1.get(), p2.get()) { (p1, p2) -> p1 && p2 } + }.test() + } + + "$label - shut it down, create a shutdown hook completing a promise, the promise should be completed immediately" { + IO.fx<Nothing, Unit> { + val q = !queue(10) + !q.shutdown() + val p = !Promise<IOPartialOf<Nothing>, Boolean>(IO.concurrent<Nothing>()) + !(q.awaitShutdown().followedBy(p.complete(true))).fork() + !p.get() + }.test() ++======= + "$label - tryPeek on an empty Queue returns None" { + IO.fx { + val q = !queue(10) + !q.tryPeek() + }.equalUnderTheLaw(IO.just(None)) + } + + "$label - take is cancelable" { + IO.fx { + val q = !queue(1) + val t1 = !q.take().fork() + val t2 = !q.take().fork() + val t3 = !q.take().fork() + !IO.sleep(50.milliseconds) // Give take callbacks a chance to register + !t2.cancel() + !q.offer(1) + !q.offer(3) + val r1 = !t1.join() + val r3 = !t3.join() + val size = !q.size() + Tuple2(setOf(r1, r3), size) + }.equalUnderTheLaw(IO.just(Tuple2(setOf(1, 3), 0))) + } + + "$label - peek is cancelable" { + IO.fx { + val q = !queue(1) + val finished = !Promise<Int>() + val fiber = !q.peek().flatMap(finished::complete).fork() + !IO.sleep(50.milliseconds) // Give read callback a chance to register + !fiber.cancel() + !q.offer(10) + val fallback = sleep(200.milliseconds).followedBy(IO.just(0)) + !IO.raceN(finished.get(), fallback) + }.equalUnderTheLaw(IO.just(Right(0))) + } + + "$label - takeAll returns emptyList with waiting suspended takers" { + IO.fx { + val q = !queue(1) + val (_, cancel) = !q.take().fork() + val res = !q.takeAll() + !cancel + res + }.equalUnderTheLaw(IO.just(emptyList())) + } + + "$label - peekAll returns emptyList with waiting suspended takers" { + IO.fx { + val q = !queue(1) + val (_, cancel) = !q.take().fork() + val res = !q.peekAll() + !cancel + res + }.equalUnderTheLaw(IO.just(emptyList())) + } + + "$label - offerAll offers all values with waiting suspended takers, and within capacity" { + forAll(50, + Gen.nonEmptyList(Gen.int()).filter { it.size in 1..50 }, + Gen.choose(52, 100) + ) { l, capacity -> + IO.fx { + val q = !queue(capacity) + val (_, cancel) = !q.take().fork() + !IO.sleep(50.milliseconds) // Give take callbacks a chance to register + + !q.offerAll(l.toList()) + !cancel + !q.peekAll() + }.equalUnderTheLaw(IO.just(l.toList().drop(1))) + } + } + + "$label - offerAll can offer empty" { + IO.fx { + val q = !queue(1) + !q.offer(1) + !q.offerAll(emptyList()) + !q.peekAll() + }.equalUnderTheLaw(IO.just(listOf(1))) + } + } + + fun strategyAtCapacityTests( + label: String, + ctx: CoroutineContext = IO.dispatchers().default(), + queue: (Int) -> IO<Queue<ForIO, Int>> + ) { + "$label - tryOffer returns false over capacity" { + IO.fx { + val q = !queue(1) + !q.offer(1) + !q.tryOffer(2) + }.equalUnderTheLaw(IO.just(false)) + } + + "$label - tryOfferAll over capacity" { + forAll(Gen.list(Gen.int()).filter { it.size > 1 }) { l -> + IO.fx { + val q = !queue(1) + val succeed = !q.tryOfferAll(l) + val res = !q.peekAll() + Tuple2(succeed, res) + }.equalUnderTheLaw(IO.just(Tuple2(false, emptyList()))) + } + } + + "$label - can take and offer at capacity" { + IO.fx { + val q = !queue(1) + val (join, _) = !q.take().fork() + !IO.sleep(50.milliseconds) + val succeed = !q.tryOfferAll(1, 2) + val a = !q.take() + val b = !join + Tuple2(succeed, setOf(a, b)) + }.equalUnderTheLaw(IO.just(Tuple2(true, setOf(1, 2)))) ++>>>>>>> origin/master } } @@@ -178,72 -351,205 +450,242 @@@ ) { val label = "BoundedQueue" allStrategyTests(label, ctx, queue) + strategyAtCapacityTests(label, ctx, queue) "$label - time out offering to a queue at capacity" { - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(1) !q.offer(1) - val start = !effect { System.currentTimeMillis() } + val start = !IO.effect { System.currentTimeMillis() } val wontComplete = q.offer(2) val received = !wontComplete.map { Some(it) } ++<<<<<<< HEAD + .waitFor(100.milliseconds, default = IO.just(None)) + val elapsed = !IO.effect { System.currentTimeMillis() - start } + !IO.effect { received shouldBe None } + !IO.effect { (elapsed >= 100) shouldBe true } + }.test() ++======= + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + Tuple2(received, (elapsed >= 100)) + }.equalUnderTheLaw(IO.just(Tuple2(None, true))) + } + + "$label - time out offering multiple values to a queue at capacity" { + IO.fx { + val q = !queue(3) + val start = !effect { System.currentTimeMillis() } + val wontComplete = q.offerAll(1, 2, 3, 4) + val received = !wontComplete.map { Some(it) } + .waitFor(100.milliseconds, default = just(None)) + val elapsed = !effect { System.currentTimeMillis() - start } + Tuple2(received, (elapsed >= 100)) + }.equalUnderTheLaw(IO.just(Tuple2(None, true))) + } + + "$label - queue cannot be filled at once without enough capacity" { + forAll(Gen.nonEmptyList(Gen.int())) { l -> + IO.fx { + val q = !queue(l.size) + val succeed = !q.tryOfferAll(l.toList() + 1) + val res = !q.takeAll() + Tuple2(succeed, res) + }.equalUnderTheLaw(IO.just(Tuple2(false, emptyList()))) + } + } + + "$label - can offerAll at capacity with take" { + IO.fx { + val q = !queue(1) + val (join, _) = !q.take().fork() + !IO.sleep(50.milliseconds) + !q.offerAll(1, 2) + val a = !q.take() + val b = !join + setOf(a, b) + }.equalUnderTheLaw(IO.just(setOf(1, 2))) + } + + "$label - can tryOfferAll at capacity with take" { + IO.fx { + val q = !queue(1) + val (join, _) = !q.take().fork() + !IO.sleep(50.milliseconds) + val succeed = !q.tryOfferAll(1, 2) + val a = !q.take() + val b = !join + Tuple2(succeed, setOf(a, b)) + }.equalUnderTheLaw(IO.just(Tuple2(true, setOf(1, 2)))) + } + + // offerAll(fa).fork() + offerAll(fb).fork() <==> queue(fa + fb) OR queue(fb + fa) + "$label - offerAll is atomic" { + forAll(Gen.nonEmptyList(Gen.int()), Gen.nonEmptyList(Gen.int())) { fa, fb -> + IO.fx { + val q = !queue(fa.size + fb.size) + !q.offerAll(fa.toList()).fork() + !q.offerAll(fb.toList()).fork() + + !IO.sleep(50.milliseconds) + + val res = !q.takeAll() + res == (fa.toList() + fb.toList()) || res == (fb.toList() + fa.toList()) + }.equalUnderTheLaw(IO.just(true)) + } + } + + // To test outstanding offers, we need to `offer` more elements to the queue than we have capacity + "$label - takeAll takes all values, including outstanding offers" { + forAll(50, + Gen.nonEmptyList(Gen.int()).filter { it.size in 51..100 }, + Gen.choose(1, 50) + ) { l, capacity -> + IO.fx { + val q = !queue(capacity) + !l.parTraverse(NonEmptyList.traverse(), q::offer).fork() + !IO.sleep(50.milliseconds) // Give take callbacks a chance to register + + val res = !q.takeAll().map(Iterable<Int>::toSet) + val after = !q.peekAll() + Tuple2(res, after) + }.equalUnderTheLaw(IO.just(Tuple2(l.toList().toSet(), emptyList()))) + } + } + + // To test outstanding offers, we need to `offer` more elements to the queue than we have capacity + "$label - peekAll reads all values, including outstanding offers" { + forAll(50, + Gen.nonEmptyList(Gen.int()).filter { it.size in 51..100 }, + Gen.choose(1, 50) + ) { l, capacity -> + IO.fx { + val q = !queue(capacity) + !l.parTraverse(NonEmptyList.traverse(), q::offer).fork() + !IO.sleep(50.milliseconds) // Give take callbacks a chance to register + + val res = !q.peekAll().map(Iterable<Int>::toSet) + val after = !q.peekAll().map(Iterable<Int>::toSet) + Tuple2(res, after) + }.equalUnderTheLaw(IO.just(Tuple2(l.toList().toSet(), l.toList().toSet()))) + } + } + + // Offer only gets scheduled for Bounded Queues, others apply strategy. + "$label - offer is cancelable" { + IO.fx { + val q = !queue(1) + !q.offer(0) + !q.offer(1).fork() + val p2 = !q.offer(2).fork() + !q.offer(3).fork() + + !IO.sleep(50.milliseconds) // Give put callbacks a chance to register + + !p2.cancel() + + !q.take() + val r1 = !q.take() + val r3 = !q.take() + + setOf(r1, r3) + }.equalUnderTheLaw(IO.just(setOf(1, 3))) + } + + // OfferAll only gets scheduled for Bounded Queues, others apply strategy. + "$label - offerAll is cancelable" { + IO.fx { + val q = !queue(1) + !q.offer(0) + !q.offer(1).fork() + val p2 = !q.offerAll(2, 3).fork() + !q.offer(4).fork() + + !IO.sleep(50.milliseconds) // Give put callbacks a chance to register + + !p2.cancel() + + !q.take() + val r1 = !q.take() + val r3 = !q.take() + + setOf(r1, r3) + }.equalUnderTheLaw(IO.just(setOf(1, 4))) + } + + "$label - tryOffer returns false at capacity" { + IO.fx { + val q = !queue(1) + !q.offer(1) + !q.tryOffer(2) + }.equalUnderTheLaw(IO.just(false)) ++>>>>>>> origin/master } "$label - capacity must be a positive integer" { - queue(0).attempt().suspended().fold( - { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, - { fail("Expected Left<IllegalArgumentException>") } - ) + IO.fx<Nothing, Unit> { + val attempted = !queue(0).attempt() + !IO.effect { + attempted.fold( + { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, + { fail("Expected Left<IllegalArgumentException>") } + ) + } + }.test() } - "$label - suspended offers called on an full queue complete when take calls made to queue" { + "$label - suspended offers called on a full queue complete when take calls made to queue" { forAll(Gen.tuple2(Gen.int(), Gen.int())) { t -> - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(1) !q.offer(t.a) - !q.offer(t.b).fork(ctx) + val (join, _) = !q.offer(t.b).fork(ctx) val first = !q.take() val second = !q.take() ++<<<<<<< HEAD + !IO.effect { Tuple2(first, second) shouldBe t } + }.test() ++======= + !join // Check if fiber completed + Tuple2(first, second) + }.equalUnderTheLaw(IO.just(t)) ++>>>>>>> origin/master } } "$label - multiple offer calls on an full queue complete when as many take calls are made to queue" { forAll(Gen.tuple3(Gen.int(), Gen.int(), Gen.int())) { t -> - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(1) !q.offer(t.a) - !q.offer(t.b).fork(ctx) - !q.offer(t.c).fork(ctx) + val (join, _) = !q.offer(t.b).fork(ctx) + val (join2, _) = !q.offer(t.c).fork(ctx) val first = !q.take() val second = !q.take() val third = !q.take() ++<<<<<<< HEAD + !IO.effect { + setOf(first, second, third) shouldBe setOf(t.a, t.b, t.c) + } + }.test() + } + } + + "$label - joining a forked offer call made to a shut down queue creates a QueueShutdown error" { + forAll(Gen.int()) { i -> + IO.fx<Nothing, Unit> { + val q = !queue(1) + !q.offer(i) + val o = !q.offer(i).fork(ctx) + !q.shutdown() + !o.join() + }.attempt().unsafeRunSync() == Left(QueueShutdown) ++======= + !join // Check if fiber completed + !join2 // Check if fiber completed + setOf(first, second, third) + }.equalUnderTheLaw(IO.just(setOf(t.a, t.b, t.c))) ++>>>>>>> origin/master } } } @@@ -254,67 -560,68 +696,99 @@@ ) { val label = "SlidingQueue" allStrategyTests(label, ctx, queue) + strategyAtCapacityTests(label, ctx, queue) "$label - capacity must be a positive integer" { - queue(0).attempt().suspended().fold( - { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, - { fail("Expected Left<IllegalArgumentException>") } - ) - } - + IO.fx<Nothing, Unit> { + val attempted = !queue(0).attempt() + !IO.effect { + attempted.fold( + { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, + { fail("Expected Left<IllegalArgumentException>") } + ) + } + }.test() + } + ++<<<<<<< HEAD + "$label - removes first element after offering to a queue at capacity" { + forAll(Gen.int(), Gen.nonEmptyList(Gen.int())) { x, xs -> + IO.fx<Nothing, Unit> { + val q = !queue(xs.size) + !q.offer(x) + !xs.traverse(IO.applicative<Nothing>(), q::offer) + val taken = !(1..xs.size).toList().traverse(IO.applicative<Nothing>()) { q.take() } + !IO.effect { + taken shouldBe xs.toList() + } + }.test() ++======= + "$label - slides elements offered to queue at capacity" { + forAll( + Gen.choose(1, 50), + Gen.nonEmptyList(Gen.int()).filter { it.size > 50 } + ) { capacity, xs -> + IO.fx { + val q = !queue(capacity) + !q.offerAll(xs.toList()) + !q.peekAll() + }.equalUnderTheLaw(IO.just(xs.toList().drop(xs.size - capacity))) ++>>>>>>> origin/master } } } fun droppingStrategyTests( - ctx: CoroutineContext = IO.dispatchers().default(), - queue: (Int) -> IO<Queue<ForIO, Int>> + ctx: CoroutineContext = IO.dispatchers<Nothing>().default(), + queue: (Int) -> IO<Nothing, Queue<IOPartialOf<Nothing>, Int>> ) { val label = "DroppingQueue" - allStrategyTests(label, ctx, queue) + strategyAtCapacityTests(label, ctx, queue) "$label - capacity must be a positive integer" { - queue(0).attempt().suspended().fold( - { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, - { fail("Expected Left<IllegalArgumentException>") } - ) + IO.fx<Nothing, Unit> { + val attempted = !queue(0).attempt() + !IO.effect { + attempted.fold( + { err -> err.shouldBeInstanceOf<IllegalArgumentException>() }, + { fail("Expected Left<IllegalArgumentException>") } + ) + } + }.test() } "$label - drops elements offered to a queue at capacity" { forAll(Gen.int(), Gen.int(), Gen.nonEmptyList(Gen.int())) { x, x2, xs -> - IO.fx { + IO.fx<Nothing, Unit> { val q = !queue(xs.size) - !xs.traverse(IO.applicative(), q::offer) + !xs.traverse(IO.applicative<Nothing>()) { q.offer(it) } !q.offer(x) // this `x` should be dropped - val taken = !(1..xs.size).toList().traverse(IO.applicative()) { q.take() } + val taken = !(1..xs.size).toList().traverse(IO.applicative<Nothing>()) { q.take() } !q.offer(x2) val taken2 = !q.take() ++<<<<<<< HEAD + !IO.effect { + taken.fix() + taken2 shouldBe xs.toList() + x2 + } + }.test() ++======= + taken.fix() + taken2 + }.equalUnderTheLaw(IO.just(xs.toList() + x2)) + } + } + + "$label - drops elements offered to queue at capacity" { + forAll( + Gen.choose(1, 50), + Gen.nonEmptyList(Gen.int()).filter { it.size > 50 } + ) { capacity, xs -> + IO.fx { + val q = !queue(capacity) + !q.offerAll(xs.toList()) + !q.peekAll() + }.equalUnderTheLaw(IO.just(xs.toList().take(capacity))) ++>>>>>>> origin/master } } } diff --cc arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt index 7d54842,b00dfd4..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/ResourceTest.kt @@@ -26,10 -25,10 +26,10 @@@ import io.kotlintest.properties.Ge class ResourceTest : UnitSpec() { init { - val EQ = Resource.eqK().liftEq(Int.eq()) + val EQ: Eq<Kind<Kind<Kind<ForResource, ForIO>, Throwable>, Int>> = Resource.eqK().liftEq(Int.eq()) testLaws( - MonadLaws.laws( + MonadLaws.laws<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>>( Resource.monad(IO.bracket()), Resource.functor(IO.bracket()), Resource.applicative(IO.bracket()), @@@ -53,8 -52,8 +53,13 @@@ } } ++<<<<<<< HEAD +private fun Resource.Companion.eqK() = object : EqK<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>> { + override fun <A> Kind<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>, A>.eqK(other: Kind<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>, A>, EQ: Eq<A>): Boolean = ++======= + fun Resource.Companion.eqK() = object : EqK<ResourcePartialOf<ForIO, Throwable>> { + override fun <A> Kind<ResourcePartialOf<ForIO, Throwable>, A>.eqK(other: Kind<ResourcePartialOf<ForIO, Throwable>, A>, EQ: Eq<A>): Boolean = ++>>>>>>> origin/master (this.fix() to other.fix()).let { val ls = it.first.use(IO.Companion::just).fix().attempt() val rs = it.second.use(IO.Companion::just).fix().attempt() @@@ -64,8 -63,8 +69,13 @@@ } } ++<<<<<<< HEAD +private fun Resource.Companion.genK() = object : GenK<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>> { + override fun <A> genK(gen: Gen<A>): Gen<Kind<ResourcePartialOf<IOPartialOf<Nothing>, Throwable>, A>> { ++======= + fun Resource.Companion.genK() = object : GenK<ResourcePartialOf<ForIO, Throwable>> { + override fun <A> genK(gen: Gen<A>): Gen<Kind<ResourcePartialOf<ForIO, Throwable>, A>> { ++>>>>>>> origin/master val allocate = gen.map { Resource({ IO.just(it) }, { _ -> IO.unit }, IO.bracket()) } return Gen.oneOf( diff --cc arrow-fx/src/test/kotlin/arrow/fx/SemaphoreTest.kt index cfd069d,0512a81..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/SemaphoreTest.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/SemaphoreTest.kt @@@ -135,12 -136,12 +136,19 @@@ class SemaphoreTest : UnitSpec() } tests("UncancellableSemaphore") { Semaphore.uncancellable(it, IO.async()) } - tests("cancellableSemaphore") { Semaphore(it, IO.concurrent()) } - + tests("CancellableSemaphore") { Semaphore(it, IO.concurrent()) } + ++<<<<<<< HEAD + "CancellableSemaphore - supports cancellation of acquire" { + Semaphore(0, IO.concurrent<Nothing>()).flatMap { s -> + s.acquire() + }.unsafeRunAsyncCancellable { } ++======= + "cancellableSemaphore - supports cancellation of acquire" { + Semaphore(0, IO.concurrent()).flatMap { s -> + s.acquire() + }.unsafeRunAsyncCancellable { } ++>>>>>>> origin/master .invoke() } } diff --cc arrow-fx/src/test/kotlin/arrow/fx/predef.kt index 26553b1,e09fef0..0000000 --- a/arrow-fx/src/test/kotlin/arrow/fx/predef.kt +++ b/arrow-fx/src/test/kotlin/arrow/fx/predef.kt @@@ -1,12 -1,5 +1,15 @@@ package arrow.fx ++<<<<<<< HEAD +import arrow.Kind +import arrow.fx.extensions.io.applicative.applicative +import arrow.fx.extensions.io.concurrent.waitFor +import arrow.fx.typeclasses.Duration +import arrow.fx.typeclasses.seconds +import arrow.test.eq +import arrow.typeclasses.Eq ++======= ++>>>>>>> origin/master import kotlinx.atomicfu.atomic import java.util.concurrent.ExecutorService import java.util.concurrent.SynchronousQueue @@@ -14,14 -7,6 +17,17 @@@ import java.util.concurrent.ThreadFacto import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit ++<<<<<<< HEAD +fun <E, A> EQ(EQA: Eq<A> = Eq.any(), timeout: Duration = 5.seconds): Eq<Kind<IOPartialOf<E>, A>> = Eq { a, b -> + IOResult.eq(Eq.any(), EQA, Eq.any()).run { + IO.applicative<Nothing>().mapN(a.fix().result(), b.fix().result()) { (a, b) -> a.eqv(b) } + .waitFor(timeout) + .unsafeRunSync() + } +} + ++======= ++>>>>>>> origin/master /** * This [ExecutorService] doesn't keep any Thread alive, so the maximumPoolSize should be equal to the # of scheduled tasks. * diff --cc arrow-streams/src/test/kotlin/arrow/streams/internal/FreeCTest.kt index 0d68eb2,42f6c85..0000000 --- a/arrow-streams/src/test/kotlin/arrow/streams/internal/FreeCTest.kt +++ b/arrow-streams/src/test/kotlin/arrow/streams/internal/FreeCTest.kt @@@ -24,16 -21,22 +24,26 @@@ import arrow.core.fi import arrow.core.identity import arrow.core.right import arrow.core.some -import arrow.fx.ForIO -import arrow.fx.IO +import arrow.fx.IOPartialOf +import arrow.fx.unsafeRunSync import arrow.fx.extensions.io.monadError.monadError -import arrow.fx.fix import arrow.higherkind -import arrow.streams.internal.freec.applicative.applicative import arrow.streams.internal.freec.eq.eq -import arrow.streams.internal.freec.functor.functor import arrow.streams.internal.freec.monad.monad ++<<<<<<< HEAD +import arrow.test.UnitSpec +import arrow.test.generators.GenK +import arrow.test.generators.functionAToB +import arrow.test.generators.throwable ++======= + import arrow.streams.internal.freec.monadDefer.monadDefer + import arrow.core.test.UnitSpec + import arrow.core.test.generators.GenK + import arrow.core.test.generators.functionAToB + import arrow.core.test.generators.throwable + import arrow.core.test.laws.EqLaws + import arrow.fx.test.laws.MonadDeferLaws ++>>>>>>> origin/master import arrow.typeclasses.Eq import arrow.typeclasses.EqK import io.kotlintest.properties.Gen * Unmerged path arrow-fx-test/src/main/kotlin/arrow/test/laws/ApplicativeErrorLaws.kt * Unmerged path arrow-fx-test/src/main/kotlin/arrow/test/laws/TraverseLaws.kt
Duplicated #128
Error log
https://github.com/arrow-kt/arrow-fx/commit/07677ccd5a2cb3127af9cb5d9e29e105e19a7529/checks
Conflicts