ScalaConsultants / reactive-rabbit

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Apache License 2.0

Message wrapper #39

Closed: vitorsvieira closed this issue 8 years ago

vitorsvieira commented 8 years ago

It looks like every value has to be wrapped in a Message object before it is sent to the Sink, but I'm not sure about that.

Can you explain a little bit more? I'm not able to build the docs through sbt, so I have to document the source code myself while exploring the lib.

mkiedys commented 8 years ago

Hi @notvitor

This is an example that uses Akka Streams to wire things together:

import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.google.common.net.MediaType
import io.scalac.amqp.{Connection, Message}

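// Extending App gives the object a main method, so the stream starts when Demo is run
object Demo extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Open a connection and get a subscriber for the queue named "queue"
  val connection = Connection()
  val subscriber = connection.publishDirectly("queue")

  // The subscriber consumes Message values, so the payload is wrapped in one
  val message = Message(
    body        = "Hello World".getBytes(StandardCharsets.UTF_8),
    contentType = Some(MediaType.PLAIN_TEXT_UTF_8)
  )

  // Endless source that keeps emitting the same message
  val source = Source.fromIterator(() ⇒ Iterator.continually(message))

  // Materialize the stream: every element is handed to the AMQP subscriber
  source.runWith(Sink.fromSubscriber(subscriber))
}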
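
On the wrapping question itself: since the subscriber takes Message values, a stream of plain values can be converted with a map stage right before the sink. The following is only a rough sketch under the same setup as above. The `toMessage` helper, the `WrapDemo` object and the sample strings are made up for illustration and are not part of the library.

```scala
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.google.common.net.MediaType
import io.scalac.amqp.{Connection, Message}

object WrapDemo extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Hypothetical helper (not provided by reactive-rabbit):
  // wraps a plain String in a Message with a UTF-8 text body
  def toMessage(text: String): Message =
    Message(
      body        = text.getBytes(StandardCharsets.UTF_8),
      contentType = Some(MediaType.PLAIN_TEXT_UTF_8)
    )

  val connection = Connection()
  val subscriber = connection.publishDirectly("queue")

  // Plain values go in, Message values come out of the map stage
  Source(List("one", "two", "three"))
    .map(toMessage)
    .runWith(Sink.fromSubscriber(subscriber))
}
```

Keeping the conversion in one small function means the rest of the stream can work with ordinary values and only the last stage has to know about Message.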

Let me know if you have more questions.

vitorsvieira commented 8 years ago

Thanks @mkiedys, I've been doing something similar.

I'm planning to create some examples, and maybe a blog post comparing this library with the others. So far, this one has by far the least complex implementation.