eigr / Astreu

High-performance Messaging System based on a gRPC protocol
https://eigr.io/astreu
Apache License 2.0

Astreu

High-performance messaging system based on the gRPC protocol, written in Elixir. This is a work in progress and is not production ready.


Architecture Overview


                                     +-------------------------------+
                                     |           Astreu              |
                                     |                               |
                                     | +---------------------------+ |
    +-------------+                  | |     Management API        | |                   +-------------+
    | Subscribers |                  | +---------------------------+ |                   |  Producers  |
  +-------------+ |  Bi-directional  | +---------------------------+ |  Bi-directional   | +-------------+
+---------------| +----------------->+ |     PubSub Adapters       | +------------------>+ | +-------------+
| | |          || |    Streams       | +---------------------------+ |    Streams        | | |         | | |
| | |          || +<-----------------+ +---------------------------+ +<------------------+ | |         | | |
| | +-------------+                  | |      Core Protocol        | |                   +-------------+ | |
+-+-------------+                    | +---------------------------+ |                     +-------------+ |
                                     | +---------------------------+ |                       +-------------+
                                     | |        gRPC Server        | |
                                     | +---------------------------+ |
                                     +-------------------------------+
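
To make the diagram concrete, here is a minimal in-memory sketch of the publish/subscribe idea it depicts: subscribers register on a topic, producers publish to it, and the broker hands each message to the registered handlers. This is not Astreu's implementation or API. The class name `TopicBrokerSketch`, its methods, and the choice to deliver every message to every subscriber of a topic are assumptions made for illustration only, and the gRPC streams are replaced by plain method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a broker; not Astreu's implementation. */
final class TopicBrokerSketch {
    // Topic name -> handlers registered for that topic.
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a handler that receives every message later published to the topic. */
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Delivers the payload to every handler of the topic and returns the delivery count. */
    int publish(String topic, String payload) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        handlers.forEach(handler -> handler.accept(payload));
        return handlers.size();
    }

    public static void main(String[] args) {
        TopicBrokerSketch broker = new TopicBrokerSketch();
        List<String> received = new ArrayList<>();

        // Two subscribers on the same topic, in the role of the "Subscribers" box.
        broker.subscribe("test", received::add);
        broker.subscribe("test", msg -> received.add(msg.toUpperCase()));

        // One producer call, in the role of the "Producers" box.
        int delivered = broker.publish("test", "hello");
        System.out.println(delivered + " deliveries: " + received);
    }
}
```

In the real system the two sides talk to the server over bi-directional gRPC streams rather than direct calls, so delivery is asynchronous and crosses the network.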

Usage and Installation

Run the Astreu server with Docker:

docker run --rm --net=host -e RELEASE_NODE=unique_name_peer_node eigr/astreu:0.1.2

Producer example using the Java client:

public static void main(final String[] args) {

    final Producer producer =
            Astreu.at("127.0.0.1", 9980)
            .asPub("test", UUID.randomUUID().toString().toLowerCase());

    final Publisher<ReplyMessage> publisher = producer.bind(); 

    Flux.from(publisher).subscribe(replyMessage -> {
        replyMessage.logger().info("Reply Message -> {}", replyMessage);
    });

    IntStream.range(0, 10).parallel().forEach(i -> {
        producer.publish(
                String.valueOf(i), 
                Any.newBuilder()
                        .setTypeUrl("io.astreu.custom/Text")
                        .setValue(ByteString.copyFrom(String.format("Hello World Astreu %s", i).getBytes()))
                        .build()
        );
    });
}
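
One detail in the producer example is worth a note: `getBytes()` without an argument uses the JVM's default charset, so the bytes placed in the message can differ between machines. The sketch below shows one way to pin the encoding to UTF-8 on both the sending and the receiving side. `PayloadCodecSketch` and its methods are hypothetical helpers, not part of any Astreu SDK, and only the JDK is used.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for encoding and decoding text payloads; not part of Astreu. */
final class PayloadCodecSketch {

    /** Builds the same greeting as the producer example, encoded explicitly as UTF-8. */
    static byte[] encode(int i) {
        return String.format("Hello World Astreu %s", i).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a payload that was produced by encode. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode(7);
        System.out.println(decode(payload)); // prints "Hello World Astreu 7"
    }
}
```

The resulting byte array could be passed to `ByteString.copyFrom(...)` in place of the `getBytes()` call in the example above.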

Subscriber example using the Java client:

public static void main(final String[] args) {
    final Publisher<MessageWithContext> publisher =
            Astreu.at("127.0.0.1", 9980)
                    .asSub("test", "unique-subscription")
                    .receiveOnly(MessageType.EXCHANGE)
                    .bind(); 

    Flux.from(publisher).subscribe(messageWithContext -> {
        final AcknowledgeContext context = messageWithContext.getContext();

        context.logger().debug("Message type is -> {}", messageWithContext.getType());
        final Exchange message = messageWithContext.getMessage();

        context.logger().info("Incoming Message {}", message);
        context.accept();
    });
}

Client SDKs