livepeer / go-livepeer

Official Go implementation of the Livepeer protocol
http://livepeer.org
MIT License
546 stars 171 forks source link

Optimize per stream orchestrator discovery process #2329

Open leszko opened 2 years ago

leszko commented 2 years ago

We execute GetOrchestrators() during each session refresh which happens:

Refreshing a session means executing n of Os parallel HTTP calls for each stream. The means that with the growing number of active orchestrator, we'll send more and more HTTP requests.

We need to analyze the impact of the n of stream and n of Os on the CPU usage. Then, consider implementing an optimization to the process. Some initial ideas:

I suggest to first research the domain of solutions, describe and review them, and when we agree on the solution, finally do the actual implementation.

leszko commented 2 years ago

TL;DR

To address the explosion of the number of concurrent HTTP requests during the discovery process I suggest to implement 2 improvements:

I believe we can also use the approach of caching and batching to solve the issues described in https://github.com/livepeer/go-livepeer/issues/2330.

Here's the PoC PR which incudes the working version of orchestrator info caching (does not include the batching).

Current discovery process

sequenceDiagram
  autonumber

    actor U as User
    participant B as Broadcaster
    participant O1 as Orchestrator 1
    participant O2 as Orchestrator 2
    participant On as Orchestrator n

    U->>B: Start stream

    B-->>B: Refresh session
    par Discovery (max. 0.5-6s)
        B->>O1: HTTP Request: Get orchestrator info
        B->>O2: HTTP Request: Get orchestrator info
        B->>On: HTTP Request: Get orchestrator info
    end
    B-->>B: Create orchestrator pool

    loop Video transcoding with selected Os
        B->>O2: Send and receive segments
    end

    Note over B,On: When not enough orchestrators in the pool, then refresh the session again

1. Stream is started, which initializes the session refresh 2. B performs the session refresh "per stream", so if multiple streams are started in parallel, then multiple session refreshes happen at the same time 3-5. The session refresh means making an HTTP request (Get Orchestrator Info) to each active O

6. When B fetched all Orchestrator Infos, then it creates an orchestrator pool. 7. From the pool, B takes one O and uses it for the video transcoding

Issues with the current discovery process

  1. When we increase the number of active Os, then the number of parallel HTTP requests increases as well. At some point, it may impact the CPU usage and congest the network.
  2. When multiple streams start at the same time, the number of concurrent HTTP requests increases, which may have an unpredictable impact on the CPU usage and congest the network.

Proposed change

sequenceDiagram
    autonumber

    actor U as User
    participant B as Broadcaster
    participant O1 as Orchestrator 1
    participant O2 as Orchestrator 2
    participant O101 as Orchestrator 101
    participant On as Orchestrator n

    loop Every 15 min
        par Discovery 1-100
            B->>O1: HTTP Request: Get orchestrator info
            B->>O2: HTTP Request: Get orchestrator info
        end
        par Discovery 101-200, 201-300, ...
            B->>O101: HTTP Request: Get orchestrator info
            B->>On: HTTP Request: Get orchestrator info
        end
        Note over B,On: Repeat sequentially 100 parallel HTTP calls
    end

  U->>B: Start stream

    B-->>B: Refresh session
    loop Video transcoding with selected Os
        B->>O2: Send and receive segments
    end

    Note over B,On: When not enough orchestrators in the pool, then refresh the session again

1-2. Fetching Orchestrator Infos is executed periodically (every 15 min) and the Orchestrator Info data is cached. 3-4. To avoid the growing number of parallel HTTP requests, B executes only 100 requests in parallel

5. Stream is started, which initializes the session refresh 6. The session refresh means getting all Orchestrator Infos from the cache and selecting the orchestrators into the pool. 7. From the pool, B takes one O and uses it for video transcoding

Important aspects

There are a few things we need to keep in mind with while implementing the described change.

  1. Orchestrator info data becomes less "fresh"
    • Ticket parameters may have expired data
      • B can't validate them while creating the pool
      • I think it's not a big issue, because we already have a similar situation when B fails over to another O in the pool
      • B will re-fetch orchestrator Info (and ticket parameters) while starting to work with the given O
    • price info might have changed in the meantime
      • I think it's minor, because Os rarely change the price info
      • We need to double check that this value is validated while starting to work with the given O
  2. Orchestrator info data becomes fetched globally (not "per stream")
    • Fetching orchestrator infos does not take into account the capabilities (if it's compatible with the stream), this check must be done during the session refresh
    • I think that caching orchestrator info globally is a good thing, because the session refresh from multiple stream can reuse the same data
  3. We fetch orchestrator info even if it's not needed
    • If there are no streams, we still execute HTTP requests which may be redundant
    • I think it's minor, but we can always consider some having some start/stop mechanism and stop refreshing orchestrator infos if there are no stream for a long time

Alternative solutions & things to consider

There are a few additional things we can consider to implement, though I don't think we need them right now.

  1. Store orchestrator info cache in DB
  2. Global O suspension mechanism
    • Currently I suggest to keep the suspension mechanism local to the streams
    • We can consider making it global
  3. Make the discovery parameters configurable:
    • Discovery interval
    • Discovery parallel batch size
  4. Use HTTP connection pool instead of batching parallel HTTP requests
    • An alternative solution to sequentially executing the batches of 100 parallel HTTP requests is to use the HTTP connection pooling
    • HTTP connection pooling means that we keep a (global) limit on the number of parallel HTTP requests
    • I suggest to use batching because it's simpler to implement and has less implications on the global architecture
leszko commented 2 years ago

@yondonfu and @red-0ne Please take a look and review it.

red-0ne commented 2 years ago

I think caching Os info is the way to go, but doing it for a big amount of Os may bring its own set of issues. Using a time interval to fetch the info may do the job for most use cases but still seem a bit brutal. The good thing about it is that it brings minimal change to the architecture.

Are Os updating some of their info without announcing it onchain? What are these infos? If we manage to make O announce changes onchain and since changes are not that frequent then blockchain events can be used to trigger info fetching.

leszko commented 2 years ago

Are Os updating some of their info without announcing it onchain? What are these infos? If we manage to make O announce changes onchain and since changes are not that frequent then blockchain events can be used to trigger info fetching.

The Orch Info is not announces on-chain (at least not all the information). Orch Info is fetched P2P, so I think all we can do is to:

The Orch Info struct includes the following fields: https://github.com/livepeer/go-livepeer/blob/fce545fa16b6a2292394fe5134a755cd0f830e35/net/lp_rpc.pb.go#L537

red-0ne commented 2 years ago

I see... But if we make Os announce updates onchain (not necessarily including the new info onchain) then we can have Bs update their cache only when needed. We can use ServiceRegistry.setServiceURI to emit the event (no need to add more data there, just need the event emission... maybe add a hash of the config at most).

This will provide Bs an easy way to collect updated Os, by storing the latest block processed, they can fast forward to the latest block collecting Os. This will not only make Bs call no more than the concerned Os but also have a (quasi) consistent state of all of them beside fast node starts.

I was imagining something along those lines:

func start() {
  lastProcessed := ow.getLastProcessedBlockFromDB()
  ow.subscribeToServiceURIUpdateEvents(lastProcessed, fetchOrchInfo)
}

With

func fetchOrchInfo(blockToProcess int, event ServiceURIUpdateEvent) {
  info := ow.p2pFetchOrchInfo(event.serviceURI) // this is a p2p fetch
  ow.updateInfo(info)
  ow.updateLastProcessedBlock(blockToProcess) 
}

The real world version has to be more sophisticated as it needs to handle B startups with O updating on multiple blocks (lastProcessed = n and O updates at block n+x then n+x+y) and also multiple Os updating on the same block.

If using the blockchain to keep Bs updated is a no no then for sure, contact as much Os as possible is the way to go. Maybe by doing aggressive fetching at startup then spread the load along time.

Does this make sense to you?

leszko commented 2 years ago

I think we can't store all the OrchInfo data on-chain, because:

  1. each on-chain modification has a cost, which results in less frequent changes (for the context, price per pixel used to be an on-chain parameter and we moved it off-chain)
  2. OrchInfo contains some information which needs to be "fresh":
    • Ticket Params
    • Info if the O is available (if it does not return anything, B should not even consider using it)

While we overcome somehow Point 2, I think we don't want to change Point 1. In other words, we want the functionality of changing some O parameters without the need to execute an eth tx.

red-0ne commented 2 years ago

Sorry, I may have misguided you. In the code snippets it was intended to be ow.Method instead of o.Method I will edit it.

each on-chain modification has a cost, which results in less frequent changes (for the context, price per pixel used to be an on-chain parameter and we moved it off-chain)

It is not meant to store more info on chain. Just Os poking Bs through on-chain events. Bs will be p2p fetching the info after receiving the onchain notification.

OrchInfo contains some information which needs to be "fresh":

It will be in the worst case 1 block old + 1 p2p fetch. while with periodic fetching it will be at best the time interval configured. There is also high chances that Bs do not pick the update if there is a lot of Os (thousands?) and B have to cycle fetch over them. Well, it depends on what "fresh" is :smile:

we want the functionality of changing some O parameters without the need to execute an eth tx.

True, in this case O will be sending a tx each time he wants to notify Bs about it's new params.

leszko commented 2 years ago

True, in this case O will be sending a tx each time he wants to notify Bs about it's new params.

Yeah, that's why I think it's not going to work on-chain. We don't want O to send tx just to notify B about the changes.

yondonfu commented 2 years ago

I think caching OrchestratorInfo messages makes sense especially since the data included in each message does not vary per stream. At the moment, when a B receives multiple streams it ends up fetching multiple OrchestratorInfo message responses from the same O. If we cache the messages then B can just re-use a message that it already previously received from O.

The way that I am thinking about this proposal is that O discovery is pushed into a global background process that runs on B at a regular interval instead of running as a per stream process (as it does now). Additionally, O pool construction for a stream will be based on the OrchestratorInfo cache instead of being based on discovery (since discovery is now updating the cache in the background). From a terminology POV should we use the following?

One thing to keep in mind is that the slice returned by discovery.GetOrchestrators will order OrchestratorInfo messages based on how quick Os respond i.e. the fastest O to respond will have its OrchestratorInfo message first in the slice, the second fastest O to respond will have its message second in the slice, etc. As a result, when selection is run, if multiple Os have the same stake or if B is running in off-chain mode, the fastest O to respond during discovery will be selected first. This is a useful property because when running in on-chain mode it allows speed of discovery response to be a tie breaker given two Os with the same stake and when running in off-chain mode it allows Os to be selected in order of discovery response time. In both cases, this helps B prioritize Os that are "closer" in the network. I think we should aim to preserve this property when implementing caching.

B can't validate them while creating the pool

I think we can validate them on O pool creation for a stream?

price info might have changed in the meantime

The price will expire alongside the ticket params so B would need to refresh the OrchestratorInfo for an O with a RPC request at this point.

Fetching orchestrator infos does not take into account the capabilities (if it's compatible with the stream), this check must be done during the session refresh

Since the capability bitstring is in the OrchestratorInfo message seems like we should be able to do the capability check during O pool creation for a stream - the source of the messages with the capability bitstrings would just be the cache instead of based on responses from the network.

We fetch orchestrator info even if it's not needed

I think this is fine given the benefits you outlined and since it cuts out the additional requests that would exist if we continue to send requests for OrchestratorInfo per stream.

batching parallel HTTP requests

Since we only have 100 active Os on the network today and since the proposal suggests using 100 as the batch size do you think we can add batching separately from caching OrchestratorInfo messages?

yondonfu commented 2 years ago

@red-0ne

Interesting suggestion of using on-chain events to notify Bs of when to send a network request to O to get a fresh OrchestratorInfo message! This seems like a pub-sub pattern where the publish step is implemented using an on-chain event and interested Bs can just subscribe to that event.

I like the pattern, but I echo Rafal's concerns of relying on an on-chain tx to trigger the event which increases txs costs for O which is already tx cost sensitive. At the moment, Os dynamically generate the ticket params and price based on the estimated gas price that will be used for ticket redemption. As a result, as gas prices fluctuate, the ticket params and price could fluctuate as well [1] which might lead to more frequent changes to the data in the OrchestratorInfo message which might lead to more frequent on-chain txs if we're publishing an event on-chain every time the contents of the OrchestratorInfo message changes.

An alternative implementation of a pub-sub pattern could be for O to expose a websocket subscription API that publishes new OrchestratorInfo messages. But, this would require persistent websocket connections between Bs and Os.

Perhaps we can consider these pub-sub pattern variants as possible improvements on top of the current caching proposal that can considered separately.

[1] Currently, the ticket params + price remain valid for a period of time after they are advertised so that those values can be used by a B for X period of time even if the O is already advertising new ticket params + price

leszko commented 2 years ago

Thanks for the feedback @yondonfu

From a terminology POV should we use the following?

  • O discovery -> process for fetching and caching OrchestratorInfo messages from known Os
  • O pool creation -> process for creating an O pool to be used for a stream based on the OrchestratorInfo cache

Yes, I think it's good to stick to this terminology.

One thing to keep in mind is that the slice returned by discovery.GetOrchestrators will order OrchestratorInfo messages based on how quick Os respond i.e. the fastest O to respond will have its OrchestratorInfo message first in the slice, the second fastest O to respond will have its message second in the slice, etc. As a result, when selection is run, if multiple Os have the same stake or if B is running in off-chain mode, the fastest O to respond during discovery will be selected first. This is a useful property because when running in on-chain mode it allows speed of discovery response to be a tie breaker given two Os with the same stake and when running in off-chain mode it allows Os to be selected in order of discovery response time. In both cases, this helps B prioritize Os that are "closer" in the network. I think we should aim to preserve this property when implementing caching.

Ok, I think it makes sense to make it work the same.

I think we can validate them on O pool creation for a stream?

Fetching orchestrator infos does not take into account the capabilities (if it's compatible with the stream), this check must be done during the session refresh

Since the capability bitstring is in the OrchestratorInfo message seems like we should be able to do the capability check during O pool creation for a stream - the source of the messages with the capability bitstrings would just be the cache instead of based on responses from the network.

We fetch orchestrator info even if it's not needed

Yes, we can check both capabilities and the predonditions during the pool creation time.

batching parallel HTTP requests

Since we only have 100 active Os on the network today and since the proposal suggests using 100 as the batch size do you think we can add batching separately from caching OrchestratorInfo messages?

I think we can split it into 2 PRs:

But I think we do need batching, because if we don't do it and grow with the number of Os, then we can experience sudden spikes in the CPU usage due to the number of HTTP requests calls. If we stick to 100 Os, then sure we don't need to do batch, but then we also don't need to implement caching, it could work as it is.

Saying that, I plan to first send a PR with the caching and later a separate PR to add batching. Let me know if you think otherwise.