ScalaConsultants / reactive-rabbit

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Apache License 2.0

Issue working with Akka Streams TestProbe #22

Closed gzoller closed 8 years ago

gzoller commented 8 years ago

Hello, I'm having some trouble using reactive-rabbit with TestSink in Akka Streams testkit.

Code:

    conn.queueDeclare(Queue("testQ"))
    val msg    = Message(ByteString("message!"))
    val source = Source(List(msg))
    val sink   = Sink(conn.publishDirectly(queue = "testQ"))

    // THIS TIMES OUT!
    (source to sink).run()
    val q = conn.consume(queue = "testQ")
    Source(q).map(_.message.body.utf8String)
        .runWith(TestSink.probe[String])
        .request(1)
        .expectNext("message!")
        .expectComplete()

    // THIS WORKS.
    Source(List("message!")).map(m => m)
        .runWith(TestSink.probe[String])
        .request(1)
        .expectNext("message!")
        .expectComplete()

The top Source example fails with a timeout; it seems that nothing comes out of the end of the pipe. The simple String version works fine. If I add a println in the .map(), I can confirm that this code is being called and that the utf8String is well-formed, as expected.

Why isn't this playing well with TestSink?

Also... I noticed there's a Confirm object in this code base that doesn't seem to be used anyplace. Is it obsolete or an unfinished feature?

Thanks! Greg

ktoso commented 8 years ago

@gzoller The first test, the one that blows up, is wrong. You must signal demand for more than 1 element in order to get the completion event. Dropping down to manually controlling demand gives you a lot of flexibility, but it also exposes you to the fetching strategies of all elements of the stream. Alternatively, you can expect a given result to be signalled by the entire stream instead of driving it step by step.
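The two approaches described above might look roughly like the following. This is only a sketch under assumptions that are not in the original post: it targets a newer Akka (2.6.x with akka-stream-testkit on the classpath) rather than the 1.0 API used in the question, it uses an in-memory Source as a stand-in for the AMQP consumer, and the object name DemandSketch is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.TestSink

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; the in-memory Source stands in for the AMQP-backed one.
object DemandSketch extends App {
  implicit val system: ActorSystem = ActorSystem("demand-sketch")

  // Step by step: signal demand for more than one element, so there is
  // still outstanding demand after the single expected message arrives.
  Source(List("message!"))
    .runWith(TestSink.probe[String])
    .request(2)
    .expectNext("message!")
    .expectComplete()

  // Whole stream: collect everything and assert on the final result.
  // take(1) bounds the stream; that matters only if the upstream does not
  // complete on its own (an assumption about a queue-backed source).
  val result = Await.result(
    Source(List("message!")).take(1).runWith(Sink.seq),
    3.seconds
  )
  assert(result == Seq("message!"))

  system.terminate()
}
```

The second variant avoids reasoning about demand at all, at the cost of only seeing the final result rather than each individual signal.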

Adding the map makes it work because it employs internal pre-fetch as explained in http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-rate.html
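For reference, the linked page describes the internal buffers as tunable per stage. A minimal sketch of that tuning, assuming a newer Akka (2.6.x) than the 1.0 release discussed here, and with the hypothetical names BufferSketch and doubled, could look like this:

```scala
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example; not taken from reactive-rabbit.
object BufferSketch extends App {
  implicit val system: ActorSystem = ActorSystem("buffer-sketch")

  // An asynchronous section whose input buffer is limited to one element,
  // which keeps pre-fetching from upstream to a minimum.
  val doubled = Flow[Int]
    .map(_ * 2)
    .async
    .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

  val out = Await.result(Source(1 to 3).via(doubled).runWith(Sink.seq), 3.seconds)
  assert(out == Seq(2, 4, 6))

  system.terminate()
}
```

The buffer size changes how eagerly elements are requested from upstream, not the elements the stream eventually emits.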

mkiedys commented 8 years ago

@gzoller wrote:

Also... I noticed there's a Confirm object in this code base that doesn't seem to be used anyplace. Is it obsolete or an unfinished feature?

This is part of the code that implements Processor[Confirm, Delivery]. The code is not finished. Let me know if you would like to work on it. It seems that people need something like this in order to manually control when a message is confirmed.