nathanielc opened this issue 1 year ago (status: Open)
There is a long discussion in https://github.com/libp2p/rust-libp2p/pull/3076 that is highly relevant for this.
I am still of the same opinion that we should design `RecordStore` to:

- Implement `Clone`
- Return `Future`s that don't have lifetimes
- Take `&self`

As a result, we can delete a lot of code in kademlia that just does message-passing between handler and behaviour. It also means fallible and async implementations of the store are trivial. It works out of the box (contrary to the "always emit an event and have the user submit the response" model).
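To make that shape concrete, here is a minimal sketch under those three constraints; the key/record types and method names are illustrative placeholders rather than the actual libp2p-kad API:

```rust
use std::future::Future;
use std::pin::Pin;

// Placeholders standing in for libp2p_kad's own key and record types.
#[derive(Clone)]
pub struct Key(pub Vec<u8>);
#[derive(Clone)]
pub struct Record {
    pub key: Key,
    pub value: Vec<u8>,
}

/// A boxed future with no borrowed lifetime, so the behaviour can keep
/// polling it long after the call into the store has returned.
pub type StoreFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// The proposed shape: `Clone` + `&self` + `'static` futures.
pub trait RecordStore: Clone + Send + 'static {
    fn get(&self, key: Key) -> StoreFuture<Option<Record>>;
    fn put(&self, record: Record) -> StoreFuture<Result<(), std::io::Error>>;
}
```

Because an implementation is `Clone` and the returned futures own their data (e.g. a cheap `Arc`-backed handle to the store plus the key), the behaviour can drive them directly, without the message-passing glue mentioned above.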
@mxinden thumbed-up the last comment there, but I am not sure if that means he agrees with this direction overall or just with that argument in particular.
Happy to mentor you through this if you are open to sending a PR :)
> I am still of the same opinion that we should design `RecordStore` to:
> - Implement `Clone`
> - Return `Future`s that don't have lifetimes
> - Take `&self`

Sounds good to me.
> > I am still of the same opinion that we should design `RecordStore` to:
> > - Implement `Clone`
> > - Return `Future`s that don't have lifetimes
> > - Take `&self`
>
> Sounds good to me.

Me too.
So, in summary, combining feedback from these several issues we have:
However, with these changes we still do not have anything that addresses the memory pressure. Is there something we can change about the `records` and `provided` methods to allow for some kind of batching/paging of the records? Maybe make the trait generic over a `Cursor` type that is opaque to the kademlia code but is returned and passed back into the `records` and `provided` calls to resume where it left off?
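For illustration, one possible shape of the opaque-cursor idea; `PagedRecordStore`, `Cursor` and `records_page` are hypothetical names, not existing libp2p-kad items:

```rust
/// Placeholder for libp2p_kad's record type.
pub struct Record;

/// Illustrative only: an opaque cursor the store hands back so the caller can
/// resume iteration without the store keeping per-caller state.
pub trait PagedRecordStore {
    /// Opaque to the kademlia code; only the store knows how to interpret it.
    type Cursor;

    /// Returns up to `limit` records starting after `cursor` (`None` = start
    /// from the beginning), plus the cursor to pass back in for the next page,
    /// or `None` once iteration is finished.
    fn records_page(
        &self,
        cursor: Option<Self::Cursor>,
        limit: usize,
    ) -> (Vec<Record>, Option<Self::Cursor>);
}
```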
Looking at the interface, what are these functions actually used for? We don't need them within `libp2p-kad`, do we? Thus we could also just remove them and move the responsibility of providing such a function to the concrete type that implements `Store`. If we don't have any functions that are abstract over these iterators, we don't need to include them in the interface.
They are used for the republish logic. The current implementation calls `provided`, collects all records into a `Vec` and starts to publish them in small batches. In our use case we have disabled the republish logic and implemented it outside of the protocol implementation because of these limitations. I would support removing that logic and letting concrete types manage it externally.
Could we implement republishing by just returning a list of keys and then fetching each key individually? That would reduce the memory consumption to only having to fit all keys into memory.
It would result in O(n) queries to republish, which could be expensive if we are talking to an external DB.
Alternatively, we could return a `Stream` from those functions, but then we have to be careful in regards to allocations and probably use `Bytes` or something internally.
For our use case we are using provider records exclusively, and so the key is the significant portion of the data. We expect 10M+ keys for a single peer. That's a lot of memory to use when we only need to access it about once every 12h and whenever it's queried.
A Stream sounds like a workable solution to me.
> For our use case we are using provider records exclusively, and so the key is the significant portion of the data. We expect 10M+ keys for a single peer. That's a lot of memory to use when we only need to access it about once every 12h and whenever it's queried.
The memory would only be consumed when you want to re-publish. I was thinking of something like:

```rust
pub trait RecordStore {
    fn record_keys(&self) -> Vec<Key>;
}
```

For 10M+ keys, this would mean allocating ~320MB (assuming a key size of 32 bytes) in order to iterate over it, get every record from the store and republish it. But yeah, it is O(n), so not ideal.
One issue with making `records` and `provided` a `Stream` is that the `PeriodicJob` will need to `poll` it. Currently, this polling happens within `NetworkBehaviour::poll`. We should avoid doing any form of IO in there because it will impact the latency with which we can handle incoming events. So far, rust-libp2p offloads any form of IO or heavy computation to the `ConnectionHandler`s, which all run in a separate task.
We can say that the `Future`s and `Stream`s returned from `RecordStore` MUST NOT perform any IO but use channels or something like that. We could enforce that by having `RecordStore` return an `mpsc::Receiver<Record>`.
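A minimal sketch of that channel-based rule, assuming the `futures` crate's `mpsc`; the trait and record type are placeholders rather than the real libp2p-kad items:

```rust
use futures::channel::mpsc;

/// Placeholder for libp2p_kad's provider record type.
pub struct ProviderRecord;

/// Sketch: instead of yielding an iterator borrowed from the store, the store
/// hands back a channel. The implementer does any IO (DB reads, etc.) on its
/// own task and sends records into the channel; polling the receiver inside
/// `NetworkBehaviour::poll` then never blocks on IO.
pub trait ProviderStore {
    fn provided(&self) -> mpsc::Receiver<ProviderRecord>;
}
```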
Thanks for the context re IO in behaviors vs handlers, that's really helpful to understand.
Currently we use a custom implementation to publish the records. Instead of waking up once every publish interval and publishing all the records, we continuously publish small batches of records throughout the whole publish interval. This way we amortize the work across the interval. We have enough records that it can take the whole 12-hour interval to publish them all. With the current implementation, that would basically mean we always have a copy of the keys both on disk and in memory.
What if we split these concerns of re-publishing and the store into two different traits?

- RecordStore - only does get/put of records
- PublishJob - a job that can be pulled for a batch of records that need to be re-published

The RecordStore would no longer need to have the records/provided methods, keeping it simple and focused on serving queries from the network.

The PublishJob trait would be a Stream that returns a set of keys. Once those keys are re-published (or replicated) the Stream is polled for a new batch. This way the kademlia protocol doesn't have to concern itself with passing data around from the RecordStore to the publisher logic.

The current implementation of the publish job can be refactored in terms of this trait, and so most users can simply consume that logic. Then for more complicated use cases like ours we can implement our own version of the PublishJob to play our amortization games etc.

It seems like having two traits could actually be simpler than having one more complex trait. Thoughts?
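For illustration, a rough sketch of what such a two-trait split could look like, using the `futures` crate's `Stream`; all names are hypothetical, not a proposal for the concrete API:

```rust
use futures::Stream;

/// Placeholders for libp2p_kad's key and record types.
pub struct Key;
pub struct Record;

/// The store is reduced to serving queries from the network.
pub trait RecordStore {
    fn get(&self, key: &Key) -> Option<Record>;
    fn put(&mut self, record: Record) -> Result<(), std::io::Error>;
    fn remove(&mut self, key: &Key);
}

/// Yields batches of keys that are due for (re-)publication. The behaviour
/// polls for the next batch only once the previous one has been published,
/// so the job controls pacing and memory use itself.
pub trait PublishJob: Stream<Item = Vec<Key>> {}
```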
As an aside, once you get enough records it can become more efficient to invert the republish logic. The current implementation iterates through each record and contacts the closest peers in the DHT to republish it. Instead, you can walk the peers in the DHT and query which records you need to publish to that node. Once the number of records is significantly greater than the number of peers in the network, this inverted logic is much more efficient.

So I can see a world in which there are two long-lived implementations of the PublishJob for the kad protocol: one where it's expected that the number of peers >> the number of records, and another where the number of records >> the number of peers. It would be reasonable to have both of these implementations as part of the libp2p_kad crate itself.
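As a toy illustration of the inverted approach (not the actual kademlia routing logic), the grouping step could look like this: bucket every record key under its XOR-closest peer and publish one batch per peer. Real code would use the kademlia key space and the k closest peers per key:

```rust
use std::collections::HashMap;

/// Toy sketch of the inverted republish idea: instead of walking records and
/// finding peers per record, bucket every record key under the peer whose ID
/// is XOR-closest to it, then publish one batch per peer.
fn group_keys_by_closest_peer(
    peer_ids: &[[u8; 32]],
    record_keys: &[[u8; 32]],
) -> HashMap<[u8; 32], Vec<[u8; 32]>> {
    let mut batches: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
    for key in record_keys {
        // Find the peer minimising the XOR distance to this key
        // (big-endian byte-wise comparison of the XOR result).
        let closest = peer_ids
            .iter()
            .min_by_key(|peer| {
                let mut dist = [0u8; 32];
                for i in 0..32 {
                    dist[i] = peer[i] ^ key[i];
                }
                dist
            })
            .copied()
            .expect("at least one peer");
        batches.entry(closest).or_default().push(*key);
    }
    batches
}
```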
> What if we split these concerns of re-publishing and the store into two different traits?
>
> - RecordStore - only does get/put of records
> - PublishJob - a job that can be pulled for a batch of records that need to be re-published

How does the PublishJob learn about the records that need to be re-published? Would it internally connect to the same record store, e.g. by accessing a shared hashmap or using the same DB connection?
We'd have the most flexibility if we'd remove the `RecordStore` entirely from `libp2p-kad` and have the user handle this. But that is a pretty low-effort solution because it means `libp2p-kad` doesn't work out of the box.

Perhaps a wrapper around `libp2p_kad::Behaviour` would be ideal? We could have a "low-level" behaviour that doesn't have a record store and emits events for each incoming message & requires passing the response back. The wrapper would also handle re-publishing.
More elaborate use-cases could then create their own wrapper behaviour.
What do you think?
> As an aside, once you get enough records it can become more efficient to invert the republish logic. The current implementation iterates through each record and contacts the closest peers in the DHT to republish it. Instead, you can walk the peers in the DHT and query which records you need to publish to that node. Once the number of records is significantly greater than the number of peers in the network, this inverted logic is much more efficient.
The go-libp2p accelerated DHT client does something similar, where it sorts the to-be-published records by key and then walks the DHT, sending multiple records to the same peer in one go.
Agreed that this is a cool optimization. That said, I suggest narrowing the scope of this issue, i.e. do this in a future effort.
> How does the PublishJob learn about the records that need to be re-published? Would it internally connect to the same record store, e.g. by accessing a shared hashmap or using the same DB connection?
This is what I was thinking; however, I like the low-level behavior design as well. Is the idea that rust-libp2p would contain two kad behaviors: the low-level one, and a wrapper that does what the current implementation does using an in-memory store? I.e. split the existing one into two behaviors?
> Agreed that this is a cool optimization. That said, I suggest narrowing the scope of this issue, i.e. do this in a future effort.
Agreed, at this point I am trying to understand the general direction. Adding an optimized kad republish logic is definitely a separate issue.
Another question re the two-behaviours design: my understanding is that passing messages between behaviours happens outside the swarm, meaning whatever type contains the swarm is responsible for calling the methods on the wrapping behaviour when it gets events from the low-level behaviour. How would we ship that out of the box?
> Is the idea that rust-libp2p would contain two kad behaviors: the low-level one, and a wrapper that does what the current implementation does using an in-memory store? I.e. split the existing one into two behaviors?

Yes, that is the idea. Similar to how e.g. `libp2p-rendezvous` wraps the `libp2p-request-response` behaviour: https://github.com/libp2p/rust-libp2p/blob/caf9da4a69f1cce8183d4b17c792476ee44c37bc/protocols/rendezvous/src/client.rs#L40
> My understanding is that passing messages between behaviours happens outside the swarm, meaning whatever type contains the swarm is responsible for calling the methods on the wrapping behaviour when it gets events from the low-level behaviour. How would we ship that out of the box?

Not quite. Because one wraps the other, it receives all events emitted by the inner one and can selectively answer them and/or forward them. Each behaviour would probably have its own `Event` type. I'd lean towards making the one with the `MemoryStore` the "default" behaviour, i.e. `libp2p_kad::Behaviour`, and exposing an additional `libp2p_kad::low_level::{Behaviour,Event}` that allows for more customization.
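For illustration, a sketch of the composition this implies; everything here is a placeholder (the `low_level` module does not exist today), and a real wrapper would implement `NetworkBehaviour` and forward events from its `poll`:

```rust
// Hypothetical sketch only: `low_level` and its event variants stand in for a
// future low-level kademlia behaviour; the point is the shape of the
// composition, not a working NetworkBehaviour implementation.
mod low_level {
    pub struct Behaviour;

    pub enum Event {
        GetRecord { /* key, channel to reply on, ... */ },
        PutRecord { /* record, channel to ack on, ... */ },
    }
}

/// Placeholder for today's in-memory store.
pub struct MemoryStore;

/// The "default" behaviour: wraps the low-level behaviour, answers store
/// events from its own `MemoryStore` and would also drive re-publishing.
pub struct Behaviour {
    inner: low_level::Behaviour,
    store: MemoryStore,
}

impl Behaviour {
    /// In a real `NetworkBehaviour` impl, something like this would run from
    /// `poll`: selectively answer the inner behaviour's events from the store
    /// and forward everything else to the user.
    fn on_inner_event(&mut self, event: low_level::Event) {
        match event {
            low_level::Event::GetRecord { .. } => {
                // look up the record in `self.store`, reply via `self.inner`
            }
            low_level::Event::PutRecord { .. } => {
                // write to `self.store`, acknowledge via `self.inner`
            }
        }
    }
}
```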
Another alternative could be to entirely replace the current `MemoryStore` with SQLite and, by default, use the SQLite `:memory:` database.
That would avoid the need for creating several layers of abstraction but be a more closed design, yet likely easier to ship.
Splitting the behaviour, on the other hand, gives users more freedom, but it also means they might have to write more code, and it makes it less likely that we can provide something for that out of the box. For example, if access to the record store requires IO, we'd have to pull in an executor for the republishing logic, whereas you can just unconditionally depend on `tokio` in your application, or create yourself a `libp2p-kad-ceramic` crate that combines `tokio`, `sqlite` and an advanced republish logic.
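As a rough sketch of the SQLite direction, assuming the `rusqlite` crate and an illustrative table layout (not a proposal for the actual schema):

```rust
use rusqlite::{params, Connection};

/// Provider records keyed by the kademlia key; expiry stored as unix seconds
/// so it can be round-tripped, unlike `Instant`.
pub struct SqliteStore {
    conn: Connection,
}

impl SqliteStore {
    /// Use ":memory:" for the in-memory default, or a file path to make the
    /// records survive restarts.
    pub fn open(path: &str) -> rusqlite::Result<Self> {
        let conn = Connection::open(path)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS provider_records (
                 key      BLOB NOT NULL,
                 provider BLOB NOT NULL,
                 expires  INTEGER,
                 PRIMARY KEY (key, provider)
             )",
            [],
        )?;
        Ok(Self { conn })
    }

    pub fn add_provider(
        &self,
        key: &[u8],
        provider: &[u8],
        expires: Option<i64>,
    ) -> rusqlite::Result<()> {
        self.conn.execute(
            "INSERT OR REPLACE INTO provider_records (key, provider, expires) VALUES (?1, ?2, ?3)",
            params![key, provider, expires],
        )?;
        Ok(())
    }

    pub fn providers(&self, key: &[u8]) -> rusqlite::Result<Vec<Vec<u8>>> {
        let mut stmt = self
            .conn
            .prepare("SELECT provider FROM provider_records WHERE key = ?1")?;
        let rows = stmt.query_map(params![key], |row| row.get::<_, Vec<u8>>(0))?;
        rows.collect()
    }
}
```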
Description
The `libp2p_kad::store::RecordStore` trait currently has a design that makes it difficult to implement a persistent backend. Specifically, the following aspects of the current design make this difficult:
Instant Serialization
The `ProviderRecord` and `Record` types contain an `Instant`, which by design cannot be serialized/deserialized.

I suggest we change the time type used from `Instant` to `SystemTime`. The trade-off is that `SystemTime` is not guaranteed to be monotonic, i.e. the system clock can be modified, and so a time that was expected to be in the future may not be. However, it is possible to serialize/deserialize a `SystemTime` (e.g. as seconds since the Unix epoch). The time scales involved in record expiration are typically hours; at this scale it's uncommon to see non-monotonic jumps in a `SystemTime`.
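A small sketch of what that round-trip could look like, with the expiry persisted as whole seconds since the Unix epoch (function names are illustrative):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Serialize an optional expiry as whole seconds since the Unix epoch.
fn expires_to_secs(expires: Option<SystemTime>) -> Option<u64> {
    expires.and_then(|t| t.duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()))
}

/// Reconstruct the expiry from the persisted value.
fn expires_from_secs(secs: Option<u64>) -> Option<SystemTime> {
    secs.map(|s| UNIX_EPOCH + Duration::from_secs(s))
}
```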
Memory Pressure
The `provided` method produces an iterator over all entries in the store. Without a mechanism to paginate or resume from a cursor, the iterator may block other concurrent requests to the underlying store (e.g. SQLite).
Async API
It's not clear from the trait API or docs how/if any async system IO can be performed efficiently by an implementer. Are methods called concurrently? If system IO blocks the current thread, will that potentially create a deadlock in the calling code? A persistent implementation needs answers to these questions.
Motivation
We have a use case where we are storing on the order of 100K - 10M provider records. Storing all of this data in memory is inefficient. Additionally, we need the set of records to be persistent across restarts of the process.
Based on the design of the trait we have a few choices:
Current Implementation
The current implementation has one other limitation. While the `records` and `provided` methods return an iterator over the data, the iterator is immediately cloned/collected into a heap-allocated vector. This means we would need to update not only the trait API but also the consuming code to be memory efficient.

Are you planning to do it yourself in a pull request?
Maybe