Joystream Monorepo
http://www.joystream.org
GNU General Public License v3.0

Distributor node - current progress and future plans #2553

Closed: Lezek123 closed this issue 1 year ago

Lezek123 commented 3 years ago

CLI

Following the storage node design, the distributor node CLI is currently a separate package, independent from joystream-cli but sharing some similarities in the code (both are built with the OCLIF framework, both manage the api and accounts in a similar way, etc.).

The CLI is also the root of the project: it is used both to run the actual node and to perform some on-chain operations using the substrate api.

Currently the documentation of all supported commands can be found HERE

Commands were designed with scriptability in mind and can easily be tested using bash scripts (see current example script)

Further developments:

Configuration

The distributor node CLI and the node itself use a single YAML/JSON configuration file, the path to which:

The current config file has the following structure:

endpoints:
  queryNode: http://localhost:8081/graphql
  substrateNode: ws://localhost:9944
directories:
  data: ./local/data
  cache: ./local/cache
  logs: ./local/logs
log:
  file: debug
  console: debug
storageLimit: 10G
port: 3334
keys: [//Alice]
buckets: [0]
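
For illustration, here is a minimal sketch of how such a config file might be read and type-checked in TypeScript. The DistributorNodeConfig interface simply mirrors the example above, and the js-yaml dependency is an assumption, not necessarily what the node actually uses:

// Minimal sketch of reading the config shown above (assumes the js-yaml package)
import fs from 'fs'
import yaml from 'js-yaml'

interface DistributorNodeConfig {
  endpoints: { queryNode: string; substrateNode: string }
  directories: { data: string; cache: string; logs: string }
  log: { file: string; console: string }
  storageLimit: string // e.g. '10G'
  port: number
  keys: string[]
  buckets: number[]
}

export function readConfig(path: string): DistributorNodeConfig {
  // YAML is a superset of JSON, so yaml.load handles both config formats
  const parsed = yaml.load(fs.readFileSync(path, 'utf-8')) as DistributorNodeConfig
  if (!parsed.endpoints?.queryNode || !parsed.endpoints?.substrateNode) {
    throw new Error('Missing required endpoints in config')
  }
  return parsed
}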

Most of the properties are self-explanatory; the ones that may not be immediately obvious include:

Distributor node API

The API is described by an OpenAPI schema (see the current schema here) and implemented with ExpressJS.

It currently exposes one endpoint - /asset/{objectId}, which serves the file with the specified objectId.
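
As a hypothetical usage example (the port matches the sample config above; axios is just a convenient HTTP client here, not necessarily what consumers use), a partial request against this endpoint could look like this:

import axios from 'axios'

// Request the first KiB of a data object from a locally running distributor node
async function fetchAssetRange(objectId: string): Promise<void> {
  const res = await axios.get(`http://localhost:3334/asset/${objectId}`, {
    responseType: 'arraybuffer',
    headers: { Range: 'bytes=0-1023' },
  })
  console.log(res.status, res.headers['content-type'], res.headers['content-range'], res.data.length)
}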

There are multiple scenarios of how a distributor node may act upon a request:

Scenario 1: The requested object is available in the distributor node filesystem (cache):

In this case:

TODO: There should be an additional check of whether the object is still supposed to be served by the distributor node. If not, it should be dropped from the cache either before or after serving the request (since checking this would require communication with the query-node, it may be more suitable to do this after the request is served, to avoid any unnecessary delays)

TODO: There should be an additional HTTP header sent, indicating that the distributor node cache is being used (i.e. X-Cache: HIT)

Scenario 2: The object is not yet cached, but is currently being fetched from the storage node

In this case the request is forwarded to the storage node that the object is being downloaded from. The reason the request is not served by streaming the data from the local file while it is still being fetched (which was my initial approach) is that the way a video, for example, is fetched by the browser usually consists of multiple partial (Range) requests, often targeting the very end of the file (most likely to read some of its metadata first). These kinds of requests are served faster when forwarded to a storage node which already has the full data available.

TODO: There should be an additional HTTP header sent, indicating that the distributor node cache was missed, or more specifically, that the object is still being fetched (i.e. X-Cache: MISS / X-Cache: FETCHING)
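
A rough sketch of the forwarding behaviour described in this scenario is shown below. It is illustrative only: the storage node download path, the pendingDownloadSource lookup and the X-Cache header (still a TODO above) are assumptions, not the node's actual code.

import express from 'express'
import axios from 'axios'

// Hypothetical map of objectId -> endpoint of the storage node the object is being fetched from
const pendingDownloadSource = new Map<string, string>()

const app = express()
app.get('/asset/:objectId', async (req, res) => {
  const source = pendingDownloadSource.get(req.params.objectId)
  if (!source) {
    res.status(404).json({ message: 'Object not found' })
    return
  }
  try {
    // Forward the (possibly partial / Range) request to the storage node and pipe the response back
    const upstream = await axios.get(`${source}/assumed/download/path/${req.params.objectId}`, {
      responseType: 'stream',
      headers: req.headers.range ? { Range: req.headers.range } : {},
      validateStatus: () => true, // pass upstream status codes through as-is
    })
    res.status(upstream.status)
    res.set('X-Cache', 'FETCHING') // the header proposed in the TODO above
    upstream.data.pipe(res)
  } catch (err) {
    res.status(500).json({ message: 'Failed to reach the source storage node' })
  }
})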

Scenario 3: The object is neither cached nor currently being fetched

In this case the distributor node makes a request to the query node to fetch the details of the requested object, which include: content hash, object size, who is supposed to store the object, who is supposed to distribute it, etc. It then proceeds to one of the following scenarios:

Scenario 3.1: The object doesn't exist

Node responds with HTTP 404 and a message

Scenario 3.2: The object is not distributed by the node

Node responds with HTTP 400 and a message

Scenario 3.3: The request was valid, the node needs to fetch the object

This triggers a fairly complex process of fetching the data object from a storage node, which is described in detail in the Fetching data from storage nodes section below.

Once the storage node from which the object will be fetched is found, the request is handled in a way analogous to the one described in Scenario 2.

TODO: There should be an additional HTTP header sent, indicating that the distributor node cache was missed (i.e. X-Cache: MISS)

Further plans

Fetching data from storage nodes

Finding nearby storage nodes:

In order to limit the number of requests being made on a cache miss, and the time it takes to fetch a new object in this scenario, the distributor node needs to keep some state about which storage endpoints will probably respond to a request most quickly.

This can be partially addressed by making use of the metadata provided by storage node operators, which may include approximate geographic coordinates (see the Metadata standards section). These can provide some estimation of which nodes will likely respond faster (i.e. a node that is supposedly 100 kilometers away will most likely respond faster than one 10000 kilometers away). However, because this approach is very limited and it's possible that most storage providers will choose not to expose such information, the distributor node instead uses a different approach.

Currently the distributor node periodically (every X seconds) makes requests to all active storage provider endpoints (fetched from the query node) and measures their average response times. This is done asynchronously and independently of any incoming requests. Because the requests are queued with a relatively small concurrency limit, this process is relatively cheap and provides a good estimation of which nodes will likely be the best candidates for fetching data objects on a cache miss.
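
A simplified sketch of this measurement loop is given below. The endpoint path, the moving-average calculation and the interval are all assumptions made for illustration; the real node fetches the endpoint list from the query node.

import axios from 'axios'

const meanResponseTimes = new Map<string, number>()

async function measureEndpoint(endpoint: string): Promise<void> {
  const start = Date.now()
  try {
    // '/status' is a placeholder for whatever cheap route is used for the measurement
    await axios.get(`${endpoint}/status`, { timeout: 5000 })
    const elapsed = Date.now() - start
    const prev = meanResponseTimes.get(endpoint)
    // keep a cheap running estimate instead of storing every sample
    meanResponseTimes.set(endpoint, prev === undefined ? elapsed : 0.8 * prev + 0.2 * elapsed)
  } catch {
    meanResponseTimes.set(endpoint, Number.MAX_SAFE_INTEGER) // treat failures as "slowest possible"
  }
}

// Check all endpoints with a small concurrency limit (here: 10 at a time)
async function measureAll(endpoints: string[], concurrency = 10): Promise<void> {
  for (let i = 0; i < endpoints.length; i += concurrency) {
    await Promise.all(endpoints.slice(i, i + concurrency).map(measureEndpoint))
  }
}

// Re-run the measurement every X seconds (30s here), independently of any incoming requests
export function startMeasuring(endpoints: string[]): void {
  setInterval(() => void measureAll(endpoints), 30_000).unref()
}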

Data object fetching flow

A cache miss, as described in Scenario 3.3 of the API section, triggers the following flow (a condensed sketch follows the list):

  1. First, the endpoints of storage providers that are supposed to store the given object are ordered by the mean response time (the process of obtaining it is described in the previous section)

  2. Requests are then sent to those storage endpoints, starting with the ones with the lowest mean response time. These are data object availability check requests, which are meant only to determine whether a given storage node indeed has the data object available. They are queued using a chosen concurrency (10 at the current time).

TODO: Currently there is no explicit way of checking whether the storage node has a given object available. The distributor node achieves this by sending a GET request with a Range: 'bytes=0-0' header, effectively requesting just the first byte of the data to limit the potential waste of bandwidth on the storage node side. Ideally, the storage node would support a HEAD request for this purpose.

  3. As soon as any storage provider endpoint confirms the availability of the object, the availabilityCheckQueue is temporarily stopped and a request is made to fetch the full data from the selected provider. If multiple storage providers confirm the availability of the object at roughly the same time, the fetch / download requests are added to a queue (which uses a concurrency of 1), allowing the distributor node to instantly try a different provider in case the actual data fetch request to a given provider fails. The process continues until a storage node that successfully responds to this request is found.

  4. Once a storage node successfully responds with the data, all other requests w.r.t. that content are stopped and the node begins to write the data into its filesystem. Any errors at this point (unexpected data size, stream errors) mean a failure to fetch the data object, causing the content not to be stored and the whole process of fetching the object to potentially be repeated later.

TODO: Currently no retry attempts actually happen after such a failure. The object is dropped entirely and the node will only try to fetch it again if there is another request for it. This seems to be an issue and could also potentially be exploited by a malicious/faulty storage node, so some adjustments may be needed (possibly including keeping some state about faulty storage nodes in order to quickly bypass them).
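
Below is the condensed sketch of this flow referenced above. Queueing, concurrency limits and error handling are heavily simplified, and the storage node download path is a placeholder; it only illustrates the order of operations (availability checks ordered by mean response time, then a single full download).

import axios from 'axios'

// Step 2/3: availability check - request just the first byte (see the TODO about HEAD requests)
async function isObjectAvailable(endpoint: string, objectId: string): Promise<boolean> {
  try {
    const res = await axios.get(`${endpoint}/assumed/download/path/${objectId}`, {
      headers: { Range: 'bytes=0-0' },
      timeout: 5000,
    })
    return res.status === 200 || res.status === 206
  } catch {
    return false
  }
}

// Steps 1-4, with endpoints already sorted by mean response time (step 1)
async function findSourceAndFetch(
  endpointsByMeanResponseTime: string[],
  objectId: string
): Promise<NodeJS.ReadableStream | null> {
  for (const endpoint of endpointsByMeanResponseTime) {
    if (!(await isObjectAvailable(endpoint, objectId))) continue
    try {
      // Step 4: full download from the first endpoint that confirmed availability
      const res = await axios.get(`${endpoint}/assumed/download/path/${objectId}`, { responseType: 'stream' })
      return res.data
    } catch {
      // try the next candidate (the real node queues these with a concurrency of 1)
    }
  }
  return null
}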

Fetching data from other distributor nodes (?)

It is not clear whether distributor-to-distributor communication should also be utilized for the purpose of fetching missing data objects. It certainly adds a lot of complexity, which we probably want to avoid in the first iteration of Storage v2.

The state

Most of the state, including both the part that's only kept in the node's memory and doesn't need to be persisted across restarts, and the part that is persisted in storage (currently via an asynchronously updated JSON file), is handled via an "intermediary" StateCacheService. This is meant to facilitate a potential migration to other state management approaches, like using a Redis database.
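
A minimal sketch of this "intermediary" pattern, assuming a periodic flush interval and purely illustrative field names:

import { readFile, writeFile } from 'fs/promises'

class StateCacheService {
  // Kept only in memory - not persisted across restarts
  private memoryState = { pendingDownloads: new Map<string, string>() }
  // Persisted asynchronously to a JSON file
  private persistentState = { lruCacheGroups: [] as string[][], mimeTypes: {} as Record<string, string> }

  constructor(private statePath: string, flushIntervalMs = 60_000) {
    setInterval(() => void this.save(), flushIntervalMs).unref()
  }

  async save(): Promise<void> {
    await writeFile(this.statePath, JSON.stringify(this.persistentState))
  }

  async load(): Promise<void> {
    try {
      this.persistentState = JSON.parse(await readFile(this.statePath, 'utf-8'))
    } catch {
      // no state file yet - start fresh
    }
  }
}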

The current node state includes:

Memory

Memory + persistent storage

Content and caching policy

The current caching policy is a form of TLRU (https://en.wikipedia.org/wiki/Cache_replacement_policies#Time_aware_least_recently_used_(TLRU)) where the TTU component is a function of data object size, such that small data objects have a higher TTU while big data objects have a lower TTU, making smaller data objects more favorable to keep over longer periods of time.

TODO: The exact shape of the function determining how object size should affect TTU is not clear yet.
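
Purely as an illustration of the open question above, one possible shape (an assumption, not the implemented formula) would be a TTU that shrinks logarithmically with object size:

const BASE_TTU_HOURS = 24

// Smaller objects keep the full base TTU; bigger objects get progressively shorter TTUs
function ttuHours(objectSizeBytes: number): number {
  const sizeMB = Math.max(objectSizeBytes / 1_000_000, 1)
  // e.g. 24h for objects up to 1 MB, 12h at 1 GB, ~10h at 10 GB
  return BASE_TTU_HOURS / (1 + Math.log10(sizeMB) / 3)
}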

Storage limit

The node operator can configure the storage limit - the amount of storage that will be devoted to the cache. The node will always try to maximally utilize the storage up to the provided limit, which means no data objects are currently removed just because they are stale or no longer supported. Data objects are only removed to make room for new data objects when there is no free space available (according to the limit). This also means it is not possible for the total size of stored objects to go beyond this limit.

Further developments:

It would make sense to make some adjustments so that stale data objects (past their lifetime) and objects that are no longer supposed to be served by the node are dropped as soon as possible. This would make the cache more flexible and reduce the average amount of storage being used (in case the limit is relatively high). The amount of storage used would then also become a good indicator of how much storage the distributor node really "needs" given user activity. A hybrid approach, where a storage limit can be set but is not necessary for the cache to function properly, should be implemented.

Startup, cleanup, data integrity

Startup and cleanup functions are an important piece of the distributor node design.

During startup, some data integrity checks are performed; these may later also be supported while the node is running. An additional, more thorough check, including calculation of data object hashes, re-reading files' mimeType metadata, etc., may also be added in the future.

Current startup / cleanup logic includes:

Startup:

Cleanup:

There are multiple ways the node can be shut down, and obviously not all of them allow performing any cleanup at all (e.g. power outage, forced process kill). Currently the node tries to make the most graceful exit possible given the circumstances.

The node will always try to make sure all the stored data, like logs and StateCacheService data, is flushed to the disk before exiting.

Idea: If the node process is terminated in a "non-instant" way (for example, through CTRL+C), it may also try to wait until all the current requests are served and all the data objects being fetched from the storage nodes are fully fetched and stored in the local fs. This shouldn't take more than 30 seconds though, because after that time the process may be forcefully killed.
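
A sketch of that idea, with pendingDownloads standing in for however the node actually tracks in-flight fetches:

// Hypothetical set of promises representing downloads currently in progress
const pendingDownloads = new Set<Promise<void>>()

async function gracefulExit(): Promise<void> {
  // Force-exit fallback after 30 seconds, in case pending downloads never settle
  const forceKillTimeout = setTimeout(() => process.exit(1), 30_000)
  await Promise.allSettled([...pendingDownloads]) // let in-flight fetches finish and hit the disk
  clearTimeout(forceKillTimeout)
  process.exit(0)
}

process.on('SIGINT', () => void gracefulExit())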

Logging

The distributor node, just like the storage node, uses the winston library for logging.

It is a very popular NodeJS logging library which is easy to integrate with the Elastic stack (https://www.elastic.co/guide/en/ecs-logging/nodejs/current/winston.html) and an ExpressJS server. It allows specifying multiple targets for the logs to be directed to (currently the distributor node logs are stored in a file and output to the console), supports multiple log levels (see: https://www.npmjs.com/package/winston#logging-levels), allows managing multiple log formats, etc.

In order to test the Elastic Stack integration, I created a docker-compose file for running the stack locally along with the distributor node itself. The flow of the logs in that case is as follows (this is just one of many possible paths): Distributor node (winston) -> logfile -> FileBeat -> ElasticSearch -> Kibana
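
For reference, a winston logger along the lines described above might look roughly like this. The file path is an assumption, and the ECS formatter from the linked Elastic docs could be plugged in as the format for the Elastic Stack integration:

import winston from 'winston'

const logger = winston.createLogger({
  // JSON output; for the Elastic integration this could be swapped for the
  // formatter from @elastic/ecs-winston-format (see the link above)
  format: winston.format.json(),
  transports: [
    new winston.transports.Console({ level: 'debug' }),
    new winston.transports.File({ filename: './local/logs/distributor.log', level: 'debug' }),
  ],
})

logger.info('Data object served from cache', { objectId: '123' })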

Metadata standards

The current metadata standards are described by the following set of protobuf messages:

message GeoCoordiantes {
  required float latitude = 3;
  required float longitude = 4;
}

message NodeLocationMetadata {
  optional string country_code = 1; // ISO 3166-1 alpha-2 country code (2 letters)
  optional string city = 2; // City name
  optional GeoCoordiantes coordinates = 3; // Geographic coordinates
}

message StorageBucketOperatorMetadata {
  optional string endpoint = 1; // Root storage node endpoint (ie. https://example.com/storage)
  optional NodeLocationMetadata location = 2; // Information about the node's physical location
  optional string extra = 3; // Additional information about the node / node operator
}

message DistributionBucketOperatorMetadata {
  optional string endpoint = 1; // Root distribution node endpoint (ie. https://example.com/distribution)
  optional NodeLocationMetadata location = 2; // Information about the node's physical location
  optional string extra = 3; // Additional information about the node / node operator
}

message DistributionBucketFamilyMetadata {
  optional string region = 1; // ID / name of the region covered by the distribution family (ie. us-east-1). Should be unique.
  optional string description = 2; // Additional, more specific description of the region
  repeated GeoCoordiantes boundary = 3; // Geographical boundary of the region, defined as polygon through array of coordinates
}

In practice, only a few of those metadata fields may end up actually being used, but my reasoning was that allowing the operators to provide some optional information about the node may have some utility both in terms of debugging and of optimizing some processes related to grouping/sorting nodes, for example, based on location.

The most critical part of the metadata are the node endpoints - in that regard, the metadata is the only viable source of information about how to access a given node.

Distribution bucket families

It is important that the frontend applications (i.e. Atlas) can easily identify which distribution bucket families should be preferred for fetching the assets, based on the user's location and connection.

There are multiple ways of how this can be achieved and the current metadata standard is quite flexible in that regard, not enforcing any specific semantics on, for example, what is considered a region.

There are, however, a few ideas I had in mind while designing the standard:

Using client's geolocation and geographic regions:

Using latency test:

The frontend application can perform a simple latency test, pinging some endpoints that are known to be associated with a given region (for example, see: https://www.cloudping.info/) in order to determine which regions will be the best fit for the client.

I was considering whether the family bucket metadata should include a field like pingTargets that would allow specifying those endpoints, but decided to drop it to reduce complexity. After some reconsideration I think it may actually be a good idea to include it.
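
A rough sketch of such a test from the frontend's perspective (region names and ping URLs are placeholders; a pingTargets metadata field would make them discoverable instead of hard-coded):

// Hypothetical per-region ping targets
const regionPingTargets: Record<string, string> = {
  'us-east-1': 'https://example-us-east.example.org/ping',
  'eu-central-1': 'https://example-eu-central.example.org/ping',
}

async function pickClosestRegion(): Promise<string> {
  const results = await Promise.all(
    Object.entries(regionPingTargets).map(async ([region, url]) => {
      const start = Date.now()
      try {
        await fetch(url, { method: 'HEAD' })
        return { region, latency: Date.now() - start }
      } catch {
        return { region, latency: Number.MAX_SAFE_INTEGER } // unreachable regions go last
      }
    })
  )
  return results.sort((a, b) => a.latency - b.latency)[0].region
}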

Query node

Mappings

There is currently a pending PR with the query-node storage v2 schema and mappings: https://github.com/Joystream/joystream/pull/2515

Missing parts:

Unresolved issues:

Integration with distributor node

For maximum efficiency, the processor part of the query node (ideally containing only the storage v2 related mappings) should probably run either on the same machine as the distributor node itself, or on a machine that's almost as quickly accessible.

The access to the data can be even further optimized if the distributor node connects to the processor database directly, instead of using the graphql-server interface. This needs to be further evaluated in terms of pros and cons.

bedeho commented 3 years ago

Future proofing?

I wanted to make one observation here. In the longer run, it's almost certainly going to be the case that the distributor needs to understand what different data objects represent, in terms of content or avatars, etc.

The most important reason will be that when we want to enforce access policies, we will need to understand something about who is trying to access what, and in this case we may want to allow anyone to, for example, fetch a preview image, but not the main video media, etc. Different users may also need to prove they have done certain things on-chain, like paying or holding certain tokens, to access certain kinds of content.

Another reason could be that we may need to have the caching policy be sensitive to something even deeper than size, in order to be really effective at scale.

Does any of this make you think we should change anything in the current design for Giza to be ready for this?

Caching Policy

It's not clear to me that TLRU is the right policy to achieve our goals. It seems designed for managing objects with a given lifetime, after which they should be dropped. It also does not appear to really differentiate content based on how frequently it is used over some period of time. I must say I actually don't think I really understand how our version of TLRU works, based on what Wikipedia is saying.

As I understand them, our goal is to have a policy where both the size (s, e.g. in bytes, so > 1) and the recent access frequency (h, e.g. over the last 24h) of a data object are used to decide whether it should be kept in the cache. So conceptually there must be some way to rank all pairs (s, h) which allows the node to manage which objects to keep and which to drop. One way to represent this is just a function weight(s, h) which computes a single value, and then use all these values to rank objects at any given time. The precise way s and h are combined to produce w reflects how each property is prioritised. For example, w(s,h) = h/s would give a lot of weight to frequently accessed content or small content, and w(s,h) = h/(s^2) will be more sensitive to content being big, kicking it out earlier.
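
A tiny illustration of that ranking idea (names are made up; the open question is exactly how strongly s should be penalised):

type CachedObject = { id: string; sizeBytes: number; accessesLast24h: number }

const w1 = (o: CachedObject) => o.accessesLast24h / o.sizeBytes      // w(s,h) = h / s
const w2 = (o: CachedObject) => o.accessesLast24h / o.sizeBytes ** 2 // w(s,h) = h / s^2, penalises size more

// Lowest weight first = first to evict
function evictionOrder(objects: CachedObject[], w: (o: CachedObject) => number): CachedObject[] {
  return [...objects].sort((a, b) => w(a) - w(b))
}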

One thing that does occur to me as being problematic for this policy, for the TLRU policy, and for any policy that really uses recent traffic to manage the cache, is having to update the weight of all objects continuously, and also having to keep re-sorting all objects in the cache, and possibly even outside the cache, based on their current weight.

Lezek123 commented 3 years ago

I wanted to make one observation here. In the longer run, it's almost certainly going to be the case that the distributor needs to understand what different data objects represent, in terms of content or avatars, etc. The most important reason will be that when we want to enforce access policies, we will need to understand something about who is trying to access what, and in this case we may want to allow anyone to, for example, fetch a preview image, but not the main video media, etc. Different users may also need to prove they have done certain things on-chain, like paying or holding certain tokens, to access certain kinds of content.

As to access policies, it all depends on the specific use-cases. Once we merge master into the storage-v2 branch and merge the storage-v2 mappings with the content-directory mappings, we can have, for example, an objectType field on query-node DataObjects, so that the distributor node can become aware of it.

As I understand it, the runtime will not be aware of data object types? (in storage-v1 there was a concept of objectType in the runtime; I didn't see any such thing in storage-v2) Currently this information lives solely in the protobuf metadata.

Having such a policy would of course also imply some kind of authentication, but I think this wasn't planned as part of the first iteration of Storage v2?

Another related idea, as mentioned in the Query node -> Integration with distributor node section above, is that I think distributor node operators should probably run the query node processor locally with just a limited, "lightweight" set of mappings in order to make the data accessible quickly (perhaps even querying the db directly instead of using the graphql server), as the current design assumes that this communication is very fast. Obviously, the more subsystems the mappings need to include (for example, in order to enforce a given policy), the slower and more expensive in terms of storage etc. the processor database will become, so this may be something worth considering.

Another reason could be that we may need to have the caching policy be sensitive to something even deeper than size, in order to be really effective at scale.

This may be partially addressed by the choice of a different caching policy (LRU-SP), which I'll describe in the next comment. If we wanted to have different caching policies based on object type, or separate segments of the cache dedicated to different kinds of objects, this would also be possible to implement as long as the query node that the distributor node connects to exposes the objectType information (as described above).

It's not clear to me that TLRU is the right policy to achieve our goals.

I reached the same conclusion and decided to switch to a different policy - LRU-SP, specifically designed for web caches. I'll describe this change in the next comment.

Lezek123 commented 3 years ago

Week 2

Switch to LRU-SP cache policy

Because the TLRU cache policy was not the most optimal choice, I decided to switch the distributor node caching policy to LRU-SP. This caching policy was designed specifically for the web and takes into account 3 properties of an object: the time since it was last accessed (t), its size (s) and the number of times it has been accessed (p).

The cost function of a cache item is t * s / p; objects with the highest cost are the first to be evicted.

The cache is divided into LRU sets (groups) such that all objects in a given group share the same integer value of log2(s / p). I decided to use KB as the unit for s (object size), which means that with 24 groups, assuming p = 1 for all objects, the first group will contain objects of size 1 - 2 KB, the second one 2 - 4 KB etc., up until the 24th group with objects of size 8 - 16 GB (or 2^23 KB - 2^24 KB).

Updating the LRU position of an item is a very quick operation, basically the cost of incrementing p + computing log2(s / p) + Set.delete() + Set.add() (potentially moving the item to a new set based on the current log2(s / p)).

Finding an evict candidate is done by taking the "bottom" (least recently used) item from each LRU segment (set) and then choosing the element with the highest t * s / p (which is also relatively quick, considering there are only ~24 groups)
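
For concreteness, a simplified sketch of these structures (field names are illustrative, not the node's actual state shape):

interface CacheItem {
  objectId: string
  sizeKB: number          // s
  popularity: number      // p - number of accesses
  lastAccessTime: number  // used to derive t
}

const groups = new Map<number, Set<CacheItem>>() // one insertion-ordered (LRU) Set per integer log2(s / p)

const groupIndex = (item: CacheItem) => Math.floor(Math.log2(item.sizeKB / item.popularity))
const cost = (item: CacheItem, now: number) =>
  ((now - item.lastAccessTime) * item.sizeKB) / item.popularity // t * s / p

function access(item: CacheItem): void {
  groups.get(groupIndex(item))?.delete(item) // remove from the group computed with the old p
  item.popularity += 1
  item.lastAccessTime = Date.now()
  const idx = groupIndex(item) // may have changed after incrementing p
  if (!groups.has(idx)) groups.set(idx, new Set())
  groups.get(idx)!.add(item) // re-adding moves the item to the "top" of its LRU set
}

function evictCandidate(): CacheItem | undefined {
  const now = Date.now()
  let candidate: CacheItem | undefined
  for (const set of groups.values()) {
    const bottom = set.values().next().value as CacheItem | undefined // least recently used in this group
    if (bottom && (!candidate || cost(bottom, now) > cost(candidate, now))) {
      candidate = bottom
    }
  }
  return candidate
}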

Storage node downloads concurrency limit

Another limit was introduced w.r.t. how many requests related to storage node data object fetching may be performed at a given time; however, this limit currently only applies to objects that are in the LookingForSource state (the phase in which the distributor node is trying to find a source storage node to download the data object from).

Ultimately there should probably be separate limits for:

Some other optimizations may also be implemented if necessary (e.g. some way of batching "availability check" requests during spikes of cache misses, to check whether a storage node has multiple objects of interest in a single request; keeping some state about which storage node has which objects, etc.)

Graceful exit - trying to finish pending downloads

The node will now try to postpone the exit (initiated, for example, via CTRL+C) until all objects that are currently being downloaded from storage nodes finish downloading and are saved in the local cache.

NOT IMPLEMENTED - resuming pending downloads on startup

Introducing the ability for the node to resume pending downloads on startup turned out to be more complicated than anticipated. Some information (like a data object's LRU data) is currently not persisted until the object is fully downloaded, which means some non-trivial changes in the way the state is managed would be needed in order to implement it correctly.

It's also not clear how much utility this functionality really provides, because:

More logs and local testing

I performed some additional tests and added more logs in order to be able to easily debug any issues. I executed a local test where 20 data objects of different sizes were requested from the distributor node at the same time, causing the cache to reach its limit multiple times and trigger evictions. After some initial issues had been fixed, the test was executed successfully.

bedeho commented 3 years ago

Just replying to some of your questions above https://github.com/Joystream/joystream/issues/2553#issuecomment-895278715

objectType field in query-node DataObjects, so that the distributor node can become aware of it.

That sounds like a very interesting and practical approach, will keep it in mind.

Currently this information lives solely in the protobuf metadata.

Correct.

I think distributor node operators should probably run the query node processor locally

That is very interesting, I think we should definitely consider that, but perhaps when we have a clearer idea of how much it could possibly reduce latency compared to a range of other remedies.

Having such policy would of course also imply some kind of authentication, but I think this wasn't planned as part of the first iteration of Storage v2?

Correct.

bedeho commented 3 years ago

Switch to LRU-SP cache policy

Mind blown, awesome job identifying and implementing this 🚀

bedeho commented 3 years ago

resuming pending downloads on startup

I agree this does not sound like its worth the effort & complexity at this stage.

Lezek123 commented 3 years ago

Added this week

Some CDN research highlights

Most CDNs seem to use either some kind of GeoDNS or anycast for the purpose of connecting the client to the nearest server. The anycast solution is especially interesting, as it allows changing the availability status of a node very quickly (as opposed to DNS, where records may get cached for longer periods of time) and, in general, managing the traffic more dynamically based on network capabilities.

Another major benefit of those solutions is increased security (mitigation of DDoS impact).

In both cases these solutions are also very convenient for client applications, as they can use just a single domain and the connection will always be established with the most appropriate node based on the client's location and current network status.

Both of those solutions assume that:

Implementing any of those would require some changes in the current distribution module logic:

Example implementations

DNS:

Anycast

This seems to be a great solution in case the servers are owned by a single entity, but in a DAO, where both the working group lead and workers can change dynamically, and where we'd want to limit any points of centralization, make it easy to spin up a new node, etc., anycast seems very hard to achieve.

The reason is that in order to participate in the routing protocol, one needs to own a /24 IP subnet and an ASN. Alternatively, a service like AWS Global Accelerator can be used for anycast, but it's not clear how this could be managed in a decentralized manner either.

To be continued...

Some resources (mostly anycast related):
https://www.youtube.com/watch?v=40MONHHF2BU&ab_channel=TeamNANOG
https://archive.nanog.org/meetings/nanog29/presentations/miller.pdf
https://labs.ripe.net/author/samir_jafferali/build-your-own-anycast-network-in-nine-steps/
https://www.nginx.com/blog/learn-to-stop-worrying-build-cdn
https://blog.apnic.net/2021/04/07/building-an-open-source-anycast-cdn/
https://aws.amazon.com/blogs/aws/new-aws-global-accelerator-for-availability-and-performance/

bedeho commented 3 years ago

Most CDNs seem to use either some kind of GeoDNS or anycast for the purpose of connecting the client to the nearest server. The anycast solution is especially interesting, as it allows changing the availability status of a node very quickly (as opposed to DNS, where records may get cached for longer periods of time) and, in general, managing the traffic more dynamically based on network capabilities. Another major benefit of those solutions is increased security (mitigation of DDoS impact).

Could you perhaps explain more explicitly how these solutions allow

in a way which is better than what we can currently do? I think this is the key distinction we are interested in when looking at other CDNs, not just mimicking them.