tristanls / discover

Node discovery based on Kademlia DHT protocol.
MIT License

Discover Usage #11

skeggse closed this issue 10 years ago

skeggse commented 10 years ago

I've been trying to get discover running, and I think some of my expectations about how discover works (or should work) are getting in the way of understanding how it actually works.

I'm also lamenting the lack of a clear end-to-end usage example.

var Discover = require('discover');
var Transport = require('discover-tcp-transport');
var crypto = require('crypto');

function arbiter(incumbent, candidate) {
  if (!incumbent || !incumbent.workerNodes)
    return candidate;

  if (!candidate || !candidate.workerNodes)
    return incumbent;

  var merged = {
    id: incumbent.id, // incumbent.id === candidate.id within an arbiter
    workerNodes: incumbent.workerNodes
  };

  for (var id in candidate.workerNodes) {
    var incumbentClock = incumbent.workerNodes[id].vectorClock;
    var candidateClock = candidate.workerNodes[id].vectorClock;
    if (candidateClock >= incumbentClock) {
      merged.workerNodes[id] = candidate.workerNodes[id];
    }
  }

  return merged;
}

function arbiterDefaults(contact) {
  if (!contact.data)
    contact.data = {};
  else if (!contact.data.workerNodes)
    contact.data.workerNodes = {};
  else {
    var workers = contact.data.workerNodes;
    for (var key in workers) {
      if (!workers[key].vectorClock) {
        workers[key].vectorClock = 0;
      }
    }
  }
  return contact;
}

var primary = process.argv[2] === 'primary';

var id = crypto.randomBytes(20).toString('base64');
console.log('worker', id);

var transport = new Transport({port: primary ? 9000 : 9001});

var options = {
  arbiter: arbiter,
  arbiterDefaults: arbiterDefaults,
  transport: transport
};

if (!primary) {
  options.seeds = [{
    id: new Buffer('hmmm'),
    transport: {
      host: '127.0.0.1',
      port: 9000
    }
  }];
}

var discover = new Discover(options);

transport.listen();

var workers = {};
workers[id] = {
  host: '127.0.0.1',
  port: primary ? 8000 : 8001
};

discover.register({
  id: new Buffer('UserService').toString('base64'),
  data: {
    workerNodes: workers
  }
});

if (!primary) {
  setTimeout(function() {
    discover.find(new Buffer('UserService').toString('base64'), function(err, contact) {
      if (err)
        return console.log('error', err.stack);
      console.log(require('util').inspect(contact, {depth: null, colors: true}));
    });
  }, 1000);
}

$ node discover-test primary &
worker 4oMG5JHBOIUELmSbR8X6tgzlq5o=
$ node discover-test secondary
worker yZ4NEadKOCpHfrt+ENnSSFL0vnY=
{ id: 'VXNlclNlcnZpY2U=',
  data: 
   { workerNodes: 
      { 'yZ4NEadKOCpHfrt+ENnSSFL0vnY=': 
         { host: '127.0.0.1',
           port: 8001,
           vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }

I expect find to give me a merged object, and I expect the arbiter to be called at some point, but neither is happening. What am I doing wrong?

tristanls commented 10 years ago

Thank you for the code. Setting inlineTrace: true in the Discover options showed me that an error is occurring:

// secondary
...
~trace error response from { id: <Buffer 68 6d 6d 6d>,
  transport: { host: '127.0.0.1', port: 9000 },
  distance: 4.618034004231753e+24 } looking for VXNlclNlcnZpY2U=: [Error: error]
{ id: 'VXNlclNlcnZpY2U=',
  data: 
   { workerNodes: 
      { '/oRRQd9uu/wtOR/NP6Mc+0p8kzI=': 
         { host: '127.0.0.1',
           port: 8001,
           vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }

I'm looking into it.

tristanls commented 10 years ago

I meant inlineTrace: true; I've updated the comment.

skeggse commented 10 years ago

Aha, I think I got somewhere. The seed's id is a raw Buffer; I forgot to convert it to base64. That said, it still doesn't produce the expected result.

Also, you'll notice the seed's id is 'hmmm,' because I'm not sure what's supposed to go there. As far as I can tell, a seed should only need to consist of a transport object, but if I omit the id, it fails outright.

tristanls commented 10 years ago

I had to reach into discover-tcp-transport to make the error message more meaningful, and got this:

~trace error response from { id: <Buffer 68 6d 6d 6d>,
  transport: { host: '127.0.0.1', port: 9000 },
  distance: 4.618034004231753e+24 } looking for VXNlclNlcnZpY2U=: [Error: Invalid or missing contact.id]

which agrees with what you stated. (I'm going to create an issue to get better error messages than "error"; not sure what I was thinking there.)

tristanls commented 10 years ago

It looks like the arbiter is not doing what you expect it to. Btw, this is my fault: the arbiter abstraction is new, so you're running into its current unpolished state.

I've added ***ARBITER ENTRY*** and ***ARBITER EXIT*** logging to the arbiter function. Only the entry shows up, which means (I think) it returns before merging:

...
***ARBITER ENTRY***
~trace adding { id: 'aG1tbQ==',
  transport: { host: '127.0.0.1', port: 9000 },
  distance: 4.618034004231753e+24 } to kBucket VXNlclNlcnZpY2U=
~trace response from { id: 'aG1tbQ==',
  transport: { host: '127.0.0.1', port: 9000 },
  distance: 4.618034004231753e+24 } looking for VXNlclNlcnZpY2U=: { id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { 'mTQJxHWIn7lUkz+T3LSD5jmUuXY=': [Object] } },
  transport: { host: 'localhost', port: 9000 } }
{ id: 'VXNlclNlcnZpY2U=',
  data: 
   { workerNodes: 
      { '8QYfxDJ8stidl/d6CXUe6qPmilE=': 
         { host: '127.0.0.1',
           port: 8001,
           vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }

tristanls commented 10 years ago

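Notice that workerNodes is in data.workerNodes, not at the top level of the contact, so the arbiter's checks on incumbent.workerNodes and candidate.workerNodes never see it :)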
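To make the mismatch concrete, here is a minimal sketch built from a contact shaped like the "UserService" entries in the trace output above (the worker id and values are copied from it). originalGuard is just the first check of the arbiter from the opening comment, pulled out for illustration. Because the worker map is only reachable through data, that guard is always true and the candidate comes back unmerged.

```javascript
// A contact shaped like the "UserService" entries in the trace output above.
const contact = {
  id: 'VXNlclNlcnZpY2U=',
  data: {
    workerNodes: {
      '8QYfxDJ8stidl/d6CXUe6qPmilE=': { host: '127.0.0.1', port: 8001, vectorClock: 0 }
    }
  },
  transport: { host: 'localhost', port: 9001 }
};

// The original arbiter looked for contact.workerNodes, which does not exist here...
const topLevel = contact.workerNodes;      // undefined
// ...while the worker map actually lives under contact.data.
const nested = contact.data.workerNodes;   // the map of workers

// First check of the original arbiter: it always short-circuits for these contacts.
function originalGuard(incumbent, candidate) {
  if (!incumbent || !incumbent.workerNodes) return candidate;
  return 'would merge';
}
const guardResult = originalGuard(contact, { id: contact.id });
```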

tristanls commented 10 years ago

More inline logging...

***ARBITER ENTRY***
returning candidate
undefined <-- incumbent

tristanls commented 10 years ago

To answer your earlier usage complaint: you're right, there isn't a good end-to-end example documented right now :/. Here is an example that sets up 5 Discover nodes (https://github.com/tristanls/tart-ansible/blob/master/examples/readme.js); I'm looking through it to compare against your example code.

tristanls commented 10 years ago

That earlier comment on ***ARBITER ENTRY*** is not the actual problem.

That is the code path taken when the "hmmm" seed contact is being added.

I think I understand what is going on. Turning on stats helped as well (a find latency of 0 ms means the locally cached copy was returned):

discover.on('stats.timers.find.ms', function (latency) {
    console.log('~stats: discover.stats.timers.find.ms', latency);
});
discover.on('stats.timers.find.round.ms', function (latency) {
    console.log('~stats: discover.stats.timers.find.round.ms', latency);
});
discover.on('stats.timers.find.request.ms', function (latency) {
    console.log('~stats: discover.stats.timers.find.request.ms', latency);
});
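The three listeners above can also be registered in a loop. This is a small sketch, not part of discover: attachFindStats and the "(likely served from local cache)" note are my own additions, and it only assumes the object passed in has an on method, as discover does in the snippet above. Following the 0 ms observation, it flags a zero find latency as a probable cache hit.

```javascript
const { EventEmitter } = require('events');

// The three timer events from the snippet above.
const FIND_TIMERS = [
  'stats.timers.find.ms',
  'stats.timers.find.round.ms',
  'stats.timers.find.request.ms'
];

// Hypothetical helper: one listener per timer event on any object with .on().
// A 0 ms overall find latency is annotated as a likely local-cache answer.
function attachFindStats(emitter, log) {
  FIND_TIMERS.forEach(function (name) {
    emitter.on(name, function (latency) {
      const note = name === 'stats.timers.find.ms' && latency === 0
        ? ' (likely served from local cache)'
        : '';
      log('~stats: discover.' + name + ' ' + latency + note);
    });
  });
}

// Usage with a stand-in emitter; with discover you would pass the discover object.
const lines = [];
const fake = new EventEmitter();
attachFindStats(fake, function (line) { lines.push(line); });
fake.emit('stats.timers.find.ms', 0);
fake.emit('stats.timers.find.request.ms', 12);
```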

We register "UserService" on both nodes. Registering "UserService" at a node means that the node is capable of providing that service.

When you search for "UserService", the first place checked is the local cache. Since we previously registered the service locally, no network requests are made and the locally cached copy is returned.

If we want a remote answer, we can use discover.unreachable(contact) to tell Discover that what we have cached locally isn't useful. This will then cause Discover to go out to the network and ask for an answer. However, because we locally registered "UserService" before, we have a local registration that meets our search criteria, so again, no network requests are made and the locally registered copy is returned.

If we take the drastic step of calling discover.unregister(contact) to get rid of the local registration, Discover will go out to the network and return the result, but it will only be the remote result, not the remote result merged with a local copy :)

In order to get the merge behavior, I think a third node is necessary as part of the test and this third node should not have a local "UserService" on it.
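The resolution order described above can be modeled as a toy lookup. This is a simplified sketch, not discover's actual implementation: ToyDiscover, its registered and cache maps, and the networkFind callback are hypothetical. Only the order of checks (local cache, then local registrations, then the network) follows the description. It shows why, with "UserService" registered on both nodes, the network, and therefore the arbiter, is never consulted.

```javascript
// Hypothetical toy model of the lookup order described above; not discover's code.
class ToyDiscover {
  constructor(networkFind) {
    this.registered = new Map();    // contacts registered on this node
    this.cache = new Map();         // locally cached contacts
    this.networkFind = networkFind; // stand-in for a real DHT query
  }
  register(contact) {
    this.registered.set(contact.id, contact);
    this.cache.set(contact.id, contact); // registering also leaves a cached copy
  }
  unreachable(contact) { this.cache.delete(contact.id); } // cached copy not useful
  unregister(contact) {
    this.registered.delete(contact.id);
    this.cache.delete(contact.id);
  }
  find(id) {
    if (this.cache.has(id)) return { source: 'cache', contact: this.cache.get(id) };
    if (this.registered.has(id)) return { source: 'local', contact: this.registered.get(id) };
    return { source: 'network', contact: this.networkFind(id) };
  }
}

const remote = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { primaryWorker: {} } } };
const local = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { secondaryWorker: {} } } };
const node = new ToyDiscover(function () { return remote; });
node.register(local);

const first = node.find(local.id).source;  // answered from the cache
node.unreachable(local);
const second = node.find(local.id).source; // still answered by the local registration
node.unregister(local);
const third = node.find(local.id);         // remote result only, nothing merged
```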

skeggse commented 10 years ago

I had a feeling it might be related to caching.

I'd prefer not to need a third node; that defeats one of my reasons for using discover: not having to maintain a process that exists solely to enable service discovery.

One of the major things I'm starting to fully understand is that discover is built around pulling data rather than pushing and subscribing. A push/subscribe model is more useful for me because I need to react quickly to new workers and to workers that disappear. Since an operation can span a number of workers, disabling the cache would add significant overhead to each step of an operation, because a full DHT lookup is fairly expensive (especially across a hundred steps in an operation).

skeggse commented 10 years ago

All that being said, I'm not sure how this would work at all if the cache is infinitely persistent, so maybe I'm not understanding how contacts are synchronized.

tristanls commented 10 years ago

I've added a third node locally, and what it ends up doing is asking just one of the nodes and getting a partial answer (the one that node is familiar with). The interesting thing here is that if a node knows the answer to the query, it has no reason to ask other nodes that might have a different answer and then merge the results. It simply answers with what it knows.

There is probably a bug in the "announce" mechanism. When a new registration happens, the node asks the DHT to return answers for a query about itself (the just-registered contact). The "announce" includes the sender (the just-registered contact), so the merge should happen during this step. If I have info on "UserService" and another node asks me to find "UserService" while itself holding additional information on "UserService", I should merge the two and respond with the merged result. I'll create an issue to investigate this.
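The merge that the announce step should trigger can be sketched as a pure function. This is only an illustration of the intended behavior described above, under assumptions: respondToFind, the registered map, and unionArbiter are hypothetical names, not discover's API. When the sender of a find carries the same id we are being asked about, we arbitrate our copy with the sender's, keep the merged result, and answer with it; otherwise we answer with what we know.

```javascript
// Hypothetical responder-side handling of a find request (not discover's code).
// registered: Map of id -> locally held contact; sender: the announcing contact.
function respondToFind(registered, targetId, sender, arbiter) {
  let known = registered.get(targetId);
  if (known && sender && sender.id === targetId) {
    // The sender is announcing the same service: merge before answering.
    known = arbiter(known, sender);
    registered.set(targetId, known); // keep the merged view locally
  }
  return known;
}

// Simple worker-map union used as the arbiter for this example.
function unionArbiter(incumbent, candidate) {
  return {
    id: incumbent.id,
    data: {
      workerNodes: Object.assign({}, incumbent.data.workerNodes, candidate.data.workerNodes)
    }
  };
}

const registered = new Map();
registered.set('VXNlclNlcnZpY2U=', {
  id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { a: { port: 8000 } } }
});
const sender = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { b: { port: 8001 } } } };
const answer = respondToFind(registered, 'VXNlclNlcnZpY2U=', sender, unionArbiter);
```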

As for pull vs. push: yes, a DHT is a pull mechanism. Although, as I've described above, the "announce" implements a push and a sort of refresh of the data.

Alternatively, have you considered gossip to address your use case? https://github.com/tristanls/gossipmonger or http://www.serfdom.io/

tristanls commented 10 years ago

Well, what I've said above about only asking one node is only true in this case because only the primary was the seed node. If both were the seed nodes, then the query would go to both of them. However, even with that, Discover picks the first response that satisfies and returns that result immediately.
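The difference between returning the first satisfying response and combining all of them can be shown with plain data. This sketch is illustrative only: the responses array stands in for answers from two seed nodes, and union is a simplified stand-in for the arbiter discussed in this thread, not discover's query logic.

```javascript
// Answers to a "UserService" lookup from two different seed nodes.
const responses = [
  { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { primaryWorker: { port: 8000 } } } },
  { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { secondaryWorker: { port: 8001 } } } }
];

// Simplified arbiter: union of worker maps, candidate entries win on conflict.
function union(incumbent, candidate) {
  return {
    id: incumbent.id,
    data: {
      workerNodes: Object.assign({}, incumbent.data.workerNodes, candidate.data.workerNodes)
    }
  };
}

// First satisfying response wins: only one node's view comes back.
const firstWins = responses[0];

// Waiting for every response and folding them through the arbiter instead.
const mergedAll = responses.reduce(union);
```

Folding every response trades latency for completeness, which is the consistency vs. lookup-time tension raised in this thread.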

tristanls commented 10 years ago

@skeggse, thank you so much for exercising this code base. I think this bug, https://github.com/tristanls/discover/issues/13, is probably responsible for breaking your expectations, and frankly, not working correctly. I'll try your test case again once I fix it.

skeggse commented 10 years ago

Awesome.

Thanks for pointing me to gossip; I'd heard of it but never really looked closely. It looks like it might work for my purposes, but I feel like there's a better solution. Distributed hash tables look like they solve the problem well; I just want better consistency and lower lookup times.

Thanks for looking into the issues I'm finding!

skeggse commented 10 years ago

Just out of curiosity, what's the purpose of the id seed property?

tristanls commented 10 years ago

It does appear that the arbiter function is broken though:

***ARBITER ENTRY***
incumbent:
{ id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { 'JamzodYtJNcovP8XPx1U+IsYycU=': { host: '127.0.0.1', port: 8000, vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9000 } }
candidate:
{ id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { 'XW2bMMfo2AnpcwZP9VjsT8Boyfo=': { host: '127.0.0.1', port: 8001, vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }
merged:
{ id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { 'JamzodYtJNcovP8XPx1U+IsYycU=': { host: '127.0.0.1', port: 8000, vectorClock: 0 } } } }
***ARBITER EXIT***

As for the seed's id property... I think it comes from the idea that a contact is a node, and a seed is a node, so it needs an id. It might be extraneous, as you indicate. I'll make an issue to look into it.

skeggse commented 10 years ago

The arbiter just needs its condition modified:

    if (!(candidateClock < incumbentClock)) {

Instead of:

    if (candidateClock >= incumbentClock) {

That way, if incumbentClock is undefined, candidateClock < incumbentClock is false, so the whole expression evaluates to true.
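The negated form helps because of JavaScript's comparison rules: undefined converts to NaN in a relational comparison, and every comparison involving NaN is false. A quick check follows. Note that it only covers a missing vectorClock value; if the incumbent has no entry for that worker id at all, reading incumbent.workerNodes[id].vectorClock still throws, so that case needs its own guard.

```javascript
const candidateClock = 0;
let incumbentClock; // undefined: the incumbent has no clock for this worker

// Original condition: 0 >= undefined is 0 >= NaN, which is false, so the candidate is dropped.
const original = candidateClock >= incumbentClock;

// Proposed condition: !(0 < NaN) is !false, which is true, so the candidate is kept.
const proposed = !(candidateClock < incumbentClock);

// When both clocks are numbers, the two conditions agree.
const agree = [[0, 0], [1, 0], [0, 1]].every(function (pair) {
  return (pair[0] >= pair[1]) === !(pair[0] < pair[1]);
});
```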

But yeah, good catch.

tristanls commented 10 years ago

Hey... look at this! (This is what I see on the primary when running with a prototype fix for #13.)

***ARBITER ENTRY***
incumbent:
{ id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { '328v5RvghbFNin1WAkUguQzQrhg=': { host: '127.0.0.1', port: 8000, vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9000 } }
candidate:
{ id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { '8Lzf++Wt0TwmQ10GOji4kePAd5o=': { host: '127.0.0.1', port: 8001, vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }
merged:
{ id: 'VXNlclNlcnZpY2U=',
  data: 
   { workerNodes: 
      { '328v5RvghbFNin1WAkUguQzQrhg=': { host: '127.0.0.1', port: 8000, vectorClock: 0 },
        '8Lzf++Wt0TwmQ10GOji4kePAd5o=': { host: '127.0.0.1', port: 8001, vectorClock: 0 } } } }
***ARBITER EXIT***

Updated arbiter code:

var util = require('util');

function arbiter(incumbent, candidate) {
  console.log("***ARBITER ENTRY***");
  if (!incumbent || !incumbent.data || !incumbent.data.workerNodes) {
    console.log('returning candidate');
    console.dir(incumbent);
    return candidate;
  }

  if (!candidate || !candidate.data || !candidate.data.workerNodes) {
    console.log('returning incumbent');
    console.dir(candidate);
    return incumbent;
  }

  console.log('incumbent:');
  console.log(util.inspect(incumbent, {depth: null}));
  console.log('candidate:');
  console.log(util.inspect(candidate, {depth: null}));

  var merged = {
    id: incumbent.id, // incumbent.id === candidate.id within an arbiter
    data: {
        workerNodes: incumbent.data.workerNodes
    }
  };

  for (var id in candidate.data.workerNodes) {
    var incumbentClock = undefined; // reset every iteration: var is function-scoped
    if (incumbent.data.workerNodes[id]) {
        incumbentClock = incumbent.data.workerNodes[id].vectorClock;
    }
    var candidateClock = candidate.data.workerNodes[id].vectorClock;
    if (incumbentClock && incumbentClock > candidateClock) {
        continue;
    }
    merged.data.workerNodes[id] = candidate.data.workerNodes[id];
  }

  console.log('merged:');
  console.log(util.inspect(merged, {depth: null}));

  console.log("***ARBITER EXIT***");
  return merged;
}
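For reference, the merge logic of the updated arbiter can be written without the debug logging and without mutating its inputs. This is a sketch, not the code that shipped in discover: mergeArbiter is a hypothetical name, it copies the worker map instead of reusing incumbent.data.workerNodes, and it keeps the incumbent's entry only when the incumbent's vectorClock is strictly newer. Fed the incumbent and candidate from the log above, it produces the same two-worker result.

```javascript
// Logging-free variant of the updated arbiter; does not mutate either argument.
function mergeArbiter(incumbent, candidate) {
  const hasWorkers = function (c) { return c && c.data && c.data.workerNodes; };
  if (!hasWorkers(incumbent)) return candidate;
  if (!hasWorkers(candidate)) return incumbent;

  // Copy so the incumbent's own workerNodes object is not modified.
  const workerNodes = Object.assign({}, incumbent.data.workerNodes);
  Object.keys(candidate.data.workerNodes).forEach(function (id) {
    const existing = workerNodes[id];
    const incoming = candidate.data.workerNodes[id];
    // Keep the incumbent entry only if its clock is strictly newer.
    if (existing && existing.vectorClock > incoming.vectorClock) return;
    workerNodes[id] = incoming;
  });

  return { id: incumbent.id, data: { workerNodes: workerNodes } };
}

// The incumbent and candidate from the log above (transport omitted).
const incumbent = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: {
  '328v5RvghbFNin1WAkUguQzQrhg=': { host: '127.0.0.1', port: 8000, vectorClock: 0 } } } };
const candidate = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: {
  '8Lzf++Wt0TwmQ10GOji4kePAd5o=': { host: '127.0.0.1', port: 8001, vectorClock: 0 } } } };
const merged = mergeArbiter(incumbent, candidate);
```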

skeggse commented 10 years ago

Nice work!

tristanls commented 10 years ago

I gotta take a break, but if you want to keep experimenting locally, this is the prototype fix for #13. It goes in around line 146 of index.js (previously it only had the // add the sender ... part):

        if (localContactKBucket && localContactKBucket.contact.id == sender.id) {
            localContactKBucket.contact =
                self.arbiter(localContactKBucket.contact, sender);
        } else {
            // add the sender
            // we do it only after we already got the closest contact to prevent
            // always responding with exact match to the sender if the sender is
            // announcing (searching for itself)
            var senderClosestKBuckets = self.getClosestKBuckets(sender.id);
            if (senderClosestKBuckets.length == 0) {
                if (self.tracing)
                    self.trace('no kBuckets for findNode sender ' + util.inspect(sender));
            } else {
                var senderClosestKBucketId = senderClosestKBuckets[0].id;
                var senderClosestKBucket = self.kBuckets[senderClosestKBucketId].kBucket;
                if (!senderClosestKBucket) {
                    if (self.tracing)
                        self.trace('no closest kBucket for findNode sender ' + util.inspect(sender));
                } else {
                    var clonedSender = clone(sender);
                    if (self.tracing)
                        self.trace('adding ' + util.inspect(clonedSender) + ' to kBucket ' + senderClosestKBucketId);
                    clonedSender.id = new Buffer(clonedSender.id, "base64");
                    senderClosestKBucket.add(clonedSender);
                }
            }
        }

skeggse commented 10 years ago

Cool, so I've been playing around with this a little, and while the primary node correctly merges the two versions, the secondary node does not. I'm still not familiar enough to understand why, though.

tristanls commented 10 years ago

The primary is up first. The secondary comes up and announces itself to the primary. The primary responds with its information, but apparently the secondary does not merge it. I think this is the same k-bucket bug as #13, but this time it is in the discover.add(contact) code instead of the on('findNode') handler.

tristanls commented 10 years ago

Ok, v0.4.1 is updated with bug fixes.

tristanls commented 10 years ago

Hmm... looks like fixing #13 didn't help with the secondary node getting updated info.

tristanls commented 10 years ago

Ah, same bug as #13 but in one more place :)

tristanls commented 10 years ago

Well, I've written additional tests, but the #13 fix already took care of the bugs I expected to find. So I'm still not sure why the secondary node wouldn't be updated :/

skeggse commented 10 years ago

It looks like it's partially because the node event listener in the discover constructor calls the add method with the contact rather than the response. Interestingly, this causes secondary to have the workerNode for primary, but it loses the reference to its own workerNode along the way.

I should probably expand on my question about the seed id property. In my naive little world, I'd like seeds to be an array of transports rather than contacts. That would simplify seeding, but it would mean the initial seed process requires a protocol extension to request all known contacts from the seed rather than getting contacts from the seed.

tristanls commented 10 years ago

I think you're still observing a variant of the #13 bug that I haven't yet been able to isolate in a test. I've verified (using a three-node setup) that when it gets an answer from the primary, both worker nodes are present since they were merged; however, when it gets an answer from the secondary, only the secondary worker node is returned (not the primary's).

$ node tmp/discover-test.js secondary
worker bcsUV17jhTnhH6/hrEa1w6XBs/s=
$ node tmp/discover-test.js ternary
worker e8FSFIKT+LXUy4EiDM+0zsPzX34=
...
~trace response from { id: 'aG1tbTI=',
  transport: { host: '127.0.0.1', port: 9001 },
  distance: 4.618033959758707e+24 } looking for VXNlclNlcnZpY2U=: { id: 'VXNlclNlcnZpY2U=',
  data: { workerNodes: { 'bcsUV17jhTnhH6/hrEa1w6XBs/s=': [Object] } },
  transport: { host: 'localhost', port: 9001 } }
{ id: 'VXNlclNlcnZpY2U=',
  data: 
   { workerNodes: 
      { 'bcsUV17jhTnhH6/hrEa1w6XBs/s=': 
         { host: '127.0.0.1',
           port: 8001,
           vectorClock: 0 } } },
  transport: { host: 'localhost', port: 9001 } }
...

tristanls commented 10 years ago

I'm pretty sure the fix goes here: https://github.com/tristanls/discover/blob/e87ce3f5b4e9a5f881c16b4182e50c8c4fbdb967/index.js#L473 . I still can't figure out a unit test that demonstrates the failure case, though :) (aside from the large three-node one)

skeggse commented 10 years ago

Does it make sense to change self.add(contact); to self.add(response); here?

tristanls commented 10 years ago

No, because we do want to add the contact as we just reached it.

But yes, we also want to consider what the response is! If the response is a contact that happens to match a locally registered contact it should be arbitrated like #13 instructs (I think).

The answer is both-ish.

Have to go for now, but I think you found it. Thanks!
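The "both-ish" answer can be sketched as follows. This is a hypothetical helper, not discover's API: handleFindResponse, knownContacts, registered, and unionWorkers are illustrative names. The contact we just reached is always recorded, and if the response matches a locally registered contact, the two are arbitrated rather than one replacing the other, so the node keeps its own worker while learning about the remote one.

```javascript
// Hypothetical requester-side handling of a find response (not discover's code).
// contact: the node we just reached; response: what it answered with.
function handleFindResponse(state, contact, response, arbiter) {
  // Always record the contact we reached.
  state.knownContacts.set(contact.id, contact);
  // If the answer is a contact we also registered locally, arbitrate the two.
  const local = state.registered.get(response.id);
  if (local) {
    state.registered.set(response.id, arbiter(local, response));
  }
  return state.registered.get(response.id) || response;
}

// Simple worker-map union used as the arbiter for this example.
function unionWorkers(incumbent, candidate) {
  return {
    id: incumbent.id,
    data: {
      workerNodes: Object.assign({}, incumbent.data.workerNodes, candidate.data.workerNodes)
    }
  };
}

const state = { knownContacts: new Map(), registered: new Map() };
state.registered.set('VXNlclNlcnZpY2U=', {
  id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { secondaryWorker: { port: 8001 } } }
});
const seedContact = { id: 'aG1tbQ==', transport: { host: '127.0.0.1', port: 9000 } };
const reply = { id: 'VXNlclNlcnZpY2U=', data: { workerNodes: { primaryWorker: { port: 8000 } } } };
const answerFromSeed = handleFindResponse(state, seedContact, reply, unionWorkers);
```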

skeggse commented 10 years ago

Might make sense to run the arbiter on contact and response. Not sure about that though.

It doesn't fix the issue entirely for me, though, as secondary now knows about primary but somehow manages to forget secondary. Is that what you're seeing or does secondary know about itself?

skeggse commented 10 years ago

Here's a more comprehensive use case: https://gist.github.com/skeggse/f0ce99d8b68c951ce232. Even with the fix in #15, they're not communicating properly.

In one terminal:

$ node test --email
...
instance P9X68IUeI9UWSi3+qqIdmp9N/OI= on 8001
...

In another terminal:

$ node test --user --seed 'P9X68IUeI9UWSi3+qqIdmp9N/OI=:8001'

Or:

$ node test --user --email --seed 'P9X68IUeI9UWSi3+qqIdmp9N/OI=:8001'
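The --seed argument above packs a node id and a port into one string. The gist's own parsing isn't shown here, so the following is only a guess at how such a value could become a seed contact of the shape used earlier in this thread ({ id, transport: { host, port } }). parseSeed is a hypothetical helper and the default host of 127.0.0.1 is an assumption. It splits on the last colon, which is safe because base64 ids never contain ':'.

```javascript
// Hypothetical parser for a "<base64 id>:<port>" seed string.
function parseSeed(value, host) {
  const sep = value.lastIndexOf(':'); // base64 never contains ':'
  if (sep <= 0) throw new Error('expected <id>:<port>, got ' + value);
  const port = Number(value.slice(sep + 1));
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error('invalid port in seed ' + value);
  }
  return {
    id: value.slice(0, sep),
    transport: { host: host || '127.0.0.1', port: port } // assumed local default
  };
}

const parsedSeed = parseSeed('P9X68IUeI9UWSi3+qqIdmp9N/OI=:8001');
```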

tristanls commented 10 years ago

Thanks for the test code. I'll try to duplicate it and chase it down a little further today.

tristanls commented 10 years ago

I can't run the test code, I get:

~trace executeQuery({ nodeId: 'RW1haWxTZXJ2aWNl',
  nodes: 
   [ { id: 'mwKfHDFuuq3OJAA6my588o+LKiw=',
       transport: { host: '127.0.0.1', port: 8000 },
       distance: 1.2698935002975323e+48 } ],
  nodesMap: 
   { 'mwKfHDFuuq3OJAA6my588o+LKiw=': 
      { id: 'mwKfHDFuuq3OJAA6my588o+LKiw=',
        transport: { host: '127.0.0.1', port: 8000 },
        distance: 1.2698935002975323e+48 } },
  sender: undefined })

events.js:72
        throw er; // Unhandled 'error' event
              ^
Error: listen EADDRINUSE
    at errnoException (net.js:904:11)
    at Server._listen2 (net.js:1042:14)
    at listen (net.js:1064:10)
    at net.js:1146:9
    at asyncCallback (dns.js:68:16)
    at Object.onanswer [as oncomplete] (dns.js:121:9)

When trying to execute

$ node test --user --seed '...:8000'

tristanls commented 10 years ago

I think I have a fix (the three-node test I used initially now seems to do what was expected). I'm trying to isolate a test case right now.

tristanls commented 10 years ago

Ok, finally figured out what unit test to write. This should take care of updating locally registered contacts with arbitrated remote responses: https://github.com/tristanls/discover/commit/52974ca308b9582bddfc38ebbf7926da7ff44b19.

I think the one thing left now is to determine whether, when the response is not an exact match but a list of nodes to query next, that information should also be used for updates and arbitrated.

I'd like to hold off on that to check if this change solves the problem first.

tristanls commented 10 years ago

So it looks like I backed into your implementation as demonstrated in #15. I'll release this as v0.4.2, please give it a try.

skeggse commented 10 years ago

I'm on a different machine, and it's taking a little bit to get the example code I sent you running.

skeggse commented 10 years ago

I might've fixed the gist. I wasn't getting the same error you were, though. Kinda odd.

skeggse commented 10 years ago

Wonderful! The nodes' initial states correctly transfer and combine!

tristanls commented 10 years ago

portfinder doesn't work on my machine; it finds port 8000 over and over again.

tristanls commented 10 years ago

Yay! :)

skeggse commented 10 years ago

@tristanls hmm, that's odd. If you run two portfinders simultaneously, that would happen, because neither port would be in use yet; but if something is already listening on port 8000, it should choose the next one.

tristanls commented 10 years ago

Well, if it works for you, then I'm happy not debugging portfinder.

skeggse commented 10 years ago

It does work, but it's not updating the data after the initial connection. Is that the expected result?

tristanls commented 10 years ago

Hmmm... I think that is expected. Well, at least it works as designed, even if not necessarily as expected :)

If a local node is queried and it knows the answer, it will respond with cached data and not generate network traffic. The use case being:

skeggse commented 10 years ago

Ah, ok. That's fine for now, then. Thanks!

tristanls commented 10 years ago

Thank you very much for helping with this.