aerospike / aerospike-client-go

Aerospike Client Go
Apache License 2.0
430 stars 199 forks source link

Maps aren't being treated as maps in UDF #63

Closed abramovic closed 9 years ago

abramovic commented 9 years ago

Whenever I store maps/json using the Go client and try to access that bin using UDF I keep getting the type as "userdata" instead of a table or a list.

Message: bad argument #1 to 'format' (string expected, got userdata)

package main

import (
    "fmt"
    as "github.com/aerospike/aerospike-client-go"
    "log"
    "strconv"
    "time"
)

// udfFilter holds the Lua source of the stream UDF that main registers as
// "queryTest.lua" and invokes as the aggregate function queryTest.
//
// queryTest(s, rec) aggregates the stream into a map with
// apply_totalmap_cxt and reduces the partial maps with map.merge, adding
// the values of matching keys. apply_totalmap_cxt walks the list stored in
// the record's "Data" bin, logs every element with warn and increments the
// counter stored under that element.
//
// NOTE(review): `mymap = 0` replaces the accumulator itself with a number;
// `mymap[k] = 0` was presumably intended - confirm. The elements of "Data"
// are maps as written by main, so warn(k) and mymap[k] receive a map rather
// than a string; this looks like the source of the reported "got userdata"
// error - confirm against the server log.
const udfFilter = `
function queryTest(s, rec)
  local time = os.time()
  local function map_value_merger(mapval1, mapval2)
    return mapval1 + mapval2
  end
  local function reduce_results(a,b)
    return map.merge(a,b,map_value_merger)
  end

  function apply_totalmap_cxt(mymap, rec)
        local topics = rec["Data"]
        if topics ~= nil then
            for k in list.iterator(topics) do
                warn(k)
                if mymap[k] == nil then
                    mymap = 0
                end
                mymap[k] = mymap[k] + 1
      end
        end
    return mymap
  end
    return s : aggregate(map(), apply_totalmap_cxt) : reduce(reduce_results)
end

`

// main reproduces the reported problem end to end: it writes one record
// whose "Data" bin is a list of maps, registers the queryTest UDF, runs it
// as an aggregation over the test.pets set and prints the merged counters.
//
// Any failure while connecting, building the key, writing the record,
// registering the UDF or starting the query aborts the program.
func main() {

    client, err := as.NewClient("127.0.0.1", 3000)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    key, err := as.NewKey("test", "pets", "example_key")
    if err != nil {
        log.Fatal(err)
    }

    // The element type is kept as map[interface{}]interface{} on purpose:
    // it is the shape the issue is about.
    topics := []map[interface{}]interface{}{
        {"animal": "Dogs"},
        {"animal": "Cats"},
    }

    data := as.BinMap{
        "CreatedAt": time.Now().Unix(),
        "bin2":      "An elephant is a mouse with an operating system",
        "Data":      topics,
    }

    // A failed write would make the query below silently return nothing.
    if err := client.Put(nil, key, data); err != nil {
        log.Fatal(err)
    }

    regTask, err := client.RegisterUDF(nil, []byte(udfFilter), "queryTest.lua", as.LUA)
    if err != nil {
        log.Fatal(err)
    }
    // Keep waiting until the registration task reports success; every
    // failed attempt is logged.
    for {
        err := <-regTask.OnComplete()
        if err == nil {
            break
        }
        log.Println(err)
    }

    stm := as.NewStatement("test", "pets")
    stm.SetAggregateFunction("queryTest", "queryTest", nil, true)

    recordset, err := client.Query(nil, stm)
    if err != nil {
        log.Fatal(err)
    }
    defer recordset.Close()

    results := map[string]int{}

    // Receiving from Records stops when the channel is closed; a nil record
    // is treated as the end of the stream as well.
    for rec := range recordset.Records {
        if rec == nil {
            break
        }

        result, ok := rec.Bins["SUCCESS"].(map[interface{}]interface{})
        if !ok {
            continue
        }

        for k, v := range result {
            // Keys that are neither int nor string are counted under "".
            var name string
            switch t := k.(type) {
            case int:
                name = strconv.Itoa(t)
            case string:
                name = t
            }

            if n, ok := v.(int); ok {
                results[name] += n
            }
        }
    }

    for k, v := range results {
        fmt.Println(k + " :: " + strconv.Itoa(v))
    }
}
khaf commented 9 years ago

Thanks for the very helpful and detailed report. I'm investigating this and will come back to you with a workaround or a fix.

abramovic commented 9 years ago

FYI it might not be a Go client problem. I tried parsing the bin "bin" from https://www.aerospike.com/docs/guide/data-types.html#map and still wasn't able to read the map/object in Lua.

If I treat the object like a string instead of a map and then parse it as JSON in Lua then I no longer get a userdata issue.

What I mean is that instead of..

insert into test.demo (PK, bin) values ('key', 'JSON["str1" , "str2" , "str3"]')

I just insert as...

insert into test.demo (PK, bin) values ('key', '["str1" , "str2" , "str3"]')

In the second example I am then able to do something like this without getting a userdata error

local myobject = rec["bin"] 
if myobject ~= nil then 
  JSON = require("JSON") 
  local parsed = JSON:decode(myobject)
end

We're currently using Enterprise 3.4.0 for Ubuntu

The JSON library used to parse the string is from https://github.com/aerospike/complex-data-types

khaf commented 9 years ago

Sorry for the late reply. This is a server particularity.

JSON parsing is slow on both ends. To merge maps, you can iterate over them using map.pairs:

for k, v in map.pairs(a) do
  b[k] = (b[k] or 0) + v
end
abramovic commented 9 years ago

Hi Khosrow

Thanks for the code sample. I'm still getting an error trying to parse the bin:

bad argument #1 to 'pairs' (Map expected, got userdata)

The map.pairs(a) example works if I insert the object in AQL such as this:

insert into test.demo (PK, Index, bin) values ('key', 'json', 'JSON{"str2": "str3"}')

The Go client is storing JSON as an interface (map[interface{}]interface{}{}), and I think that's why Lua is treating the JSON as userdata: it doesn't know the type (since it's an interface).

https://github.com/aerospike/aerospike-client-go/blob/9b81f1d7455c1b3a1306fb167fe5ae52e3b40294/value.go#L123

abramovic commented 9 years ago

Tested and this works: https://github.com/aerospike/aerospike-client-go/pull/66

khaf commented 9 years ago

Maps are sent to and stored on the server using msgpack format. map[interface{}]interface{} has nothing to do with your problem. As such, there are no bugs in client or server.

Here's an annotated (and improved) version of your code. Don't hesitate to ask if you need more clarification. (Both the Lua and Go code have been changed.)

package main

import (
    "fmt"
    "log"
    "time"

    as "github.com/aerospike/aerospike-client-go"
)

// udfFilter holds the Lua source of the stream UDF that main registers as
// "queryTest.lua" and invokes as the aggregate function queryTest.
//
// queryTest(s, rec) aggregates the stream into a map with
// apply_totalmap_cxt and reduces the partial maps with map.merge, adding
// the values of matching keys. apply_totalmap_cxt walks the list stored in
// the record's "Data" bin, iterates every element with map.pairs and
// increments the counter stored under each value it finds, then logs the
// accumulated map with info before returning it.
const udfFilter = `
function queryTest(s, rec)
  local time = os.time()
  local function map_value_merger(mapval1, mapval2)
    return mapval1 + mapval2
  end

  local function reduce_results(a,b)
    return map.merge(a,b,map_value_merger)
  end

  function apply_totalmap_cxt(mymap, rec)
        local topics = rec["Data"]
        -- info(tostring(topics)) -- if you want to log something, make sure to convert them to string first
        if topics ~= nil then
            for k in list.iterator(topics) do
                for a, v in map.pairs(k) do -- k is a map, iterate over it to get to animals
                    mymap[v] = (mymap[v] or 0) + 1 -- no need for an if clause. Use 'or' to shortcut nil -> value checks
                end
            end
        end

        info(tostring(mymap))
    return mymap
  end
    return s : aggregate(map(), apply_totalmap_cxt) : reduce(reduce_results)
end
`

// main writes ten records whose "Data" bin is a list of maps, registers the
// queryTest UDF, runs it as an aggregation over the test.pets set and
// prints the merged per-animal counters.
//
// Failures while connecting, building a key, writing a record, starting the
// query or reading a result abort the program; a failed UDF registration is
// only logged.
func main() {

    client, err := as.NewClient("127.0.0.1", 3000)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close() // cleanup after your function

    // The topic list is the same for every record, so it is built once.
    topics := []map[interface{}]interface{}{
        {"animal": "Dogs"},
        {"animal": "Cats"},
    }

    for i := 0; i < 10; i++ {
        key, err := as.NewKey("test", "pets", i)
        if err != nil {
            log.Fatal(err)
        }

        data := as.BinMap{
            "CreatedAt": time.Now().Unix(),
            "bin2":      "An elephant is a mouse with an operating system",
            "Data":      topics,
        }

        if err := client.Put(nil, key, data); err != nil {
            log.Fatal(err)
        }
    }

    regTask, err := client.RegisterUDF(nil, []byte(udfFilter), "queryTest.lua", as.LUA)
    if err != nil {
        log.Fatal(err)
    }

    if err := <-regTask.OnComplete(); err != nil {
        log.Println(err)
    }

    stm := as.NewStatement("test", "pets")
    stm.SetAggregateFunction("queryTest", "queryTest", nil, true)

    recordset, err := client.Query(nil, stm)
    if err != nil {
        log.Fatal(err)
    }
    defer recordset.Close()

    results := map[string]int{}
    // Range on Results() is the recommended way of getting results back from Query/Scans
    for res := range recordset.Results() {
        if res.Err != nil {
            log.Fatal(res.Err)
        }

        // len guard is to avoid processing empty resultsets
        result, ok := res.Record.Bins["SUCCESS"].(map[interface{}]interface{})
        if !ok || len(result) == 0 {
            continue
        }

        for k, v := range result {
            // Checked assertions: an entry of an unexpected type is reported
            // and skipped instead of panicking the whole program.
            name, ok := k.(string)
            if !ok {
                log.Printf("skipping entry with non-string key %v (%T)", k, k)
                continue
            }
            count, ok := v.(int)
            if !ok {
                log.Printf("skipping entry %q with non-int value %v (%T)", name, v, v)
                continue
            }
            results[name] += count
        }
    }

    fmt.Println(results)
}