apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

PIP-211: Introduce offload throttling #18004

Open tjiuming opened 2 years ago

tjiuming commented 2 years ago

Motivation

Pulsar currently has no offload throttling. With offload, Pulsar can move backlog data from BookKeeper to cheaper long-term storage (AWS S3, Aliyun OSS, GCS, etc.).

An offload task can use up all of the broker's resources (CPU, network).

When offload tasks consume too many broker resources, messaging latency may increase and the broker can become unstable.

For the purpose of managing broker resources and improving broker stability, we would like to implement “Offload throttling” in the Pulsar brokers. This mechanism will allow the enforcement of an upper limit to the rate of Offload.

Goal

  1. Provide broker level offload limiting configurations.
  2. Provide a relatively accurate rate-limiting algorithm to protect the broker.

API Changes

1. ServiceConfiguration

a. Long managedLedgerOffloadBrokerFlowPermits: The broker level flow permit per second

2. ManagedLedgerConfig

a. setManagedLedgerOffloadBrokerFlowPermits(long brokerFlowPermits);
b. Long getManagedLedgerOffloadBrokerFlowPermits();
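
As a rough illustration of how the two configuration points above could fit together, here is a minimal sketch that copies the broker-level value into the managed-ledger configuration. The classes `ServiceConfigurationSketch`, `ManagedLedgerConfigSketch` and `OffloadConfigWiring` are hypothetical stand-ins, not the real Pulsar classes, and the default of `-1` meaning "throttling disabled" is an assumption; the proposal does not state a default.

```java
/** Hypothetical stand-in for the broker-level ServiceConfiguration. */
final class ServiceConfigurationSketch {
    // Assumption: a non-positive value disables throttling (the PIP gives no default).
    private long managedLedgerOffloadBrokerFlowPermits = -1L;

    long getManagedLedgerOffloadBrokerFlowPermits() {
        return managedLedgerOffloadBrokerFlowPermits;
    }

    void setManagedLedgerOffloadBrokerFlowPermits(long permits) {
        this.managedLedgerOffloadBrokerFlowPermits = permits;
    }
}

/** Hypothetical stand-in for ManagedLedgerConfig with the two proposed accessors. */
final class ManagedLedgerConfigSketch {
    private long managedLedgerOffloadBrokerFlowPermits = -1L;

    void setManagedLedgerOffloadBrokerFlowPermits(long brokerFlowPermits) {
        this.managedLedgerOffloadBrokerFlowPermits = brokerFlowPermits;
    }

    long getManagedLedgerOffloadBrokerFlowPermits() {
        return managedLedgerOffloadBrokerFlowPermits;
    }
}

/** Copies the broker-level setting into a managed-ledger config (illustrative only). */
final class OffloadConfigWiring {
    static ManagedLedgerConfigSketch toManagedLedgerConfig(ServiceConfigurationSketch broker) {
        ManagedLedgerConfigSketch mlConfig = new ManagedLedgerConfigSketch();
        mlConfig.setManagedLedgerOffloadBrokerFlowPermits(
                broker.getManagedLedgerOffloadBrokerFlowPermits());
        return mlConfig;
    }
}
```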

Implementation

An offload task is usually started in ManagedLedgerImpl#offloadLoop(Args …) by calling:

            prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
                .thenCompose((ignore) -> getLedgerHandle(ledgerId))
                // start to offload
                .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
                .thenCompose((ignore) -> {
                    // ignore
                })
                .whenComplete((ignore, exception) -> {
                    // ignore
                });

We can add a step here so that the offloader uses a decorated ReadHandle, which I named OffloadReadHandle. The decorated ReadHandle applies the limiting algorithm whenever entries are read.

            prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
                .thenCompose((ignore) -> getLedgerHandle(ledgerId))
                // decorate the ReadHandle to OffloadReadHandle
                .thenCompose(readHandle -> OffloadReadHandle.create(readHandle, flowPermits, scheduler))
                .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
                .thenCompose((ignore) -> {
                    // ignore
                })
                .whenComplete((ignore, exception) -> {
                    // ignore
                });
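
To make the decoration step more concrete without depending on BookKeeper, the following is a minimal sketch of the same pattern. `AsyncReader` is a hypothetical stand-in for ReadHandle, `ThrottledReader` stands in for the decorated handle, and the `LongSupplier` that reports the required delay is an assumption introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical async read interface standing in for BookKeeper's ReadHandle. */
interface AsyncReader {
    CompletableFuture<byte[]> readAsync(long firstEntry, long lastEntry);
}

/** Decorator that postpones reads while the delay supplier reports a positive wait. */
final class ThrottledReader implements AsyncReader {
    private final AsyncReader delegate;
    private final LongSupplier delayMillis; // hypothetical: 0 means "read now"
    private final ScheduledExecutorService scheduler;

    ThrottledReader(AsyncReader delegate, LongSupplier delayMillis,
                    ScheduledExecutorService scheduler) {
        this.delegate = delegate;
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    @Override
    public CompletableFuture<byte[]> readAsync(long firstEntry, long lastEntry) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        attempt(firstEntry, lastEntry, result);
        return result;
    }

    // Re-check the delay on every attempt; reschedule until the read is allowed.
    private void attempt(long first, long last, CompletableFuture<byte[]> result) {
        long delay = delayMillis.getAsLong();
        if (delay > 0) {
            scheduler.schedule(() -> attempt(first, last, result), delay, TimeUnit.MILLISECONDS);
            return;
        }
        delegate.readAsync(first, last).whenComplete((data, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(data);
            }
        });
    }
}
```

Because the decorator implements the same interface as the object it wraps, the caller (here, the offloader) needs no changes; only the handle passed to it differs.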

1. OffloadReadHandle

In FileSystemManagedLedgerOffloader and BlobStoreManagedLedgerOffloader, entries are read by calling ReadHandle.readAsync(startEntry, endEntry).

To implement throttling, we can decorate ReadHandle and override the readAsync(startEntry, endEntry) method. Before reading entries from the ledger, we check whether the ledger is currently restricted. If it is, we submit a read task to a scheduler with a delay in milliseconds; if not, we read the entries from the underlying ReadHandle right away. After the entries have been read successfully, we record their size in bytes.

This method cannot strictly limit the read traffic, because we don’t know the size of entries before reading, but it provides a relatively accurate limit.

public class OffloadReadHandle implements ReadHandle {
    // static fields, ignore
    .....

    // ignore some fields
    .....

    private final ReadHandle delegator;
    private final OrderedScheduler scheduler;

    public OffloadReadHandle(ReadHandle handle, String ledgerName, ManagedLedgerConfig config,
                              OrderedScheduler scheduler) {
        // init fields, ignore
        .....
    }

    @Override
    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        final long delayMillis = calculateDelayMillis();
        if (delayMillis > 0) {
            CompletableFuture<LedgerEntries> f = new CompletableFuture<>();
            Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f);
            scheduler.schedule(cmd, delayMillis, TimeUnit.MILLISECONDS);
            return f;
        }

        return this.delegator
                .readAsync(firstEntry, lastEntry)
                .whenComplete((v, t) -> {
                    if (t == null) {
                        recordReadBytes(v);
                    }
                });
    }

    //ignore unimportant methods
    .....

    // calculate the millis need to delay
    private long calculateDelayMillis() {
          // ignore
          ....
          return 0;
    }

    // record entries size after reading
    private void recordReadBytes(LedgerEntries entries) {
         // ignore
         .....
    }

    private final class ReadAsyncCommand implements Runnable {

        private final long firstEntry;
        private final long lastEntry;
        private final CompletableFuture<LedgerEntries> f;

        ReadAsyncCommand(long firstEntry, long lastEntry, CompletableFuture<LedgerEntries> f) {
            // init fields
            ....
        }

        @Override
        public void run() {
            // if it still needs to wait, submit the next task.
            long delayMillis = calculateDelayMillis();
            if (delayMillis > 0) {
                scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                return;
            }

            delegator.readAsync(firstEntry, lastEntry)
                    .whenComplete((entries, e) -> {
                        if (e != null) {
                            f.completeExceptionally(e);
                        } else {
                            // record first: the caller may close the entries as soon as f completes
                            recordReadBytes(entries);
                            f.complete(entries);
                        }
                    });
        }
    }
}

This class is based on a SlidingWindow. If the ledger is restricted, the read task is submitted to the next window.
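
The bodies of calculateDelayMillis() and recordReadBytes() are elided above. As one possible reading, here is a minimal sketch of a window-based byte limiter. It is an assumption for illustration, not the proposal's implementation: the class name `WindowedByteLimiter` is hypothetical, it uses a simpler fixed (tumbling) window instead of a true sliding window, and it takes the current time as a parameter so the behaviour is easy to follow.

```java
/**
 * Hypothetical fixed-window byte limiter (a simplification of a sliding window).
 * Reads are admitted while the bytes recorded in the current window are below
 * the limit; sizes are only known after a read, so a window can overshoot.
 */
final class WindowedByteLimiter {
    private final long permitsPerWindow; // bytes allowed per window
    private final long windowMillis;     // window length in milliseconds
    private long windowStart;            // start time of the current window
    private long bytesInWindow;          // bytes recorded in the current window

    WindowedByteLimiter(long permitsPerWindow, long windowMillis) {
        if (permitsPerWindow <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("permits and window must be positive");
        }
        this.permitsPerWindow = permitsPerWindow;
        this.windowMillis = windowMillis;
    }

    /** Returns 0 if a read may start now, otherwise the millis until the next window. */
    synchronized long calculateDelayMillis(long nowMillis) {
        roll(nowMillis);
        if (bytesInWindow < permitsPerWindow) {
            return 0;
        }
        return windowStart + windowMillis - nowMillis;
    }

    /** Records the size of entries after they have been read. */
    synchronized void recordReadBytes(long bytes, long nowMillis) {
        roll(nowMillis);
        bytesInWindow += bytes;
    }

    // Start a fresh window, aligned to the window length, once the current one has elapsed.
    private void roll(long nowMillis) {
        long elapsed = nowMillis - windowStart;
        if (elapsed >= windowMillis) {
            windowStart = nowMillis - (elapsed % windowMillis);
            bytesInWindow = 0;
        }
    }
}
```

With a limit of 100 bytes per 1000 ms window, a first read of 250 bytes is admitted because nothing has been recorded yet, and later reads in that window are delayed until the window ends. In this simplified sketch the overshoot is not carried into the next window, which is one reason the limit is only approximate.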

2. Metrics

a. Existing metrics

| Name | Type | Description |
| --- | --- | --- |
| brk_ledgeroffloader_offload_error | Counter | The number of failed operations to offload. |
| brk_ledgeroffloader_offload_rate | Gauge | The rate of offloading (byte per second). |
| brk_ledgeroffloader_read_offload_error | Counter | The number of failed operations to read offload ledgers. |
| brk_ledgeroffloader_read_offload_rate | Gauge | The rate of reading entries from offload ledgers (byte per second). |
| brk_ledgeroffloader_write_storage_error | Counter | The number of failed operations to write to storage. |
| brk_ledgeroffloader_read_offload_index_latency | Summary | The latency of reading index from offload ledgers. |
| brk_ledgeroffloader_read_offload_data_latency | Summary | The latency of reading data from offload ledgers. |
| brk_ledgeroffloader_read_ledger_latency | Summary | The latency of reading entries from BookKeeper. |
| brk_ledgeroffloader_delete_offload_ops | Counter | The total number of successful and failed operations to delete offload ledgers. |

b. New Metrics

| Name | Type | Description |
| --- | --- | --- |
| brk_ledgeroffloader_ledger_offload_limited | Counter | The number of times the ledger reading is restricted. |
| brk_ledgeroffloader_ledger_offload_time | Counter | The total time to read all the entries from the ledger. |
| brk_ledgeroffloader_ledger_reading_max_bytes | Gauge | The max number of bytes reading entries in the window. |

Alternatives

Anything else?

Google doc link: https://docs.google.com/document/d/1HiVyhFpkifODUXEpZF0UYPJGeXtQpI9tbeie_IkDXqk/edit#

tjiuming commented 1 year ago

Discuss thread: https://lists.apache.org/thread/3j8ldw93gyx1kblknygq35nq8g72plpx

tjiuming commented 1 year ago

Vote thread: https://lists.apache.org/thread/pgfq9khqb6m85661c3sls5w5f9cmqn2c
