lirm / aeron-go

Efficient reliable UDP unicast, UDP multicast, and IPC message transport - Go port
Apache License 2.0

Join the chat at https://gitter.im/aeron-go/Lobby

aeron-go

Implementation of Aeron messaging client in Go.

Architecture, design, and protocol of Aeron can be found here

Usage

Example subscriber can be found here.

Example publication can be found here.

Common

Instantiate Aeron with Context:

ctx := aeron.NewContext().MediaDriverTimeout(time.Second * 10)

a := aeron.Connect(ctx)

Subscribers

Create subscription:

subscription := <-a.AddSubscription("aeron:ipc", 10)

defer subscription.Close()

aeron.AddSubscription() returns a channel, so the caller can either block until the subscription is registered with the driver or wait asynchronously, for example in a select statement.

Define callback for message processing:

handler := func(buffer *buffers.Atomic, offset int32, length int32, header *logbuffer.Header) {
    bytes := buffer.GetBytesArray(offset, length)

    fmt.Printf("Received a fragment with payload: %s\n", string(bytes))
}

Poll for messages:

idleStrategy := idlestrategy.Sleeping{SleepFor: time.Millisecond}

for {
    fragmentsRead := subscription.Poll(handler, 10)
    idleStrategy.Idle(fragmentsRead)
}

Publications

Create publication:

publication := <-a.AddPublication("aeron:ipc", 10)

defer publication.Close()

aeron.AddPublication() returns a channel, so the caller can either block until the publication is registered with the driver or wait asynchronously, for example in a select statement.

Create Aeron buffer to send the message:

message := fmt.Sprintf("this is a message %d", counter)

srcBuffer := buffers.MakeAtomic(([]byte)(message))

Optionally make sure that there are connected subscriptions:

for !publication.IsConnected() {
    time.Sleep(time.Millisecond * 10)
}

Send the message by calling publication.Offer:

ret := publication.Offer(srcBuffer, 0, int32(len(message)), nil)
switch ret {
case aeron.NotConnected:
    log.Print("not connected yet")
case aeron.BackPressured:
    log.Print("back pressured")
default:
    if ret < 0 {
        log.Printf("Unrecognized code: %d", ret)
    } else {
        log.Print("success!")
    }
}