apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

PIP-220: TransferShedder (Only for PIP-192 New Broker Load Manager) #18215

Closed. heesung-sn closed this issue 1 year ago.

heesung-sn commented 1 year ago

Motivation

  1. Because PIP-192 introduces the bundle transfer protocol, we can implement a new shedding strategy that specifies the new owner broker.
  2. Improve how the current shedders handle edge cases.

Goal

  1. Create a new shedding algorithm that transfers bundles to a specific broker using the bundle transfer protocol introduced in https://github.com/apache/pulsar/issues/16691
  2. Improve the shedding algorithm in the following areas:
    • Add a bundle msg throughput signal when computing broker resource usage.
    • Unload more aggressively to newly added brokers.
    • Minimize the number of configs that need tuning.
    • Improve the accuracy of broker load data normalization (clean lingering load data after transfers).
    • Optimize the number of bundle unloads needed to balance load in the cluster.
    • Clarify the global load-balance optimization target (clarify the epoch error function).
  3. This algorithm will be used only in the new broker load balancer introduced in PIP-192.

API Changes

No.

Implementation

Pseudo code

The idea is straightforward: keep unloading bundles from the most-loaded broker to the least-loaded broker until the standard deviation of the broker load distribution is below the target.
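One way to make this stopping criterion explicit, assuming a population standard deviation over the N brokers in load_map, where L_b is the normalized load of broker b and O_b is its entry in offload_map (0 for brokers with no planned transfer). Transfers continue while σ > std_threshold. Because each step sets O_src = -O_dst, the mean μ does not change, and the chosen pair ends up with equal planned loads:

```latex
\tilde{L}_b = L_b - O_b, \qquad
\mu = \frac{1}{N}\sum_{b=1}^{N}\tilde{L}_b, \qquad
\sigma = \sqrt{\frac{1}{N}\sum_{b=1}^{N}\left(\tilde{L}_b - \mu\right)^2}

% one transfer step, with O_{src} = -O_{dst} = (L_{src} - L_{dst}) / 2
\tilde{L}_{src} = L_{src} - \frac{L_{src} - L_{dst}}{2}
                = \frac{L_{src} + L_{dst}}{2}
                = L_{dst} + \frac{L_{src} - L_{dst}}{2}
                = \tilde{L}_{dst}
```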

The pseudo code is as follows.

// compute load data for each broker
for (broker in active_brokers) {
  broker_load_data = broker.load_data

  // Skip load data reported before the last transfer (it is outdated),
  // and give each broker x secs to recompute its load after a transfer.
  if (broker_load_data.timestamp - last_transfer_timestamp < x secs) {
    continue;
  }

  // max(cpu, memory, direct_memory, network_in, network_out, msg_throughput_in, msg_throughput_out)
  cur_load = compute_load(broker_load_data)
  load = normalize(broker, cur_load)
  load_map.put(broker, load)
  top_k_min_load_brokers.add(broker, load)
  top_k_max_load_brokers.add(broker, load)
}

// compute std (offload_map is empty at this point)
std = standard_deviation(load_map, offload_map)

// keep transferring while std is above the target;
// also force-unload if the min-load broker is a new broker (no traffic)
for (int i = 0; i < max_transfer_cnt && (std > std_threshold || top_k_min_load_brokers.peek().msg_throughput == 0); i++) {
  (dst_broker, dst_load) = top_k_min_load_brokers.pop()
  (src_broker, src_load) = top_k_max_load_brokers.pop()
  // break (not return) so the cleanup below still runs;
  // stop once the min and max sides meet (a broker would be reused)
  if (dst_broker == null || src_broker == null || dst_broker == src_broker
      || transferred_brokers.contains(dst_broker) || transferred_brokers.contains(src_broker)) break;

  // we could adjust this offload_percent by other threshold configs
  offload_percent = (src_load - dst_load) / 2
  offload_throughput = offload_percent * src_broker.throughput

  // Transfer bundles from src_broker to dst_broker, highest-loaded bundle first,
  // while sum(transferred bundle.throughput) < offload_throughput
  ...

  // mark the offload: dst_broker gains offload_percent, src_broker loses it
  offload_map.put(dst_broker, -offload_percent)
  offload_map.put(src_broker, offload_percent)
  transferred_brokers.add(dst_broker)
  transferred_brokers.add(src_broker)

  // recompute std by considering the planned offloads
  std = standard_deviation(load_map, offload_map)
}

// clean load caches.
// we need to track new load data to avoid repeated transfers.
offload_map.clear()
for (broker : transferred_brokers) {
  load_map.remove(broker)
}
// mark the timestamp at the end of the transfer (only if a transfer happened).
if (!transferred_brokers.isEmpty()) {
  last_transfer_timestamp = now()
}
transferred_brokers.clear()

normalize(broker, cur_load) {
  // exponential moving average over the broker's previous normalized load;
  // a broker with no history (new, or removed after a transfer) starts from cur_load.
  // we could make this normalization configurable for other methods
  prev_load = load_map.getOrDefault(broker, cur_load)
  return historical_load_weight * prev_load + (1 - historical_load_weight) * cur_load
}

standard_deviation(load_map, offload_map) {
  // for each broker, recompute load considering offload_map (0 if no offload is planned)
  return std(load_map.get(broker) - offload_map.getOrDefault(broker, 0) for broker in load_map)
}
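A runnable Python sketch of the pseudo code above, offered as an illustration under several assumptions rather than as the Pulsar TransferShedder implementation. Load reports are modeled by a hypothetical `BrokerLoadData` class with resource usages as fractions in [0, 1]; a sorted list walked from both ends stands in for the two top-k heaps; the defaults for the hypothetical parameters `min_wait_secs`, `historical_weight`, `std_threshold`, and `max_transfer_cnt` are illustrative only. Bundle selection (the elided step) is not implemented: the sketch only returns how much throughput each broker pair should move.

```python
import math
from dataclasses import dataclass


@dataclass
class BrokerLoadData:
    # Hypothetical load report; usages are fractions in [0, 1].
    name: str
    usages: dict        # e.g. {"cpu": 0.7, "memory": 0.4, "msg_rate_in": 0.2}
    throughput: float   # total msg throughput (in + out) of the broker
    timestamp: float    # report time in seconds


def compute_load(data):
    # A broker is as loaded as its most-used resource.
    return max(data.usages.values())


def normalize(prev_load, cur_load, historical_weight):
    # Exponential moving average; without history, start from the current load.
    if prev_load is None:
        return cur_load
    return historical_weight * prev_load + (1 - historical_weight) * cur_load


def standard_deviation(load_map, offload_map):
    # Population std of per-broker loads after subtracting planned offloads.
    loads = [load - offload_map.get(b, 0.0) for b, load in load_map.items()]
    if not loads:
        return 0.0
    mean = sum(loads) / len(loads)
    return math.sqrt(sum((x - mean) ** 2 for x in loads) / len(loads))


def plan_transfers(reports, load_map, last_transfer_ts, now, *, min_wait_secs=60.0,
                   historical_weight=0.5, std_threshold=0.1, max_transfer_cnt=3):
    """Return (plans, last_transfer_ts); each plan is (src, dst, offload_throughput)."""
    candidates = []
    for data in reports:
        # Skip reports older than, or too soon after, the last transfer.
        if data.timestamp - last_transfer_ts < min_wait_secs:
            continue
        load_map[data.name] = normalize(load_map.get(data.name),
                                        compute_load(data), historical_weight)
        candidates.append(data)

    # Ascending by load: index lo walks the min side, hi walks the max side.
    candidates.sort(key=lambda d: load_map[d.name])
    offload_map, transferred, plans = {}, set(), []
    std = standard_deviation(load_map, offload_map)
    lo, hi = 0, len(candidates) - 1
    for _ in range(max_transfer_cnt):
        if lo >= hi:  # min and max sides met: no unpaired brokers left
            break
        dst, src = candidates[lo], candidates[hi]
        # Stop once balanced, unless the min-load broker has no traffic (new broker).
        if std <= std_threshold and dst.throughput > 0:
            break
        offload = (load_map[src.name] - load_map[dst.name]) / 2
        plans.append((src.name, dst.name, offload * src.throughput))
        # Choosing which bundles to move is omitted in this sketch.
        offload_map[dst.name] = -offload
        offload_map[src.name] = offload
        transferred.update((src.name, dst.name))
        std = standard_deviation(load_map, offload_map)
        lo, hi = lo + 1, hi - 1

    # Drop cached loads of transferred brokers so that stale data cannot
    # trigger repeated transfers; their EMA restarts from the next report.
    for name in transferred:
        load_map.pop(name, None)
    return plans, (now if plans else last_transfer_ts)
```

For example, with three fresh reports at CPU usage 0.9 (A, throughput 100), 0.5 (C) and 0.1 (B), the sketch plans a single transfer of about 40 throughput units from A to B; the min and max sides then meet at C and the loop stops. Walking a sorted list from both ends pops brokers in the same order as the two heaps, and stopping when the indices meet plays the role of the transferred_brokers check.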

Theoretical Load Balance Epochs (Transfer Counts) per Cluster Size for Target std=15

This data shows how many epochs (bundle transfers) are required to reach the target std, which helps in tuning the max_transfer_cnt config.

In general, the number of transfer cycles (minutes) required to reach target std=15 = epochs / max_transfer_cnt.

Cluster size (number of brokers) | Epochs
--- | ---
10 | 3
20 | 5
40 | 8
80 | 15
100 | 19
200 | 36
300 | 54
400 | 72
500 | 89
600 | 107
700 | 125
800 | 143
900 | 160
1000 | 178
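The cycle estimate above can be turned into a small tuning helper. This is a sketch under stated assumptions: one shedding cycle runs per minute (as "(minutes)" suggests), each cycle performs at most max_transfer_cnt transfers, and epochs / max_transfer_cnt is rounded up because a partial final batch still takes a full cycle. The helper names are hypothetical; the epoch values are copied from the table.

```python
import math

# Epochs (bundle transfers) needed to reach target std=15, per cluster size,
# copied from the table in this proposal.
EPOCHS_BY_CLUSTER_SIZE = {
    10: 3, 20: 5, 40: 8, 80: 15, 100: 19, 200: 36, 300: 54,
    400: 72, 500: 89, 600: 107, 700: 125, 800: 143, 900: 160, 1000: 178,
}


def required_cycles(cluster_size, max_transfer_cnt):
    # Each shedding cycle performs at most max_transfer_cnt transfers, and a
    # partial last batch still costs a full cycle, so round up.
    if max_transfer_cnt <= 0:
        raise ValueError("max_transfer_cnt must be positive")
    return math.ceil(EPOCHS_BY_CLUSTER_SIZE[cluster_size] / max_transfer_cnt)


def min_max_transfer_cnt(cluster_size, cycle_budget):
    # Smallest max_transfer_cnt that reaches the target within cycle_budget cycles.
    if cycle_budget <= 0:
        raise ValueError("cycle_budget must be positive")
    return math.ceil(EPOCHS_BY_CLUSTER_SIZE[cluster_size] / cycle_budget)


print(required_cycles(100, 5))         # 4
print(min_max_transfer_cnt(1000, 10))  # 18
```

The second helper inverts the first: the smallest k with ceil(epochs / k) ≤ budget is ceil(epochs / budget), so a 1000-broker cluster that should converge within 10 cycles needs max_transfer_cnt ≥ 18.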

Alternatives

N/A

Anything else?

No response

codelipenghui commented 1 year ago

I think the "more aggressive load balance strategy" part of this proposal is already resolved by https://github.com/apache/pulsar/pull/17456

Regarding the load balance epochs, I'm not sure I understand correctly. Will it expose metrics or something similar to help tune max_transfer_cnt?

heesung-sn commented 1 year ago

Hi,

Regarding the more aggressive load balance strategy, I think the idea here is similar to #17456 https://github.com/apache/pulsar/pull/17456. Even if no global shedding condition is met (here the shedding condition is std-based instead of an average threshold), if the broker with the minimum resource usage has no traffic, this PIP still tries to transfer load from the max-loaded broker to the min-loaded broker.

Yes. This PIP uses standard deviation to model the load balance optimization problem. There will be metrics (e.g. bundle_transfer_epoch, bundle_transfer_count, broker_load_std, broker_load_avg) that show how many bundle transfers (epochs) it takes to reach the target (target_std). (bundle_transfer_epoch is reset to zero once the running std reaches target_std.) This way, we can clearly monitor the time (epochs) and the bundle_transfer_count needed to reach the target load distribution, and tune load balance configs such as max_transfer_cnt accordingly.
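A minimal sketch of how the metrics described here could be tracked across shedding cycles, assuming one epoch equals one bundle transfer and that per-broker loads are available after each cycle. The `LoadBalanceMetrics` class, its `record_cycle` method, and the extra `last_epochs_to_target` field (which keeps the epoch count just before the reset, so it stays observable) are hypothetical and not Pulsar's metrics API.

```python
from dataclasses import dataclass


@dataclass
class LoadBalanceMetrics:
    # Hypothetical holder for the metrics named in this comment.
    target_std: float
    bundle_transfer_epoch: int = 0   # transfers since the std last reached target_std
    bundle_transfer_count: int = 0   # total transfers so far
    broker_load_std: float = 0.0
    broker_load_avg: float = 0.0
    last_epochs_to_target: int = 0   # hypothetical: epochs the last convergence took

    def record_cycle(self, loads, transfers):
        """Update metrics from this cycle's per-broker loads and transfer count."""
        n = len(loads)
        self.broker_load_avg = sum(loads) / n if n else 0.0
        variance = sum((x - self.broker_load_avg) ** 2 for x in loads) / n if n else 0.0
        self.broker_load_std = variance ** 0.5
        self.bundle_transfer_count += transfers
        self.bundle_transfer_epoch += transfers
        if self.broker_load_std <= self.target_std and self.bundle_transfer_epoch > 0:
            # Target reached: remember how many epochs it took, then reset.
            self.last_epochs_to_target = self.bundle_transfer_epoch
            self.bundle_transfer_epoch = 0
```

For example, with target_std=15, a cycle with loads 90/10/50 after 2 transfers leaves bundle_transfer_epoch at 2; a following cycle with loads 55/45/50 after 1 more transfer brings the std under 15, so last_epochs_to_target becomes 3 and bundle_transfer_epoch resets to 0.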

Regards, Heesung

heesung-sn commented 1 year ago

Hi,

I will raise a vote soon if there are no more questions or comments.

Thanks, Heesung

heesung-sn commented 1 year ago

Discussion email thread: https://lists.apache.org/thread/9k4968h57ffc6q2g6zn1tnbz5ql234x7
Vote email thread: https://lists.apache.org/thread/cwnyjcy8vw9mb08jwohn9l1d3g62wtd1

heesung-sn commented 1 year ago

Raised a PR: https://github.com/apache/pulsar/pull/18865

heesung-sn commented 1 year ago

Note: we will have separate PIP and PRs to define the new broker load balancer metrics centrally.

github-actions[bot] commented 1 year ago

The issue had no activity for 30 days, mark with Stale label.