anacrolix / dht

dht is used by anacrolix/torrent, and is intended for use as a library in other projects both torrent related and otherwise
Mozilla Public License 2.0
313 stars 66 forks source link

bep44: mutable items #4

Closed mh-cbon closed 7 years ago

mh-cbon commented 7 years ago

Hi,

i was trying to figure out how much of this bep was available in your package. http://bittorrent.org/beps/bep_0044.html

I found the immutable items part was there, if i dont mistake.

About mutable items, there are few more operation to do when a message is sent / receive. I was wondering how that would operate with your package. inside or outside of it ? if there was some specific ways to implement those extra functions with the lib ?

I was looking for a Put method at Server.go, but i could not find it. Is it missing as i believe i understand ?

thanks for help!

anacrolix commented 7 years ago

I don't believe I have put or get implemented. However they're variations of existing messages, and would require no extra special treatment. You'll want to add Server.Put and Server.Get, and maybe a helper to link the token from the get for the put. Use the existing handlers to guide you here, it should be quite straight forward.

mh-cbon commented 7 years ago

Just reporting, if any comments, happy to read about,

A demo main that fails, i did not yet find how to fix it,

package main

import (
    "flag"
    "fmt"
    "time"

    "github.com/anacrolix/dht"
)

func main() {

    port := flag.Int("port", 9091, "")
    flag.Parse()

    conf := &dht.ServerConfig{
        Addr:               fmt.Sprintf("localhost:%v", *port),
        NoDefaultBootstrap: true,
        NoSecurity:         true,
        BootstrapNodes:     []string{"localhost:9091", "localhost:9092", "localhost:9093"},
    }
    serv, err := dht.NewServer(conf)
    if err != nil {
        panic(err)
    }
    infoHash := [20]byte{}
    copy(infoHash[:], fmt.Sprintf("whatever%v", *port))
    impliedPort := true

    go func() {
        for {
            select {
            case <-time.After(5 * time.Second):
                ann, err := serv.Announce(infoHash, 0, impliedPort)
                if err != nil {
                    fmt.Println(err)
                }
                fmt.Println(ann)
            }
        }
    }()

    <-make(chan bool)
}

it will respond: server has no starting nodes

Something else i wish i can fix in future,

mh-cbon  13828  0.2  0.0 197812     0 pts/0    Sl+  12:45   0:00 go run main.go
mh-cbon  13852 **78.2** 40.8 2724000 1617536 pts/0 Sl+  12:45   0:36 /tmp/go-build320258456/command-line-arguments/_obj/exe/main
mh-cbon  13860  0.2  0.0 207064     4 pts/1    Sl+  12:45   0:00 go run main.go -port 9092
mh-cbon  13885 **80.0** 41.4 2713364 1640588 pts/1 Rl+  12:45   0:36 /tmp/go-build731353978/command-line-arguments/_obj/exe/main -port 9092
anacrolix commented 7 years ago

It looks like the logic is wrong when selecting the bootstrap nodes around NoDefaultBootstrap. I think if you set NoDefaultBootstrap to false for now it should work as you expect, and I'll fix the logic later.

mh-cbon commented 7 years ago

yeeeesss! thanks:)

$ go run main.go 
&{{{0 0} <nil> [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] {0 0 <nil>} 0} 0xc42016c060 0xc42016c120 0xc42016c0c0 0xc42015a6a0 false 0 0xc420094b00 whatever 0 0 true}
&{{{0 0} <nil> [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] {0 0 <nil>} 0} 0xc42016cc00 0xc42016ccc0 0xc42016cc60 0xc4201bca80 false 0 0xc420094b00 whatever 0 0 true}

$ go run main.go -port 9092
&{{{0 0} <nil> [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] {0 0 <nil>} 0} 0xc420132960 0xc420132a20 0xc4201329c0 0xc42012cba0 false 0 0xc420086b00 whatever 1 0 true}
&{{{0 0} <nil> [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] {0 0 <nil>} 0} 0xc420132f00 0xc420132fc0 0xc420132f60 0xc42012d560 false 0 0xc420086b00 whatever 0 0 true}

awesome!

anacrolix commented 7 years ago

Yep, DHT is fun 😛

mh-cbon commented 7 years ago

I m still at early stages, but if yet you have comments, i ll read them happily :)

anacrolix commented 7 years ago

It looks pretty good so far, I have a few comments when I have more time.

mh-cbon commented 7 years ago

Hi,

I completely rebooted my first attempt, as it became too complex.

Now the various elements like token, keys, mutable/immutable are handled.

Looks much better, imo, the most noticeable change is that now the Put/Get method will return an op handle.

An op handle is a put / get operation on the dht, the caller can subscribe a callback to it to listen to the operation updates (to be polished).

An operation update could be a response error from a node, or a success response, it only changes the overall status of the op, triggers the callback, the receiver can decides what it should do according to the report provided by the op handle.

In plain code!

if op, err := table.Get(msg); err == dht.ErrWorkStart {
            done := make(chan bool)
            go func() {
                sub := op.Subscribe(func(s dht.MsgStats) {
                    fmt.Printf(`stats %v %v
Started=%v Success=%v BrowsedNodes=%d
`, s.Action, s.InfoHash, s.Started, s.Success, s.BrowsedNodes)
                    fmt.Printf("Value=%q\n", s.Value)
                    if s.AllGood() {
                        log.Println("all good, leaving")
                        done <- true
                    } else if s.Finished() {
                        log.Println("finished, but it had errors, leaving anyway")
                        done <- true
                    }
                })
                <-done
                op.Unsubscribe(sub)
                allDone <- true
            }()
        } else if err != nil {
            panic(err)
        }

Where the first err can be something like WorkStart => means=> as a caller, i might need to subscribe, WorkInProgress => as a caller, i need to decide, re emit new/old message (to be handled properly) /something else/ like an error while encoding / decoding values, network error => as a caller, panic/return

What next ?

It needs tests, and i need to re read again the bep to check all the things.

Then, i m wondering, if its not possible to make that a plugin of your dht package, looks likes it is possible, i m not sure if its a good idea.

Either case, i m eager to get comments, sorry for the 4.5k lines additions....

anacrolix commented 7 years ago

I don't think it needs to be a plugin, the functionality coexists with the torrent parts of DHT just fine. As long as it doesn't get in the way. I'm not sure about using the custom generated table/store types, I'd prefer some regular types. Are you sure Put and Get need to be a callback model, and can't use a synchronous call like Announce? Do they need to traverse the network?

mh-cbon commented 7 years ago

Are you sure Put and Get need to be a callback model, and can't use a synchronous call like Announce?

In that last proposal i made it very simple and sticky to its meaning (a get is really just a get). It let the choice to the consumer to decide how he want to consume the dht.

I did not felt like including a re emitter, to keep alive data on the dht, was part of that implementation, i prefer to keep it away.

Also a real life get query with a read timeout support is quiet easy to implement,

    done := make(chan bool)
    go func() {
        sub := op.Subscribe(func(s dht.MsgStats) {
            done <- true
        })
        select {
        case <-done:
            allDone <- true
        case <-time.After(time.Minute):
            allDone <- false
// if op would implement a ctx.cancel method, it could be put here.
        }
        op.Unsubscribe(sub)
    }()

Do they need to traverse the network?

Yes I think so, although its not really clear in the bep. Here is what i found that suggests it,

The DHT as defined by BEP 5 [1] only provides the ability to store and retrieve IP addresses associated with a hash. However, there are many other types of data which may be useful to store in the DHT. This extension defines a protocol which enables new features to utilize the DHT as a general purpose key/value store.

Its vague ~~~

Also, if it does not attempt to browse the dht, i m afraid the lookup will fail IRL. I must admit that part related to the kadmelia proposal is NOT 150% clear to me, i might got it wrong.

In short, i want to test that IRL, some days, with two peer across the wan, so i can test proof empirically.

Until now i tested only on the local loop : / too good too be true.

I'm not sure about using the custom generated table/store types, I'd prefer some regular types.

yeah i can hear about that. Maybe its not really conform to the go-thinking, if i may be honest, IOLO, and as long as it does not get in the way, or proven wrong, indeed, i don t really want to care because i generated 75% garbage code. What i m not happy about it right now, is that it enlarges the public api surface a lot, but this is just a feature i have not met before that implementation. Now i m thinking to fix that missing feature, thus be able to generate proper private types as well, as a consequence keep a small public api.

Its also possible it creates more contention than a pure chan based implementation, i trade that for simplicity, indeed, a pure chan based impl seems very complex to me. Maybe I m not good enough ? : / iolo ? to be friendly honest : )

I don't think it needs to be a plugin, the functionality coexists with the torrent parts of DHT just fine.

cool, lets see if we can arrange with those different code style or not, if not, i suggest i investigate that plugin idea. From there, so far, to make it plugable, it looks like i need a proxy instance of server which exposes the few private methods i need to consume in the plugin.

mh-cbon commented 7 years ago

i figured out how to remove the list of browsed nodes, it was easy (in fact!), i replaced it with the bloom filter (absolutely no idea how that works, but its great : ).

To avoid a disaster, I was looking to optimize the write token list i maintain, i realized (if i m correct), in the announce implementation, it never stores those tokens, it literally gets to announce, if i m correct. I m not sure what i ll do, i may want to try a mix of both to avoid some redundant get queries.

One question i have is about Server.query, is it aware of timeout to force response callback to be invoked ? Otherwise i m going to have pending queries in the wild, IRL.

Besides that, made private types whenever it makes sense, fixed a few problem, added some comments, applied refactoring, the op emits stats anytime it changes, an op can succeed only once (!!).

anacrolix commented 7 years ago

I won't have time to review this for at least a week.

mh-cbon commented 7 years ago

mhh something is unfortunate, when i try with public bootstrap nodes, i can t get a workable node.

9 times out of 10, it won t respond.

Rest of the time it replies with 203 'invalid token', also got some krpc.Error{Code:203, Msg:"invalid value for 'target'"}.

But the most worrying, when i issue a get request to obtain a write token, the reply wont contain it, so when i issue the put request i don t have the required write token : /

go run main.go -put rr -kname mykey -salt pc -b public
2017/06/21 23:48:17 Listening on :9090
publicKey=9280ccea713d7acff965696a34a4357180a63ec7a23266d565bef7655fc87f5b
infoHash=13fc1e9bce8db41e5c5a997a46bd288857358aa8
2017/06/21 23:48:17 searching.....
2017/06/21 23:48:22 searching.....
2017/06/21 23:48:27 [{0xc42001d000} {0xc42018c180} {0xc42001d100} {0xc42001d380} {0xc42001d580} {0xc42001d280} {0xc42001d700} {0xc42001d080} {0xc42001d780} {0xc42001d300}]
stats put 13fc1e9bce8db41e5c5a997a46bd288857358aa8  Started=true Success=false
InfoHash="13fc1e9bce8db41e5c5a997a46bd288857358aa8"
Value=""
Errors=&{[{'Ë' "KRPC error 203: invalid token" "203: KRPC error 203: invalid token" %!q(*krpc.Error=&{203 invalid token})}]}
ErroredNodes=1
SuccessNodes=0
PendingNodes=10
RespondedNodes=1

stats put 13fc1e9bce8db41e5c5a997a46bd288857358aa8  Started=true Success=false
InfoHash="13fc1e9bce8db41e5c5a997a46bd288857358aa8"
Value=""
Errors=&{[{'Ë' "KRPC error 203: invalid token" "203: KRPC error 203: invalid token" %!q(*krpc.Error=&{203 invalid token})} {'ʚ' "KRPC error 666: timeout" "666: KRPC error 666: timeout" %!q(*krpc.Error=&{666 timeout})}]}
ErroredNodes=4
SuccessNodes=0
PendingNodes=10
RespondedNodes=5
stats put 13fc1e9bce8db41e5c5a997a46bd288857358aa8  Started=true Success=false
InfoHash="13fc1e9bce8db41e5c5a997a46bd288857358aa8"
Value=""

Errors=&{[{'Ë' "KRPC error 203: invalid token" "203: KRPC error 203: invalid token" %!q(*krpc.Error=&{203 invalid token})} {'ʚ' "KRPC error 666: timeout" "666: KRPC error 666: timeout" %!q(*krpc.Error=&{666 timeout})}]}
2017/06/21 23:49:27 finished, but it had errors, leaving anyway

not cool : / I ll try to implement a more aggressive put strategy, but that does not looks good.

mh-cbon commented 7 years ago

HI,

I hit a "concurrent map read and map write" race errors while creating new transactions after a sendQuery command.

fatal error: concurrent map read and map write

goroutine 224 [running]:
runtime.throw(0x746cfd, 0x21)
    /home/mh-cbon/.gvm/gos/go1.8/src/runtime/panic.go:596 +0x95 fp=0xc42026cc08 sp=0xc42026cbe8
runtime.mapaccess2(0x6de080, 0xc42001fe60, 0xc42026cca8, 0xc4203df50e, 0x2)
    /home/mh-cbon/.gvm/gos/go1.8/src/runtime/hashmap.go:377 +0x244 fp=0xc42026cc50 sp=0xc42026cc08
github.com/anacrolix/dht.(*Server).addTransaction(0xc420102180, 0xc4203dcbd0)
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/server.go:472 +0x88 fp=0xc42026ccd8 sp=0xc42026cc50
github.com/anacrolix/dht.(*Server).query(0xc420102180, 0x88ce80, 0xc4204c0840, 0x73cc72, 0x3, 0xc420393c80, 0xc4203df550, 0x6eedc0, 0xc42026ce01, 0xc4203df550)
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/server.go:533 +0x6c9 fp=0xc42026ce08 sp=0xc42026ccd8
github.com/anacrolix/dht.(*proxyServer).Query(0xc4200cbec0, 0x88ce80, 0xc4204c0840, 0x73cc72, 0x3, 0xc420393c80, 0xc4203df550, 0xc420393d08, 0xc42000e178, 0x710480)
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/plugin.go:71 +0x77 fp=0xc42026ce68 sp=0xc42026ce08
github.com/anacrolix/dht/bep44.(*Plugin).queryTout(0xc420100a00, 0x88e3c0, 0xc420014290, 0x88ce80, 0xc4204c0840, 0x73cc72, 0x3, 0xc420393c80, 0xc420393ce0, 0xc42036eb40, ...)
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/bep44/client.go:193 +0x12c fp=0xc42026ced8 sp=0xc42026ce68
github.com/anacrolix/dht/bep44.(*Plugin).runGet.func1(0xc4204c0b00, 0x8, 0x8)
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/bep44/client.go:141 +0x206 fp=0xc42026cfc8 sp=0xc42026ced8
runtime.goexit()
    /home/mh-cbon/.gvm/gos/go1.8/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc42026cfd0 sp=0xc42026cfc8
created by github.com/anacrolix/dht/bep44.(*Plugin).runGet.func1.1.1
    /home/mh-cbon/gow/src/github.com/anacrolix/dht/bep44/client.go:137 +0x1a1

in my version it happens here https://github.com/anacrolix/dht/blob/master/server.go#L462

Is it ok to lock that ?

mh-cbon commented 7 years ago

hi matt,

never mind, i rebooted (once again), from the beginning porting some js code (https://github.com/tristanls/k-bucket + https://github.com/mafintosh/k-rpc-socket + https://github.com/mafintosh/k-rpc).

It works much better, essentially because of the k-bucket.

thanks for the sharing, it was very helpful.

anacrolix commented 7 years ago

I've added a k-bucket to this implementation now, due to the performance hit responding to queries without it. Thanks for the push to complete this.

mh-cbon commented 7 years ago

I wished i could be more helpful to you, somehow unhappy i had to restart from scratch. I gone through your latest commit, i guess it was not enough efforts to figure things out. Anyway i wanted to let you know you might want take a look at https://github.com/mh-cbon/dht/tree/master/bucket and if bep44 mutable items are added https://github.com/mh-cbon/dht/blob/master/ed25519/fix.go. Also, big big thanks for utp packages see (https://github.com/mh-cbon/dhtest), lots of fun to share the udp conn with an utp conn :D