krasserm / streamz

A combinator library for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel
Apache License 2.0

Restructure akka-streams => fs2 DSL to prevent dropping Mat value #55

Closed Daenyth closed 4 years ago

Daenyth commented 6 years ago

I'm reasonably happy with this approach.

This version has

Still todo:

If we want to keep the single method name instead of split method names, we'll have to implement some dependent type + typeclass trickery; gitter chat about it: October 5, 2018 9:13 AM

krasserm commented 6 years ago

If we want to keep the single method name instead of split method names, we'll have to implement some dependent type

Do you mean a single method name whose return type is either Stream[F, A] when the materialized value is NotUsed, or F[Stream[F, A], M] (or one of the alternatives I proposed) for other materialized value types?

Daenyth commented 6 years ago

Yes, that's what I meant. It's not pretty
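
For readers following along, here is a rough, dependency-free sketch of what "dependent type + typeclass trickery" could look like. It is not the code from this PR. `AkkaSource`, `Fs2Stream`, `NotUsed`, `ToStream` and `toStream` are hypothetical stand-ins defined in the block itself, and the effect wrapper `F[...]` is left out to keep it short. The idea is that a typeclass with a dependent `Out` member picks the return type from the materialized value type `M`, so one method name can serve both cases.

```scala
object SingleNameSketch {
  // Hypothetical stand-ins; these are NOT the real Akka Streams / FS2 types.
  sealed trait NotUsed
  case object NotUsed extends NotUsed
  final case class AkkaSource[A, M](elems: List[A], mat: M)
  final case class Fs2Stream[A](elems: List[A])

  // Typeclass whose dependent type member Out is chosen by the instance.
  trait ToStream[A, M] {
    type Out
    def convert(src: AkkaSource[A, M]): Out
  }

  // General case (lower priority): keep the materialized value in a tuple.
  trait LowPriorityToStream {
    implicit def keepMat[A, M]: ToStream.Aux[A, M, (Fs2Stream[A], M)] =
      new ToStream[A, M] {
        type Out = (Fs2Stream[A], M)
        def convert(src: AkkaSource[A, M]): Out = (Fs2Stream(src.elems), src.mat)
      }
  }

  object ToStream extends LowPriorityToStream {
    type Aux[A, M, O] = ToStream[A, M] { type Out = O }

    // Special case (higher priority): NotUsed carries nothing, so drop it.
    implicit def dropNotUsed[A]: Aux[A, NotUsed, Fs2Stream[A]] =
      new ToStream[A, NotUsed] {
        type Out = Fs2Stream[A]
        def convert(src: AkkaSource[A, NotUsed]): Out = Fs2Stream(src.elems)
      }
  }

  // The single method name; its result type depends on the resolved instance.
  implicit class SourceOps[A, M](src: AkkaSource[A, M]) {
    def toStream(implicit ev: ToStream[A, M]): ev.Out = ev.convert(src)
  }

  // The type annotations pin M to NotUsed (rather than NotUsed.type) and String.
  val plain: AkkaSource[Int, NotUsed] = AkkaSource(List(1, 2, 3), NotUsed)
  val counted: AkkaSource[Int, String] = AkkaSource(List(1, 2, 3), "materialized")

  val plainResult: Fs2Stream[Int] = plain.toStream
  val countedResult: (Fs2Stream[Int], String) = counted.toStream
}
```

The cost is visible even in this toy version: an implicit priority trait, an `Aux` alias, and return types that only show up after implicit resolution, which is presumably why the two-method-name API is the simpler choice.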

krasserm commented 6 years ago

This PR LGTM at first glance, thank you very much! I'm fine with having two method names. Do you plan further work here or is it ready for a final review?

Daenyth commented 6 years ago

Documentation should have more usage of the new forms.

Do you think we need more tests?

Scalafix would be nice if we could, but not necessary.

krasserm commented 6 years ago

+1 on extending/updating the docs and adding more tests. Also, there's one test that now fails permanently on Travis (I restarted the job several times with the same result). It is related to the Akka Sink -> FS2 sink conversion.

Daenyth commented 6 years ago

BTW I did set the branch so you can push to it - feel free to make any edits in line with above

krasserm commented 6 years ago

I'm afraid I don't have the bandwidth to make any edits at the moment. I highly appreciate your contributions and am happy to review/discuss them. It would be great if you could at least write some minimal tests for the additional features you added and update the docs accordingly. Everything else can be done later.

Daenyth commented 6 years ago

@krasserm I'm hoping to wrap this up during this week.

Until then, can you publish a milestone release from master branch with the release note that this API will be changing? That will hopefully unblock anyone downstream to at least begin the fs2 1.0 upgrade

krasserm commented 6 years ago

@Daenyth what speaks against including this PR in the next milestone release? We introduced breaking changes in the previous one anyway.

Daenyth commented 6 years ago

Well at the moment I broke the tests and I need to figure out what's wrong. I'm OK merging this without documentation or extra tests for a milestone release, once that's fixed

krasserm commented 6 years ago

Ok, I'll release 0.10-M2 later this week and we plan this PR for M3.

Daenyth commented 6 years ago

Perfect, thanks!

Daenyth commented 6 years ago

The test was a silly error;

diff --git a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala
index 524746e..e90c11b 100644
--- a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala
+++ b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala
@@ -133,10 +133,10 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with WordSpecLike with
     "propagate cancellation from AS sink to FS2 sink (on AS sink error)" in {
       val probe = TestProbe()
       val akkaSink = AkkaSink.foreach[Int](_ => throw error)
-      akkaSink.toSinkMat[IO].map { case (snk, mat) =>
+      Stream.force(akkaSink.toSinkMat[IO].map { case (snk, mat) =>
         mat.onComplete(probe.ref ! _)
         Stream.emits(numbers).covary[IO].to(snk)
-      }.unsafeRunSync()
+      }).compile.drain.unsafeRunSync()
       probe.expectMsg(Failure(error))
     }
   }

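The unsafeRunSync call was only returning a Stream[IO, Unit] without running it, so the sink never executed. I had to flatten it with Stream.force and then compile.drain it before running.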
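
To make the failure mode concrete without pulling in fs2 or cats-effect, here is a minimal model of it. `Eff` and `Str` are hypothetical stand-ins for an effect type and a stream description, and `Str.force` only imitates the role Stream.force plays in the diff above. It is a sketch of the laziness issue, not of the real library internals.

```scala
import scala.collection.mutable.ListBuffer

object ForceSketch {
  // Hypothetical stand-in for an effect type such as IO: a suspended computation.
  final class Eff[A](thunk: () => A) {
    def map[B](f: A => B): Eff[B] = new Eff(() => f(thunk()))
    def unsafeRunSync(): A = thunk()
  }

  // Hypothetical stand-in for a stream: a description that does nothing
  // until it is drained into an effect and that effect is run.
  final class Str(run: () => Unit) {
    def drain: Eff[Unit] = new Eff(run)
  }

  object Str {
    def emits(steps: List[() => Unit]): Str =
      new Str(() => steps.foreach(step => step()))

    // Imitates Stream.force: turn an effect that yields a stream into a stream
    // that runs the effect first and then the stream it produced.
    def force(e: Eff[Str]): Str =
      new Str(() => e.unsafeRunSync().drain.unsafeRunSync())
  }

  val log: ListBuffer[Int] = ListBuffer.empty[Int]

  // An effect that only builds a stream, like toSinkMat[IO].map { ... } in the test.
  val program: Eff[Str] =
    new Eff(() => Str.emits(List(1, 2, 3).map(n => () => { log += n; () })))

  // Broken shape: running the outer effect yields an un-run stream.
  val unrun: Str = program.unsafeRunSync()
  val before: List[Int] = log.toList // still empty

  // Fixed shape: force, drain, then run.
  Str.force(program).drain.unsafeRunSync()
  val after: List[Int] = log.toList // List(1, 2, 3)
}
```

In the model, as in the test, the first form type-checks and returns without error, which is what made the mistake easy to miss.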

Daenyth commented 6 years ago

Rebased and test fixed, let's see how it goes. I don't know why a camel test failed here: https://travis-ci.org/krasserm/streamz/jobs/449841904

As far as I can tell, nothing I touched should have affected that, so perhaps it's a flaky test

krasserm commented 6 years ago

Regarding the failing tests, these are flaky tests and have nothing to do with your changes.

Daenyth commented 6 years ago

Thanks for the review! Hoping to wrap this up properly soon...

Daenyth commented 5 years ago

As you may have noticed, I haven't had time for this. I don't know when I will.

I think it's worth cutting the 1.0 release version and pushing this break back to 2.0, unless someone else is willing to finish the work.

milanvdm commented 4 years ago

@Daenyth Are you still working on this branch? Since the original branch is removed, it is a bit tricky to take over and have a look at your work.

Daenyth commented 4 years ago

I haven't had time to actively work on this. If you'd like to take it over, I'd appreciate that.

The repo with the commit in it can be viewed here: https://github.com/krasserm/streamz/tree/b090704a4613e08908bb19f1b19df56f556c78ad

It should be possible to git checkout that commit after cloning.

That said, it might be more straightforward to start over from the intent, given how far the files have diverged. It should be reasonably easy to follow the intent of the change based on the diff here; ping me on gitter if you'd like any more direct help with it.

Daenyth commented 4 years ago

Closing for now; if I have time to take this up again I'll start a new branch, but until then anyone else wanting to address it would be very welcome.