splay-project / splay

The main Splay repository.
GNU General Public License v3.0

Blocked queries #52

Open vschiavoni opened 8 years ago

vschiavoni commented 8 years ago

There are scenarios where the concurrency level allowed by the Splay runtime is not sufficient. One of them is the execution of the following protocol, T-Kad, a gossip-based construction of the Kademlia (KAD) DHT. The problematic scenario occurs when it is deployed over a cluster of 600 splayds using exactly 600 nodes. Each node issues 500 queries more or less concurrently.

The attached plot gantt.pdf shows that queries (on the y-axis) get slower and slower (longer blue bars on the x-axis).

We might need a simpler test case to identify the bottleneck and possibly optimise the runtime.
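
One possible starting point for such a test case (a sketch, not part of the original report; the echo target, the 500-query figure and the timeout are placeholders mirroring the scenario above) is to strip the protocol away entirely and have every node fire the same number of concurrent no-op RPCs at random peers, logging each latency:

-- minimal RPC stress test, meant to run inside the Splay sandbox where job is provided
require"splay.base"
rpc = require"splay.rpc"
misc = require"splay.misc"

N_QUERIES = 500 -- queries per node, as in the reported scenario

-- trivial RPC target
function echo(x) return x end

-- pick a random peer different from the local node
function random_peer()
    local p
    repeat p = job.nodes[math.random(#job.nodes)]
    until p.ip ~= job.me.ip or p.port ~= job.me.port
    return p
end

function one_query(i)
    local start = misc.time()
    local ok = rpc.acall(random_peer(), {"echo", i}, 120)
    log:print("QUERY", job.position, i, ok and (misc.time() - start) or "failed")
end

function main()
    events.sleep(5) -- wait for all RPC servers to be up
    for i = 1, N_QUERIES do
        events.thread(function() one_query(i) end) -- fire the queries (almost) concurrently
    end
end

rpc.server(job.me.port)
events.thread(main)
events.loop()

If the latencies logged by such a script also grow over time, the slowdown comes from the runtime's event/RPC handling rather than from T-KAD itself. The full protocol as deployed follows.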

-------------------------------------------------------------------------------
-- modules
-------------------------------------------------------------------------------

require"splay.base"
rpc = require"splay.rpc"
bit = require"bit"
rpc.l_o.level=1
misc = require "splay.misc"
crypto = require "crypto"

-- addition to allow local run
PARAMS={}
local cmd_line_args=nil
if not job then --outside the sandbox
    if #arg < 2 then  
        print("lua ", arg[0], " my_position nb_nodes")  
        os.exit()  
    else        
        local pos, total = tonumber(arg[1]), tonumber(arg[2])  
        local utils = require("splay.utils")
        job = utils.generate_job(pos, total, 20001) 
        cmd_line_args=arg[3]    
    end
end

if arg~=nil then
    if cmd_line_args==nil then cmd_line_args=arg[1] end
    if cmd_line_args~=nil and cmd_line_args~="" then
        print("ARGS: ",cmd_line_args)   
        for _,v in pairs(misc.split(cmd_line_args,":")) do
            local t=misc.split(v,"=")
            PARAMS[t[1]]=t[2]
        end
    end
end

rpc.server(job.me.port)

-------------------------------------------------------------------------------
-- current node
-------------------------------------------------------------------------------
-- 31 bit is currently the maximal id space: BitOp library provides operations only in the range of signed 32 bit numbers
bits = 31
function compute_id(o) return string.sub(crypto.evp.new("sha1"):digest(o), 1, bits/ 4) end

me = {}
me.peer = job.me
me.age = 0
me.id = compute_id(job.me.ip..job.me.port)

function num(k)
    if k.id then return tonumber("0x"..k.id) else return tonumber("0x"..k) end
end

-------------------------------------------------------------------------------
-- parameters
-------------------------------------------------------------------------------

max_time = 800

--T-KAD params
TKAD_MESSAGE = tonumber(PARAMS["TKAD_MESSAGE"]) or 10
TKAD_VIEW = tonumber(PARAMS["TKAD_VIEW"]) or 10
GOSSIP_TIME = tonumber(PARAMS["GOSSIP_TIME"]) or 7
K_SIZE = tonumber(PARAMS["K_SIZE"]) or 3
TKAD_CONVERGE = PARAMS["TKAD_CONVERGE"] or false
TKAD_LOOKUP = PARAMS["TKAD_LOOKUP"] or true
PARTNER_SELECT = tonumber(PARAMS["PARTNER_SELECT"]) or 0

--PSS params
PSS_VIEW_SIZE =tonumber(PARAMS["PSS_VIEW_SIZE"]) or 10
PSS_SHUFFLE_SIZE =  tonumber(PARAMS["PSS_SHUFFLE_SIZE"]) or math.floor(PSS_VIEW_SIZE / 2 + 0.5)
PSS_SHUFFLE_PERIOD = tonumber(PARAMS["PSS_SHUFFLE_PERIOD"]) or 10

bytesSent = 0
bytesReceived = 0

-- ############################################################################
--  Peer Sampling Service
-- ############################################################################

PSS = {

    view = {},
    view_copy = {},
    c = PSS_VIEW_SIZE,
    exch = PSS_SHUFFLE_SIZE,
    S = math.floor(PSS_VIEW_SIZE/ 2 + 0.5),
    H = 0,
    SEL = "rand", -- could also be "tail"
    view_copy_lock = events.lock(),

    -- utilities
    print_table = function(t)
        print("[ (size "..#t..")")
        for i=1,#t do
            print("  "..i.." : ".."["..t[i].peer.ip..":"..t[i].peer.port.."] - age: "..t[i].age.." - id: "..t[i].id)
        end
        print("]")
    end,

    set_of_peers_to_string = function(v)
        local ret = ""; for i=1,#v do ret = ret..v[i].id.." " end
        return ret
    end,

    print_set_of_peers = function(v,message)    
        if message then log:print(message) end
        log:print(PSS.set_of_peers_to_string(v))
    end,

    print_view = function(message)
        if message then log:print(message) end
        log:print("PSS VIEW_CONTENT "..job.position.." "..PSS.set_of_peers_to_string(PSS.view))
    end,

    -- peer sampling functions

    pss_selectPartner= function()
        if #PSS.view > 0 then
            if PSS.SEL == "rand" then return math.random(#PSS.view) end
            if PSS.SEL == "tail" then
                local ret_ind = -1 ; local ret_age = -1
                for i,p in pairs(PSS.view) do
                    if (p.age > ret_age) then ret_ind = i;ret_age=p.age end
                end
                assert (not (ret_ind == -1))
                return ret_ind
            end
        else
            return false
        end
    end,

    same_peer_but_different_ages = function(a,b)
        return a.peer.ip == b.peer.ip and a.peer.port == b.peer.port
    end,

    same_peer = function(a,b)
        return PSS.same_peer_but_different_ages(a,b) and a.age == b.age
    end,

    pss_selectToSend = function()
        -- create a new return buffer
        local toSend = {}
        -- append the local node view age 0
        table.insert(toSend,{peer={ip=job.me.ip,port=job.me.port},age=0,id=me.id})
        -- shuffle view
        PSS.view = misc.shuffle(PSS.view)
        -- move oldest H items to the end of the view
        --- 1. copy the view
        local tmp_view = misc.dup(PSS.view)
        --- 2. sort the items based on the age
        table.sort(tmp_view,function(a,b) return a.age < b.age end)
        --- 3. get the H largest aged elements from the tmp_view, remove them from the view 
        ---    (we assume there are no duplicates in the view at this point!)
        ---    and put them at the end of the view
        for i=(#tmp_view-PSS.H+1),#tmp_view do
            local ind = -1
            for j=1,#PSS.view do
                if PSS.same_peer(tmp_view[i],PSS.view[j]) then ind=j; break end
            end
            assert (not (ind == -1))
            local elem = table.remove(PSS.view,ind)
            PSS.view[#PSS.view+1] = elem
        end

        -- append the first exch-1 elements of view to toSend
        for i=1,(PSS.exch-1) do
            toSend[#toSend+1]=PSS.view[i]
        end     

        return toSend
    end,

    pss_selectToKeep = function(received)
        local selectToKeepStart= misc.time()    
        -- concatenate the view and the received set of view items
        for j=1,#received do PSS.view[#PSS.view+1] = received[j] end

        -- remove duplicates from view
        -- note that we can't rely on sorting the table as we need its order later
        local i = 1 
        while i < #PSS.view do
            for j=i+1,#PSS.view do
                if PSS.same_peer_but_different_ages(PSS.view[i],PSS.view[j]) then
                    -- delete the oldest
                    if PSS.view[i].age < PSS.view[j].age then 
                        table.remove(PSS.view,j)
                    else
                        table.remove(PSS.view,i)
                    end
                    i = i - 1 -- we need to retest for i in case there is one more duplicate
                    break
                end
            end
            i = i + 1
        end

        -- remove the min(H,#view-c) oldest items from view
        local o = math.min(PSS.H,#PSS.view-PSS.c)
        while o > 0 do
            -- brute force -- remove the oldest
            local oldest_index = -1
            local oldest_age = -1
            for i=1,#PSS.view do 
                if oldest_age < PSS.view[i].age then
                    oldest_age = PSS.view[i].age
                    oldest_index = i
                end
            end
            assert (not (oldest_index == -1))
            table.remove(PSS.view,oldest_index)
            o = o - 1
        end

        -- remove the min(S,#view-c) head items from view
        o = math.min(PSS.S,#PSS.view-PSS.c)
        while o > 0 do
            table.remove(PSS.view,1) -- not optimal
            o = o - 1
        end

        -- in the case there still are too many peers in the view, remove at random
        while #PSS.view > PSS.c do table.remove(PSS.view,math.random(#PSS.view)) end

        assert (#PSS.view <= PSS.c)
        --log:print("PSS_SELECT_TO_KEEP ", ( misc.time() - selectToKeepStart ) )        
    end,

    ongoing_at_rpc=false,

    is_init = false,

    pss_passive_thread = function(from,buffer)
        if PSS.ongoing_at_rpc or not PSS.is_init then
            return false
        end

        --PSS.print_view("passive_thread ("..job.position.."): entering")
        --PSS.print_set_of_peers(buffer,"passive_thread ("..job.position.."): received from "..from)
        local ret = PSS.pss_selectToSend()
        PSS.pss_selectToKeep(buffer)
        --PSS.print_view("passive_thread ("..job.position.."): after selectToKeep")
        return ret
    end,

    pss_send_at_rpc = function(peer,pos,buf)
        local ok, r = rpc.acall(peer,{"PSS.pss_passive_thread", pos, buf},PSS_SHUFFLE_PERIOD/2)
        return ok,r
    end,

    pss_active_thread = function()
        PSS.ongoing_at_rpc=true
        -- select a partner
        local exchange_aborted=true
        local exchange_retry=2
        for i=1,exchange_retry do --up to 2 attempts per round, re-do in case of conflict
            local partner_ind = PSS.pss_selectPartner()
            if not partner_ind then
                log:print("pss_active_thread: pss view is empty, no partner can be selected")
                -- allow incoming passive thread requests again before bailing out
                PSS.ongoing_at_rpc=false
                return
            end
            local partner = PSS.view[partner_ind]
            -- remove the partner from the view
            table.remove(PSS.view,partner_ind)
            -- select what to send to the partner
            local buffer = PSS.pss_selectToSend()
            --PSS.print_set_of_peers(buffer,"active_thread ("..job.position.."): sending to "..partner.id)

            -- send to the partner
            local rpcStart=misc.time()
            local ok, r = PSS.pss_send_at_rpc(partner.peer,job.position, buffer) -- rpc.acall(partner.peer,{"PSS.pss_passive_thread", job.position, buffer},PSS_SHUFFLE_PERIOD/2)
            --log:print("PSS.pss_passive_thread.RPC ",  misc.time() - rpcStart  )

            if ok then
                -- select what to keep etc.
                local received = r[1]
                if received==false then
                    log:print("PSS received false due to ongoing RPC or yet uninitialized view, will try again in a short while")
                    events.sleep(math.random()) 
                    --the call was aborted due to pending RPC at peer's node
                else
                    exchange_aborted=false 
                    --PSS.print_set_of_peers(received,"active_thread ("..job.position.."): received from "..partner.id)
                    PSS.pss_selectToKeep(received)
                    --PSS.print_view("active_thread ("..job.position.."): after selectToKeep")
                end
            else
                -- peer not replying? it was already removed from the view when selected above
                log:print("on peer ("..job.position..") peer "..partner.id.." did not respond -- it stays out of the view")
                log:warning("PSS.pss_passive_thread RPC error:", r)
            end     
            if exchange_aborted==false then break end
        end

        PSS.view_copy_lock:lock()
        local viewCopyLock = misc.time()
        PSS.view_copy = misc.dup(PSS.view)
        --log:print("PSS_VIEW_COPY_LOCK_HELD ", ( misc.time() - viewCopyLock ) )
        PSS.view_copy_lock:unlock()
        for _,v in ipairs(PSS.view) do
                v.age = v.age+1
        end
        -- now, allow to have an incoming passive thread request
        PSS.ongoing_at_rpc=false
    end,

    -- API
    pss_getPeer = function()
        PSS.view_copy_lock:lock()
        local getPeerLockHeldStart = misc.time()

        local peer = PSS.view_copy[math.random(#PSS.view_copy)] 

        --log:print("PSS_GET_PEER_LOCK_HELD_VIEW_COPY ", ( misc.time() - getPeerLockHeldStart ) )
        PSS.view_copy_lock:unlock()

        return peer
    end,

    pss_init = function()
        -- ideally, would perform a random walk on an existing overlay
        -- but here we emerge from the void, so let's use the Splay provided peers.
        -- Ages are taken randomly in [0..c] but could be 0 as well.
        local indexes = {}
        for i=1,#job.nodes do indexes[#indexes+1]=i end
        table.remove(indexes,job.position) --remove myself
        local selected_indexes = misc.random_pick(indexes,math.min(PSS.c,#indexes)) 
        for _,v in ipairs(selected_indexes) do
                local a_peer = job.nodes[v]
                local hashed_index =  compute_id(a_peer.ip..a_peer.port)
                PSS.view[#PSS.view+1] = 
                {peer=a_peer,age=math.random(PSS.c),id=hashed_index}
        end
        PSS.view_copy = misc.dup(PSS.view)
        PSS.is_init = true
        assert (#PSS.view == math.min(PSS.c,#indexes))
        --PSS.print_view("PSS initial view")
    end,

    log_view = function()
        -- called once to log the view
        events.sleep(10.5*PSS_SHUFFLE_PERIOD)
        log:print("VIEW_CONTENT "..job.position.." "..PSS.set_of_peers_to_string(PSS.view))
    end,

}

-- ############################################################################
--  T-KAD
-- ############################################################################

TKAD = {
    view = {},
    routing_table = {},
    v = TKAD_VIEW,
    k = K_SIZE,
    m = TKAD_MESSAGE,
    view_lock = events.lock(),
    rt_lock = events.lock(),
    cycle = 0,
    c_lock = events.lock(),
    ideal_rt = {},
    keys = {},
    responsible = {},
    opt_links = 0,
    mand_links = 0,

-------------------------------------------------------------------------------
-- debug
-------------------------------------------------------------------------------

    display_view = function(v, which)
        local display = table.concat({which,"\n"})
        for i,w in ipairs(v) do
            display = table.concat({display, " ",num(w)})
        end
        log:print(display.."\n")
    end,

    display_rt = function()
        log:print("ROUTING TABLE:", num(me.id))
        for i = 0, bits do
            if TKAD.routing_table[i] and #TKAD.routing_table[i] > 0 then
                local out = ""
                for j,v in ipairs(TKAD.routing_table[i]) do
                    out = table.concat({out,num(v.id)," | "})
                end
                log:print(i, out)
            end
        end
    end,

    debug = function(c)
        log:print("TKAD cycle:", c)
        log:print(TKAD.display_rt())
    end,

-------------------------------------------------------------------------------
-- utilities
-------------------------------------------------------------------------------

    remove_dup = function(set)
        for i,v in ipairs(set) do
            local j = i+1
            while(j <= #set and #set > 0) do
                if v.id == set[j].id then
                    table.remove(set,j)
                else j = j + 1
                end
            end
        end
    end,

    --keep n first elements from t
    keep_n = function(t,n)
        for i = #t, n+1, -1 do
            table.remove(t,i)
        end
    end,

    same_node = function(n1,n2)
        local peer_first
        if n1.peer then peer_first = n1.peer else peer_first = n1 end
        local peer_second
        if n2.peer then peer_second = n2.peer else peer_second = n2 end
        return peer_first.port == peer_second.port and peer_first.ip == peer_second.ip
    end,

    remove_node  = function(t, node)
        local j = 1
        for i = 1, #t do
            if TKAD.same_node(t[j],node) then table.remove(t, j)
            else j = j+1 end
        end
    end,

    --flatten a two-dimensional array
    flatten = function(t)
        local result = {}
        for i,v in ipairs(t) do
            for j,w in ipairs(v) do
                result[#result+1] = w
            end
        end
        return result
    end,

    -- computes the diff between two ids based on the number of bits in which they differ
    xor_diff = function(n,m)
        local xor_result = bit.bxor(n,m)
        local diff = 0
        while xor_result > 0 do
            diff = diff + xor_result%2
            --arithmetic right shift
            xor_result = bit.arshift(xor_result,1)
        end
        return diff
    end,

    --ranks nodes according to the number of differing bits in their IDs;
    --used for selecting TKAD peer and creating TKAD message
    xor_rank = function(set, partner)
        table.sort(set, function (a,b) return TKAD.xor_diff(num(a), num(partner)) < TKAD.xor_diff(num(b), num(partner)) end)
    end,

    xor_rank_pure = function(set, partner)
        table.sort(set, function (a,b) return bit.bxor(num(a), num(partner)) < bit.bxor(num(b), num(partner)) end)
    end,

    already_in = function(t,n)
        for i,v in ipairs(t) do
            if v.id == n.id then return i end
        end
        return -1
    end,

    --computes the number of the k-bucket for the given node
    bucket_num = function(p)
        local n = bit.bxor(num(p), num(me))
        local counter = 0   
        while n ~= 0 do
        --logical rightshift
            n = bit.rshift(n, 1)
            counter = counter+1 
        end
        return bits-counter
    end,

    latency = function(n)
        local latency = rpc.ping(n.peer)
        if latency then return latency end
    end,

    remove_failed_node = function(node)
        TKAD.view_lock:lock()
        TKAD.remove_node(TKAD.view, node)
        TKAD.view_lock:unlock()
        TKAD.rt_lock:lock()
        -- the routing table is an array of k-buckets, so remove the node from each bucket
        for i = 0, bits do
            if TKAD.routing_table[i] then TKAD.remove_node(TKAD.routing_table[i], node) end
        end
        TKAD.rt_lock:unlock()
    end,

-------------------------------------------------------------------------------
-- Convergence
-------------------------------------------------------------------------------

    hash_all = function()
        local ids = {}
        for i,v in ipairs(job.nodes) do
            if not TKAD.same_node(v,me) then
                local hashed_index = compute_id(v.ip..v.port)
                ids[#ids+1] = hashed_index
            end
        end
        return ids
    end,

    precompute_routing_table = function()
        local entries = {}
        if TKAD_CONVERGE then 
            for i = 0, bits do TKAD.ideal_rt[i] = {} end
            local ids = TKAD.hash_all()
            for i,v in ipairs(ids) do
                local buck = TKAD.bucket_num(v)
                if entries[buck] then entries[buck] = entries[buck]+1
                else entries[buck] = 1 end
                --print(v.id, TKAD.bit_out(v), buck) 
                table.insert(TKAD.ideal_rt[buck], v)        
            end
            for i,v in pairs(entries) do
                if v == 1 then TKAD.mand_links = TKAD.mand_links+1
                else
                    if v > TKAD.k then
                        TKAD.mand_links = TKAD.mand_links+1
                        TKAD.opt_links = TKAD.opt_links+TKAD.k-1
                    else
                        TKAD.mand_links = TKAD.mand_links+1
                        TKAD.opt_links = TKAD.opt_links+v-1
                    end
                end
            end
            --TKAD.display_ideal_rt()
            log:print("COMPLETE_VIEW_STATE "..me.id.." mandatory ".. TKAD.mand_links.." optional "..TKAD.opt_links)
        end
    end,

    display_ideal_rt = function()
        log:print("IDEAL ROUTING TABLE ", num(me.id))
        for i = 0, bits do
            if TKAD.ideal_rt[i] and #TKAD.ideal_rt[i] > 0 then
                local out = ""
                for j,v in ipairs(TKAD.ideal_rt[i]) do
                    out = out..num(v).." | "
                end
                log:print(i, out)
            end
        end
    end,

-- Every node must know at least one other node in each of its non-empty subtrees
    check_convergence = function(c,thread)
        if TKAD_CONVERGE then
            local opt_entries, mand_entries = 0,0
            for i = 0, #TKAD.routing_table do
                if TKAD.routing_table[i] and #TKAD.routing_table[i]>0 then
                    mand_entries = mand_entries+1
                    if #TKAD.routing_table[i]>1 then
                        opt_entries = opt_entries+#TKAD.routing_table[i]-1
                    end
                end
            end
            log:print(table.concat({thread," TMAN_cycle ",c," CURRENT_VIEW_STATE ", me.id, " mandatory ",mand_entries/(TKAD.mand_links/100), " optional ",opt_entries/(TKAD.opt_links/100)," bytes_sent ", bytesSent," bytes_received ", bytesReceived}))
            log:print("opt_entries", opt_entries, "mand_entries", mand_entries)
        end
    end,

-------------------------------------------------------------------------------
-- Lookup and routing efficiency
-- alpha = 1 (number of find_node RPCs sent asynchronously)
-------------------------------------------------------------------------------
--return the index of the first node in the results that has not been queried yet, if no such node found returns -1
    already_sent = function(results, sent_set)
        for i,v in ipairs(results) do
            local sent = false
            for _, w in ipairs(sent_set) do
                if v.peer == w.peer then sent = true break end
            end
            if not sent then return i end
        end
        return -1
    end,

--updates the results list by merging it with set and keeping k closest nodes
    update_results = function(results, set, key)
        local updated = misc.merge(results, set)
        TKAD.remove_dup(updated)
        TKAD.xor_rank_pure(updated, key)
        TKAD.keep_n(updated, TKAD.k)
        return updated
    end,

--selects k closest nodes
    k_closest_bucket_nodes = function(key)
        local results, buck = {}, TKAD.bucket_num(key)
        for i = #TKAD.routing_table, buck, -1 do
            if #TKAD.routing_table[i] > 0 then results = misc.merge(results, TKAD.routing_table[i]) end
        end
        if #results == TKAD.k then return results
        elseif #results > TKAD.k then local set = {} results = TKAD.update_results(results, set, key) return results end
        buck = buck-1   
        while buck >= 0 do
            if #TKAD.routing_table[buck] > 0 then results = misc.merge(results, TKAD.routing_table[buck]) end
            buck = buck-1
            if #results == TKAD.k then return results
            elseif #results > TKAD.k then local set = {} results = TKAD.update_results(results, set, key) return results end        
        end
        return results
    end,

    callback =function(signal, results)
        events.fire(signal,results)
    end,

    find_node = function(signal, me, key)
        events.thread(function()
        local closest = TKAD.k_closest_bucket_nodes(key)
        rpc.call(me.peer, {'TKAD.callback', signal, closest})
        end)
    end,

    find = function(key)
        --the first nodes to contact are k nodes from the closest k-bucket
        local results = TKAD.k_closest_bucket_nodes(key)
        local hops = 0
        local start_time = tostring(misc.time())
        local sent = {}
        while true do
            local index = TKAD.already_sent(results, sent)
            if  index == -1 then return results, hops, start_time 
            else 
                local current = results[index]
                --display_view(results, "results before update from: "..num(current).."("..bit_out(current.id).."):")
                local signal = current.id..key
                sent[#sent+1] = current
                rpc.call(current.peer, {'TKAD.find_node', signal, me, key})
                local p_results = events.wait(signal)
                --display_view(p_results, "p_results from node: "..num(current).."("..bit_out(current.id).."):")
                results = TKAD.update_results(results, p_results, key)
                --display_view(results, "results after update from: "..num(current).."("..bit_out(current.id).."):")
                hops = hops + 1
            end
        end
    end,

    lookup = function(key)
        log:print("LOOKUP_START ".. num(key).."("..key ..")")
        local nearest_k, hops, start_time = TKAD.find(key)
        local elapsed = misc.time()-start_time
        log:print("LOOKUP_END")
        local correct = TKAD.check_correctness(key, nearest_k)
        local res=nil
        if correct then res = "CORRECT_FOUND " else res = "CORRECT_NOT_FOUND " end
        local msg = res.."LOOKUP KEY "..num(key).." HOPS "..hops.." DELAY "..elapsed.." RESPONSIBLE: "
        for i,v in ipairs(nearest_k) do
            msg = msg..table.concat({" ",num(v)})
        end
        log:print(msg)
    end,

-------------------------------------------------------------------------------
-- Routing correctness
-------------------------------------------------------------------------------

    precompute_resp_nodes = function()
        local ids = TKAD.hash_all()
        ids[#ids+1] = me.id
        for i,k in ipairs(TKAD.keys) do
            TKAD.xor_rank_pure(ids, k)
            TKAD.responsible[k] = {}
            --local out="resp for "..k.." with bucket_num "..TKAD.bucket_num(k)..": " 
            for i=1, TKAD.k do
                table.insert(TKAD.responsible[k],ids[i])
--              out=out.." "..num(ids[i])
            end
--          log:print(out)
        end
    end,

    check_correctness = function(key,closest)
        for i,v in ipairs(closest) do
            --log:print("closest", i, num(v))
            local match = false
            for j,w in ipairs(TKAD.responsible[key]) do
                if num(v) == num(w) then match = true end
            end
            if not match then return false end
        end
        return true
    end,

-------------------------------------------------------------------------------
-- TMAN
-------------------------------------------------------------------------------

    init = function()
        for i = 0, bits - 1 do TKAD.routing_table[i] = {} end
        TKAD.view = misc.random_pick(PSS.view, TKAD.v)
        TKAD.check_convergence(TKAD.cycle,"TKAD.init")
    end,

    -- ranks view according to xor_diff from self and selects a random node from the first m nodes
    select_peer = function()
        return misc.random_pick(TKAD.view)
    end,

    -- select peer from pss view, different from self
    select_peer_pss = function()
        PSS.view_copy_lock:lock()
        local i 
        repeat i = math.random(#PSS.view_copy)
        until not TKAD.same_node(PSS.view_copy[i], me)
        local partner = PSS.view_copy[i]
        PSS.view_copy_lock:unlock()
        return partner
    end,

    create_message = function(partner)
        TKAD.view_lock:lock()
        local buffer = misc.dup(TKAD.view)
        TKAD.view_lock:unlock()
        TKAD.rt_lock:lock()
        buffer = misc.merge(buffer,PSS.view, TKAD.flatten(TKAD.routing_table))
        TKAD.rt_lock:unlock()
        buffer[#buffer+1] = me
        TKAD.remove_dup(buffer)
        TKAD.remove_node(buffer, partner)
        return buffer
    end,

    --merges TKAD view with the received message
    update_view = function(received)
        TKAD.view_lock:lock()
        TKAD.view = misc.merge(TKAD.view, received)
        TKAD.remove_dup(TKAD.view)
        TKAD.xor_rank_pure(TKAD.view,me)
        --TKAD.xor_rank(TKAD.view,me)
        TKAD.keep_n(TKAD.view, TKAD.v)
        TKAD.view_lock:unlock()
    end,

    -- add nodes from the received to the corresponding k-buckets
    update_prefix_table = function(received)
        TKAD.rt_lock:lock()
        for i,v in ipairs(received) do
            local buck = TKAD.bucket_num(v)
            if not TKAD.routing_table[buck] then
                TKAD.routing_table[buck] = {}
                table.insert(TKAD.routing_table[buck], v)
            else
                if #TKAD.routing_table[buck] < TKAD.k then --bucket is not full
                --check if this element is already in the bucket: yes - move it to the tail, no - add it to the tail
                    local index = TKAD.already_in(TKAD.routing_table[buck], v)
                    if index > 0 then 
                    table.insert(TKAD.routing_table[buck], table.remove(TKAD.routing_table[buck], index))
                    else table.insert(TKAD.routing_table[buck], v) end          
                else --bucket full: ping the element at the head, if there is response - move to the tail; no response - evict and push the new element to the tail
                    if TKAD.latency(TKAD.routing_table[buck][1]) then 
                        table.insert(TKAD.routing_table[buck], table.remove(TKAD.routing_table[buck], 1))
                    else
                        table.remove(TKAD.routing_table[buck], 1)
                        table.insert(TKAD.routing_table[buck], v)
                    end
                end
            end
        end
        TKAD.rt_lock:unlock()
    end,

    passive_thread = function(received,sender)
        local buffer = TKAD.create_message(sender)
        TKAD.update_view(received)
        TKAD.update_prefix_table(received)
        return buffer
    end,

    select_partner = function(partner_select_code, loc_cycle)
        if partner_select_code == 0 then --alternating
            if loc_cycle <= 2 then return TKAD.select_peer_pss()
            else 
                if loc_cycle%2==0 then return TKAD.select_peer_pss() else return TKAD.select_peer() end
            end
        elseif partner_select_code == 1 then --only pss
            return TKAD.select_peer_pss()
        elseif partner_select_code == 2 then --only view
            return TKAD.select_peer()
        end 
    end,

    active_thread = function()
        TKAD.c_lock:lock()
        local loc_cycle = TKAD.cycle + 1
        TKAD.cycle = TKAD.cycle + 1
        TKAD.c_lock:unlock()        
        local partner = TKAD.select_partner(PARTNER_SELECT,loc_cycle)
        local buffer = TKAD.create_message(partner)
        local try = 0
        local ok, res = rpc.acall(partner.peer, {'TKAD.passive_thread', buffer, me})
        while not ok do
            try = try + 1
            if try <= 3 then
                log:print("TKAD active thread: no response from:"..partner.id.. ": "..tostring(res).." => try again")
                events.sleep(math.random(try * 30, try * 60))
                ok, res = rpc.acall(partner.peer, {'TKAD.passive_thread', buffer, me})
            else
                log:print("TKAD active thread: no response from:"..partner.id..": "..tostring(res).."  => removing it from view")
                TKAD.remove_failed_node(partner)
                break
            end
        end
        if ok then
            local received = res[1]
            TKAD.update_view(received)
            TKAD.update_prefix_table(received)
            --resource_stats()
            TKAD.check_convergence(loc_cycle,"TKAD.active_thread")
            --TKAD.debug(loc_cycle)
        end
    end,    
    }

-------------------------------------------------------------------------------
-- Main loop
-------------------------------------------------------------------------------

function resource_stats()
    --log:print("MEMORY_USED_Kb ", gcinfo())    
    local ts,tr = socket.stats()
    local tot_KB_sent=misc.bitcalc(ts).kilobytes
    local tot_KB_recv=misc.bitcalc(tr).kilobytes
    --log:print("BANDWIDTH_TOTAL ",tot_KB_sent, tot_KB_recv)
    --log:print("BANDWIDTH_RATE  ", (tot_KB_sent - bytesSent )/STATS_PERIOD, (tot_KB_recv - bytesReceived) /STATS_PERIOD)
    bytesSent = tot_KB_sent
    bytesReceived = tot_KB_recv
end

function terminator()
  events.sleep(max_time)
  os.exit()
end

function main()
-- this thread will be in charge of killing the node after max_time seconds
    events.thread(terminator)

    log:print("UP: "..job.me.ip..":"..job.me.port.." id "..me.id)

-- init random number generator
    math.randomseed(job.position*os.time())

-- wait for all nodes to start up (conservative)
    events.sleep(5)

-- desynchronize the nodes
    local desync_wait = (GOSSIP_TIME * math.random())
    log:print("waiting for "..desync_wait.." to desynchronize")
    events.sleep(desync_wait)

    PSS.pss_init()
    PSS_thread = events.periodic(PSS_SHUFFLE_PERIOD, PSS.pss_active_thread)
    events.sleep(60)

    TKAD.precompute_routing_table()
    TKAD.init() 
    events.sleep(20)

    log:print("VIEW_CONSTRUCTION_START_TIME", misc.time())
    TKAD_thread = events.periodic(GOSSIP_TIME, TKAD.active_thread)

    events.sleep(200)

    events.kill(TKAD_thread)
    events.kill(PSS_thread)

    if TKAD_LOOKUP then
        --generate keys
        for i = 1, 50 do
            TKAD.keys[#TKAD.keys+1] = compute_id(math.random())
        end

        --precompute responsible nodes
        TKAD.precompute_resp_nodes()        
        events.sleep(20)

        --start lookups
        for i,v in ipairs(TKAD.keys) do
            events.thread(function() TKAD.lookup(v) end)
            events.sleep(1)
        end   
    end
end

events.thread(main)
events.loop()
etriviere commented 8 years ago

Hi

How can you be sure this is a problem with concurrency? A simple way to check this could be to run random chains of RPCs between nodes in a network, and see whether the delays grow beyond what is expected from serving the currently processed events/messages.
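
Concretely, I am thinking of something along these lines (just a sketch, nothing I have actually run; the chain length and the 10-second period are arbitrary): each hop forwards the request to a random peer until the ttl is exhausted, and the originator logs the end-to-end delay, which should stay roughly proportional to the chain length if the runtime itself is not the bottleneck.

require"splay.base"
rpc = require"splay.rpc"
misc = require"splay.misc"

CHAIN_LEN = 10

-- forward the request to a random peer until ttl is exhausted,
-- returning the number of hops that actually succeeded
function chain_hop(ttl)
    if ttl <= 0 then return 0 end
    local peer = job.nodes[math.random(#job.nodes)]
    local ok, r = rpc.acall(peer, {"chain_hop", ttl - 1}, 60)
    if not ok then return 0 end -- broken chain: count only the completed hops
    return r[1] + 1
end

function start_chain()
    local start = misc.time()
    local hops = chain_hop(CHAIN_LEN)
    log:print("CHAIN", job.position, hops, misc.time() - start)
end

function main()
    events.sleep(5)
    events.periodic(10, start_chain)
end

rpc.server(job.me.port)
events.thread(main)
events.loop()

If chains of the same length take noticeably longer as load increases, the overhead is in the runtime rather than in the network.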

Or do you mean we should have multi-threaded support for splay nodes? This would make the development of each node much more complex, and I am not sure the benefit would be worth it.

best Etienne


vschiavoni commented 8 years ago

I'm confident it's a concurrency issue because the same scenario with fewer concurrent queries (thus, fewer overlapping in-flight queries traversing the network) does not produce the behaviour shown in the attached gantt chart (that is, queries getting slower and slower over time). I agree it would be nice to have a simpler test case.

Exploiting the multi-core nature of today's hardware, with the same facilities we are used to in Splay, to produce more efficient code would not hurt: as of today, the observed performance of the prototypes implemented in Splay is somewhat limited by the (fake) concurrency support. Something I had experimented with is https://github.com/vschiavoni/splay_llthreads
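
To make the "(fake) concurrency" point concrete (a toy sketch, not taken from this issue): Splay's event threads are cooperative coroutines, so any handler that computes without yielding, for instance serializing or ranking a large view, stalls every other in-flight query on that node until it returns.

require"splay.base"
misc = require"splay.misc"

-- a ticker that should fire once per second
events.thread(function()
    for i = 1, 5 do
        log:print("tick", i, misc.time())
        events.sleep(1)
    end
end)

-- a CPU-bound section that never yields: while it runs, no tick is printed
events.thread(function()
    local x = 0
    for i = 1, 5e7 do x = x + i end
    log:print("busy section done", misc.time())
end)

events.loop()

That is the kind of blocking the llthreads experiment was meant to offload to real OS threads.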

etriviere commented 8 years ago

This is true, but it would require a re-design that goes beyond what we can do. In particular, we would probably have to move away from Lua. Not sure this is worth the pain.

Etienne
