rabbitmq / rabbitmq-stream-rust-client

A client library for RabbitMQ streams
Other
156 stars 15 forks source link
rabbitmq rabbitmq-client rabbitmq-streams

RabbitMQ Streams Client for Rust


A work-in-progress Rust client for RabbitMQ Streams


Tests status Download docs.rs docs deps codecov

RabbitMQ Stream Client

This is a Rust client library for working with RabbitMQ Streams.

Installation

Install from crates.io

[dependencies]
rabbitmq-stream-client = "*"

Quick Start

The main access point is Environment, which is used to connect to a node.

Example

Building the environment
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
Building the environment with TLS
use rabbitmq_stream_client::Environment;

let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
.add_root_certificates(String::from(".ci/certs/ca_certificate.pem"))
.build();

// Use this configuration if you want to trust the certificates
// without providing the root certificate
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
     .trust_certificates(true)
     .build();

let environment = Environment::builder()
    .host("localhost")
    .port(5551) // specify the TLS port of the node
    .tls(tls_configuration)
    .build()
Building the environment with a load balancer
use rabbitmq_stream_client::Environment;

let environment = Environment::builder()
    .load_balancer_mode(true)
    .build()
Publishing messages
use rabbitmq_stream_client::{Environment, types::Message};
let environment = Environment::builder().build().await?;
let producer = environment.producer().name("myproducer").build("mystream").await?;
for i in 0..10 {
    producer
      .send_with_confirm(Message::builder().body(format!("message{}", i)).build())
      .await?;
}
producer.close().await?;
Consuming messages
use rabbitmq_stream_client::{Environment};
use futures::StreamExt;
use tokio::task;
use tokio::time::{sleep, Duration};
let environment = Environment::builder().build().await?;
let mut consumer = environment.consumer().build("mystream").await?;
let handle = consumer.handle();
task::spawn(async move {
        while let Some(delivery) = consumer.next().await {
            let d = delivery.unwrap();
            println!("Got message: {:#?} with offset: {}",
                     d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
                     d.offset(),);
        }
    });
// wait 10 second and then close the consumer
sleep(Duration::from_secs(10)).await;
handle.close().await?;

Superstreams

The client supports the superstream functionality.

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

See the blog post for more info.

You can use SuperStreamProducer and SuperStreamConsumer classes which internally uses producers and consumers to operate on the componsing streams.

Have a look to the examples to see on how to work with super streams.

See the Super Stream Producer Example:

See the Super Stream Consumer Example:

Development

Compiling

git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
make build

Running Tests

To run tests you need to have a running RabbitMQ Stream node with a TLS configuration. It is mandatory to use make rabbitmq-server to create a TLS configuration compatible with the tests. See the Environment TLS tests for more details.

make rabbitmq-server
make test

Running Benchmarks

make rabbitmq-server
make run-benchmark