This code influeneced the Go client that's now maintained by Basho, the creators of Riak. You can contribute to the repository here or install with go get github.com/basho/riak-go-client
.
Go Protocol Buffer driver for the Riak distributed database
go get github.com/riaken/riaken-core
go get http://gopkg.in/riaken/riaken-core.v1
http://godoc.org/gopkg.in/riaken/riaken-core.v1
http://godoc.org/gopkg.in/riaken/riaken-core.v1
There are some modules which wrap/extend/test Riaken located at the following:
The following points are what drive this project.
The client is the all encompassing framework of riaken. From there sessions should be grabbed and released as necessary.
package main
import "log"
import "github.com/riaken/riaken-core"
func main() {
// Riak cluster addresses
addrs := []string{"127.0.0.1:8083", "127.0.0.1:8084", "127.0.0.1:8085", "127.0.0.1:8086", "127.0.0.1:8087"}
// Create a client, passing the addresses, and number of connections to maintain per cluster node
client := riaken_core.NewClient(addrs, 1)
// Dial the servers
if err := client.Dial(); err != nil {
log.Fatal(err.Error()) // all nodes are down
}
// Gracefully close all the connections when finished
defer client.Close()
// Grab a session to interact with the cluster
session := client.Session()
// Release the session
defer session.Release()
}
Simply see if the server is responsive.
if !session.Ping() {
log.Error("no ping response")
}
if ok, err := session.SetClientId([]byte("client1")); !ok {
log.Error("no response")
} else if err != nil {
log.Error(err.Error())
}
res, err := session.GetClientId()
if err != nil {
log.Error(err.Error())
}
log.Print(string(res.GetClientId()))
WARNING: This is not recommended to be used against a production server.
buckets, err := session.ListBuckets()
if err != nil {
log.Error(err.Error())
}
Get useful info about the Riak servers.
info, err := session.ServerInfo()
if err != nil {
log.Error(err.Error())
}
log.Print(string(info.GetNode()))
log.Print(string(info.GetServerVersion()))
Buckets now have a recommended Type() method which allows for another level of namespacing.
bucket := session.GetBucket("bucket").Type("high_level")
The type is set to default
by Riak if not specified.
Not exactly straightforward because they require the use of the RPB structs.
// Make sure to require this
import (
"github.com/riaken/riaken-core/rpb"
"github.com/golang/protobuf/proto"
)
// Code
bucket := session.GetBucket("b2")
props := &rpb.RpbBucketProps{
NVal: proto.Uint32(1),
AllowMult: proto.Bool(true),
}
if ok, err := bucket.SetBucketProps(props); !ok {
log.Error("could not set bucket props")
} else if err != nil {
log.Error(err.Error())
}
out, err := bucket.GetBucketProps()
if err != nil {
log.Error(err.Error())
}
if out.GetProps().GetAllowMult() != true {
log.Errorf("expected: true, got: %t", out.GetProps().GetAllowMult())
}
bucket := session.GetBucket("b2").Type("test_maps")
props := &rpb.RpbBucketProps{
AllowMult: proto.Bool(true),
}
if ok, err := bucket.SetBucketType(props); !ok {
t.Error("could not set bucket props")
} else if err != nil {
t.Error(err.Error())
}
bucket := session.GetBucket("b2").Type("test_maps")
if ok, err := bucket.ResetBucket(); !ok {
t.Error("could not set bucket props")
} else if err != nil {
t.Error(err.Error())
}
WARNING: This is not recommended to be used against a production server.
Note that this is a streaming response and must be called until done. If not streamed the next operation will fail and/or hang.
var keys [][]byte
// Loop until done is received from Riak
for out, err := bucket.ListKeys(); !out.GetDone(); out, err = bucket.ListKeys() {
if err != nil {
log.Error(err.Error())
break
}
keys = append(keys, out.GetKeys()...)
}
log.Print(keys)
Grab a bucket by name, an object with the key to store, and insert the data as []byte
.
bucket := session.GetBucket("b1")
object := bucket.Object("o1")
if _, err := object.Store([]byte("o1-data")); err != nil {
log.Error(err.Error())
}
For server assigned keys.
bucket := session.GetBucket("b1")
object := bucket.Object("") // leave this blank
res, err := object.Store([]byte("o1-data")) // fetch the response
if err != nil {
log.Error(err.Error())
}
log.Print(string(res.GetKey())) // this is the server assigned key
Note that the content comes back as an array. If > 1
this record has siblings.
data, err := object.Fetch()
if err != nil {
log.Error(err.Error())
}
log.Print(string(data.GetContent()[0].GetValue()))
Verbose version.
if ok, err := object.Delete(); !ok {
log.Error("deletion of object failed")
} else if err != nil {
log.Error(err.Error())
}
Simple version.
if _, err := object.Delete(); err != nil {
log.Error(err.Error())
}
bucket := session.GetBucket("b5")
counter := bucket.Counter("c1")
if _, err := counter.Update(1); err != nil {
log.Error(err.Error())
}
data, err := counter.Get()
if err != nil {
t.Error(err.Error())
}
log.Print(data.GetValue())
CRDTs can be queried similar to an Object.
crdt := bucket.Crdt("foo")
if _, err := crdt.Fetch(); err != nil {
t.Fatal(err.Error())
}
They will then have their Counter, Set, or Map value set depending on what was initially stored in Riak.
// crdt.Counter
// crdt.Set
// crdt.Map
They are simply deleted with the Object interface.
object := bucket.Object("foo")
if _, err := object.Delete(); err != nil {
t.Fatal(err.Error())
}
bucket := session.GetBucket("crdt_counter").Type("test_counters")
counter := bucket.Crdt("foo").NewCounter()
counter.Increment(4)
if _, err := counter.Commit(); err != nil {
t.Fatal(err.Error())
}
counter.Decrement(1)
if _, err := counter.Commit(); err != nil {
t.Fatal(err.Error())
}
log.Print(counter.Value) // should equal 3
bucket := session.GetBucket("crdt_set").Type("test_sets")
set := bucket.Crdt("foo").NewSet()
set.Add("bar")
set.Add("baz")
if _, err := set.Commit(); err != nil {
t.Fatal(err.Error())
}
log.Print(set.Values) // should contain ["bar", "baz"]
set.Remove("baz")
set.Add("car")
if _, err := set.Commit(); err != nil {
t.Fatal(err.Error())
}
log.Print(set.Values) // should contain ["bar", "car"]
Maps contain:
They are used in the following way:
bucket := session.GetBucket("crdt_map").Type("test_maps")
crdt := bucket.Crdt("foo")
mp := crdt.NewMap()
// Add Flags
mp.Flags["f1"] = true
mp.Flags["f2"] = false
// Add Registers
mp.Registers["r1"] = "1r"
mp.Registers["r2"] = "2r"
// Add Counters
mp.Counters["c1"] = crdt.NewCounter()
mp.Counters["c1"].Increment(10)
// Add Sets
mp.Sets["s1"] = crdt.NewSet()
mp.Sets["s1"].Add("1")
mp.Sets["s1"].Add("2")
mp.Sets["s1"].Add("3")
mp.Sets["s2"] = crdt.NewSet()
mp.Sets["s2"].Add("a")
mp.Sets["s2"].Add("b")
// Add Maps (within Maps, within...)
mp.Maps["m1"] = crdt.NewMap()
mp.Maps["m1"].Flags["ff1"] = true
mp.Maps["m1"].Registers["rr1"] = "1rr"
mp.Maps["m1"].Counters["cc1"] = crdt.NewCounter()
mp.Maps["m1"].Counters["cc1"].Increment(20)
mp.Maps["m1"].Sets["ss1"] = crdt.NewSet()
mp.Maps["m1"].Sets["ss1"].Add("111")
mp.Maps["m1"].Sets["ss1"].Add("222")
mp.Maps["m1"].Maps["mm1"] = crdt.NewMap()
mp.Maps["m1"].Maps["mm1"].Flags["fff1"] = false
// Save
if _, err := mp.Commit(); err != nil {
t.Fatal(err.Error())
}
Values can be removed from Maps with Remove():
crdt := bucket.Crdt("foo")
if _, err := crdt.Fetch(); err != nil {
t.Fatal(err.Error())
}
crdt.Remove(CRDT_MAP_FLAG, "f2")
if _, err := mp.Commit(); err != nil {
t.Fatal(err.Error())
}
// crdt.Map.Flags["f2"] should no longer exist
Note that this is a streaming response and must be called until done. If not streamed the next operation will fail and/or hang.
request := []byte(`{
"inputs":"training",
"query":[{"map":{"language":"javascript",
"source":"function(riakObject) {
var val = riakObject.values[0].data.match(/pizza/g);
return [[riakObject.key, (val ? val.length : 0 )]];
}"}}]}`)
contentType := []byte("application/json")
// Query
query := session.Query()
var result []byte
// Loop until done is received from Riak.
for out, err := query.MapReduce(request, contentType); !out.GetDone(); out, err = query.MapReduce(request, contentType) {
if err != nil {
log.Error(err.Error())
break
}
result = append(result, out.GetResponse()...)
}
log.Print(data.GetKeys())
Note that storage_backend must be set to riak_kv_eleveldb_backend in app.config to use this.
query := session.Query()
data, err := query.SecondaryIndexes([]byte("b1"), []byte("animal_bin"), []byte("chicken"), nil, nil, 0, nil)
if err != nil {
log.Error(err.Error())
}
Note that riak_search needs to be enabled in the app.config to use this.
query := session.Query()
data, err := query.Search([]byte("b3"), []byte("food:pizza"))
if err != nil {
log.Error(err.Error())
}
log.Print(data.GetNumFound())
Sometimes it is desirable to pass more complex options to the server. All methods capable of receiving additional options have access to Do()
. This method takes in a RPB struct and is chained together with the method one wishes to call.
// Make sure to require this
import (
"github.com/riaken/riaken-core/rpb"
"github.com/golang/protobuf/proto"
)
// Code
bucket := session.GetBucket("b1")
object := bucket.Object("o1")
// Store
opts1 := &rpb.RpbPutReq{
ReturnBody: proto.Bool(true),
}
ret, err := object.Do(opts1).Store([]byte("o1-data"))
if err != nil {
log.Error(err.Error())
}
if string(ret.GetContent()[0].GetValue()) != "o1-data" {
log.Errorf("got %s, expected o1-data", string(ret.GetContent()[0].GetValue()))
}
// Fetch
opts2 := &rpb.RpbGetReq{
Head: proto.Bool(true),
}
data, err := object.Do(opts2).Fetch()
if err != nil {
log.Error(err.Error())
}
if len(data.GetContent()) > 0 {
if string(data.GetContent()[0].GetValue()) != "" {
log.Error("expected empty content")
}
}
// Delete
opts3 := &rpb.RpbDelReq{
Rw: proto.Uint32(1),
}
if ok, err := object.Do(opts3).Delete(); !ok {
log.Error("deletion of object failed")
} else if err != nil {
log.Error(err.Error())
}
See the Riak PBC docs for a more detailed explanation of what all the parameters are for each method.
Brian Jones - mojobojo@gmail.com - https://twitter.com/mojobojo