Reactive Streams driver for the AMQP protocol. Powered by the RabbitMQ library.
Available at Maven Central for Scala 2.11 and 2.12:
libraryDependencies += "io.scalac" %% "reactive-rabbit" % "1.1.4"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import io.scalac.amqp.Connection
// streaming invoices to Accounting Department
val connection = Connection()
// create org.reactivestreams.Publisher
val queue = connection.consume(queue = "invoices")
// create org.reactivestreams.Subscriber
val exchange = connection.publish(exchange = "accounting_department",
  routingKey = "invoices")
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
// Run akka-streams with queue as Source and exchange as Sink
Source.fromPublisher(queue).map(_.message).runWith(Sink.fromSubscriber(exchange))
Run sbt doc and open target/scala-2.12/index.html.
There are 3 options for passing AMQP settings:
Connection() with no arguments, which relies on the default configuration.
Connection(config: Config), passing a Config programmatically.
Connection(settings: ConnectionSettings), passing ConnectionSettings programmatically.
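Passing ConnectionSettings programmatically might look like the sketch below. The property names and types are taken from the ConnectionSettings description in this README. Everything else is an assumption made for illustration: that ConnectionSettings and Address can be built with named arguments, that Address takes host and port, and the concrete values (localhost, guest credentials, the durations). The object name SettingsSketch is hypothetical.

```scala
import io.scalac.amqp.{Address, Connection, ConnectionSettings}
import scala.concurrent.duration._

object SettingsSketch {
  // Assumption: ConnectionSettings accepts the documented properties as named arguments.
  val settings = ConnectionSettings(
    // Assumption: Address is built from a host name and a port.
    addresses         = List(Address(host = "localhost", port = 5672)),
    virtualHost       = "/",
    username          = "guest",
    password          = "guest",
    heartbeat         = Some(30.seconds), // must be at least 1 second; None disables it
    timeout           = 10.seconds,       // must be at least 1 millisecond
    automaticRecovery = true,             // subscriptions are not recovered
    recoveryInterval  = 5.seconds,
    ssl               = None              // no SSL
  )

  // Opens the connection using the settings above; needs a reachable broker.
  def connect(): Connection = Connection(settings)
}
```

Keeping the settings in a value separate from the call to Connection(settings) makes it possible to inspect or log them without touching the broker.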
ConnectionSettings has the following properties:
addresses: Seq[Address] - broker addresses (hostname/port pairs) to try in order. A random one will be picked during recovery.
virtualHost: String - virtual host to use when connecting to the broker.
username: String - user name to use when connecting to the broker.
password: String - password to use when connecting to the broker.
heartbeat: Option[FiniteDuration] - requested heartbeat interval, at least 1 second. None disables the heartbeat.
timeout: Duration - the default connection timeout, at least 1 millisecond.
automaticRecovery: Boolean - enables automatic connection recovery. Subscriptions are not recovered.
recoveryInterval: FiniteDuration - how long automatic recovery waits before attempting to reconnect.
ssl: Option[String] - enables SSL for connecting to the broker. Valid values depend on the JRE and the protocols it supports. Recent RabbitMQ servers do not allow SSL3.
The Connection trait API has two groups of methods: those that manage AMQP infrastructure (i.e. declare and delete exchanges, queues and bindings) and those that create Reactive Streams entities: Publisher and Subscriber.
consume(queue, prefetch) - creates a Publisher of Delivery for messages from queue.
publish(exchange, routingKey) - creates a Subscriber that takes a stream of Message to be sent to exchange with a fixed routingKey.
publish(exchange) - creates a Subscriber for a stream of Routed (a tuple of Message and routing key).
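The difference between the two publish variants can be sketched with akka-streams, following the invoices example earlier in this README. This is a rough illustration under stated assumptions, not the library's reference usage: it assumes Routed can be built with named routingKey and message arguments, that prefetch is an Int, and it reuses the queue and exchange names from the earlier example. The names RoutedInvoices and toRouted are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import io.scalac.amqp.{Connection, Delivery, Routed}

object RoutedInvoices {
  // Hypothetical helper: pairs the consumed message with a routing key.
  // Assumption: Routed exposes `routingKey` and `message` as named arguments.
  def toRouted(delivery: Delivery, routingKey: String): Routed =
    Routed(routingKey = routingKey, message = delivery.message)

  // Needs a reachable broker with the "invoices" queue and
  // the "accounting_department" exchange already declared.
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()

    val connection = Connection()

    // Publisher of Delivery; prefetch value chosen arbitrarily for the sketch
    val invoices = connection.consume(queue = "invoices", prefetch = 20)

    // Subscriber of Routed: the routing key travels with each element
    val accounting = connection.publish(exchange = "accounting_department")

    Source.fromPublisher(invoices)
      .map(delivery => toRouted(delivery, routingKey = "invoices"))
      .runWith(Sink.fromSubscriber(accounting))
  }
}
```

With publish(exchange, routingKey) the map step would emit plain Message values instead, because the routing key is fixed when the Subscriber is created.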
Developed by Scalac