divviup / janus

Experimental implementation of the Distributed Aggregation Protocol (DAP) specification.
Mozilla Public License 2.0

Asynchronous aggregation jobs #3451

Open inahga opened 4 days ago

inahga commented 4 days ago

DAP-12 supports handling aggregation jobs asynchronously. Determine how to implement this, and implement it.

inahga commented 4 days ago

There are a few ways we can support asynchronous aggregation.

Leader changes

In all situations, we need to change the leader such that it supports a helper behaving asynchronously. This requires aggregation job driver changes. There are two approaches possible:

  1. Add polling logic directly after the first aggregate init call, without kicking the job back in the queue. This is the most straightforward approach, and requires no database schema changes. Recovery should still be trivial as long as we generate the same PUT request.

  2. Add logic that kicks the job back in the queue when receiving a polling response. At a glance I think this is a much more invasive change. We need a way to checkpoint the leader-side preparation state to the database, and handle a new state PollingHelper.

    This is complicated but would make more efficient use of the job driver's time, since job drivers won't be tied up by polling the helper. (This can also be trivially remediated by increasing the concurrency of the job driver).

I lean towards approach 1, since it can be straightforwardly accomplished. Depending on the outcome of the second design decision below, we may not have a strong use case for an async-aware Leader, so optimization won't be necessary.

The database schema changes look invasive, so we might want to at least sketch out approach 2 and commit database changes preemptively, even if only adopting approach 1.

I believe Daphne will be implementing helper-side async, so we should be able to use it for practical interop testing. See https://github.com/cloudflare/daphne/issues/697.

Helper changes

We have options for how we want to implement helper-side async.

  1. Minimally, i.e. handle calls synchronously. This is (should be) allowable under the spec. Only make the necessary HTTP semantic changes.

  2. Support async.

    Implementation is sketched out in https://github.com/divviup/janus/pull/3331. We would need a new ReportAggregations state (n.b. the PR overloads StartLeader as a hack, proper implementation would introduce a new state).

    It's proposed to handle async jobs in the aggregation job driver. This seems like the natural place to put them. It does complicate trivial helper deployments somewhat. The simple solution is to allow the aggregator binary to internally run an instance of the aggregation job driver, like we do with the garbage collector and key rotator.

    Options for supporting one or both of async and sync.

    a. Support both, toggled by feature flag or some arbitrary runtime heuristic (e.g. size of aggregation job). Most flexible, lets us play around in load testing or production to determine which is better.

    b. Support async only. Simpler. Introduces overall less code into Janus, i.e. nominally moves the current aggregation handlers into the job driver.
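
A minimal sketch of what the toggle in option (a) could look like, assuming a feature flag plus a job-size threshold. `HandlingMode`, `AsyncConfig`, and `choose_handling_mode` are hypothetical names for illustration and do not exist in Janus:

```rust
/// How the helper handles an aggregation job initialization request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HandlingMode {
    /// Process the job inline and respond with the prepare responses.
    Sync,
    /// Persist the job, respond "processing", and let the job driver pick it up.
    Async,
}

/// Hypothetical configuration; neither field exists in Janus today.
#[derive(Debug, Clone, Copy)]
struct AsyncConfig {
    /// Feature flag: when false, every job is handled synchronously.
    async_enabled: bool,
    /// Jobs with at least this many reports are handled asynchronously.
    async_report_threshold: usize,
}

/// Picks a handling mode from the flag and the size of the aggregation job.
fn choose_handling_mode(config: &AsyncConfig, report_count: usize) -> HandlingMode {
    if config.async_enabled && report_count >= config.async_report_threshold {
        HandlingMode::Async
    } else {
        HandlingMode::Sync
    }
}
```

Keeping the decision in one function would make it easy to swap the heuristic during load testing without touching the handlers.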

If adopting 1, we still may want to make the schema changes necessary for 2, in case we want to adopt 2 later.

PR #3331 is available as a reference of what approach 2 looks like, but I would probably start from a clean sheet since it's fairly cobbled together.

Open Questions/Conclusions

  1. For the leader, should we go for approach 1 or 2? If we do approach 1, should we still map out the schema changes necessary to support option 2?

  2. For the helper, should we go for approach 1 or 2? If we do approach 2, should we adopt approach 2a or approach 2b? If we do approach 1, should we still map out the schema changes necessary to support approach 2?

We may need to speak with stakeholders to determine where they lean as well.

branlwyd commented 4 days ago

Leader

As I see it, the primary operational difference to approach #1 is that we will retransmit the request with each polling attempt, wasting bandwidth. Approach #2 does not have this (but does require more implementation effort). Are there other differences?

If that is the score, I think we should strongly consider implementing #2. Either way, we should definitely sketch out the schema changes required for #2.

Separately, #1 might not save as much implementation effort as we expect: currently the aggregation job driver expects to be able to pick up any unclaimed work item. Without some work to ensure appropriate backoff, we might end up polling the same aggregation job very frequently. (We'd need something similar for #2.)

Helper

We don't need to implement this immediately/at all -- #1 is allowed by the spec. I think we should implement the schema changes and consider kicking the functional implementation down the road.

inahga commented 4 days ago

As I see it, the primary operational difference to approach #1 is that we will retransmit the request with each polling attempt, wasting bandwidth. Approach #2 does not have this (but does require more implementation effort). Are there other differences?

Not quite. In both cases there shouldn't be any unnecessary retransmission of request. The key difference is just when we poll the aggregation job.

At the moment, we do roughly

// in step_aggregation_job_aggregate_init()

// Calculated in code previous to this.
let job: AggregationJob;
let leader_side_state; // leader-side prepared report aggregations

let resp = {
    let request = AggregationJobInitializeReq::new(...);
    // No trailing semicolon: the block evaluates to the helper's response.
    send_request_to_helper(request, ...)
};
process_response_from_helper(resp, leader_side_state);

Approach 1 prescribes that we simply do the polling inline, roughly:

let backoff = ExponentialBackoff::default();
let resp = {
    let request = AggregationJobInitializeReq::new(...);
    let resp = send_put_request_to_helper(request, ...);

    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(uri_to_poll_at) => {
            // Helper is not done yet: poll inline until it is.
            loop {
                let resp = send_get_request_to_helper(uri_to_poll_at);
                match resp.status {
                    AggregationJobStatus::Ready(prepare_resps) => break prepare_resps,
                    AggregationJobStatus::Processing(_) => backoff.wait(),
                }
            }
        }
    }
};
process_response_from_helper(resp, leader_side_state);

The drawback being that the job driver is now sleeping without doing productive work between polling attempts (i.e. it's not servicing other waiting aggregation jobs).

Approach 2 prescribes that we do this:

// Calculated in code previous to this.
let job: AggregationJob;
let leader_side_state; // leader-side prepared report aggregations

let resp = {
    let request = AggregationJobInitializeReq::new(job, ...);
    let resp = send_put_request_to_helper(request, ...);

    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(uri_to_poll_at) => {
            // Checkpoint the leader-side state and release the job back to the queue.
            job.set_state(AggregationJobState::PollingHelper, leader_side_state);
            job.poll_again_after(some_duration);
            return Ok(());
        }
    }
};
process_response_from_helper(resp, leader_side_state);

Then we have separate logic in the job acquirer that dispatches on this new state:

let aggregation_jobs = get_aggregation_jobs_in_non_terminal_state();
for job in aggregation_jobs {
    match job.state() {
        AggregationJobState::InProgress => { do_whatever_we_do_normally() }
        AggregationJobState::PollingHelper => {
            let leader_state = load_checkpointed_leader_state(job);
            poll_aggregation_job(job, leader_state)
        }
    }
}

where poll_aggregation_job() is

let resp = {
    let resp = send_get_request_to_helper(job.uri_to_poll, ...);
    match resp.status {
        AggregationJobStatus::Ready(prepare_resps) => prepare_resps,
        AggregationJobStatus::Processing(_) => {
            // Still not ready: leave the job in PollingHelper and try again later.
            job.poll_again_after(some_duration);
            return Ok(());
        }
    }
};

process_response_from_helper(resp, leader_state);

The drawback being more complexity and needing to checkpoint the leader-side report aggregation state to the database. But this should make better use of aggregation job driver time.
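
To make the state transition concrete, here is a self-contained sketch of the approach 2 handling of a helper response. All types here are simplified stand-ins (strings instead of real prepare responses, no database), and `StepOutcome` and `handle_helper_status` are hypothetical names, not Janus APIs. It assumes the job returns to `InProgress` once the helper is ready so that normal response processing can continue:

```rust
use std::time::Duration;

/// Helper's answer to a PUT (init) or GET (poll), mirroring the pseudocode above.
#[derive(Debug, Clone, PartialEq)]
enum AggregationJobStatus {
    Ready(Vec<String>),  // stand-in for the prepare responses
    Processing(String),  // URI to poll later
}

#[derive(Debug, Clone, PartialEq)]
enum AggregationJobState {
    InProgress,
    /// New state: the helper is working asynchronously.
    /// `poll_attempts` counts GET polls that came back "processing".
    PollingHelper { uri_to_poll: String, poll_attempts: u32 },
}

/// What the job driver should do after looking at the helper's status.
#[derive(Debug, PartialEq)]
enum StepOutcome {
    /// Continue with process_response_from_helper().
    Completed(Vec<String>),
    /// Release the job; it may be re-acquired after this delay.
    Requeue(Duration),
}

struct AggregationJob {
    state: AggregationJobState,
}

fn handle_helper_status(
    job: &mut AggregationJob,
    status: AggregationJobStatus,
    poll_interval: Duration,
) -> StepOutcome {
    match status {
        AggregationJobStatus::Ready(prepare_resps) => {
            job.state = AggregationJobState::InProgress;
            StepOutcome::Completed(prepare_resps)
        }
        AggregationJobStatus::Processing(uri_to_poll) => {
            // The first "processing" answer comes from the PUT, so no polls yet.
            let poll_attempts = match &job.state {
                AggregationJobState::PollingHelper { poll_attempts, .. } => poll_attempts + 1,
                AggregationJobState::InProgress => 0,
            };
            job.state = AggregationJobState::PollingHelper { uri_to_poll, poll_attempts };
            StepOutcome::Requeue(poll_interval)
        }
    }
}
```

In a real implementation the state change and the checkpointed leader-side report aggregations would be written in the same database transaction that releases the job.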

branlwyd commented 3 days ago

Apologies for the misunderstanding -- I think we should definitely go with Leader approach 2. Taking up an aggregation job driver "slot" doing exponential backoff polling is very likely to negatively impact the throughput of the system. Instead, if the Helper is not ready, we should put the job back onto the queue (likely with exponential backoff over repeated attempts?) and look for different work to pick up.

inahga commented 3 days ago

SGTM.

FWIW this problem more or less already exists, because we back off exponentially on transient retries of the PUT request, during which time the aggregation job driver is not doing productive work. It may be worth also considering what changes are needed to fix this, since we'll be overhauling this code anyway (but not a priority).

branlwyd commented 3 days ago

I agree. The simplest way to address this might be to rip out that exponential-backoff polling as well, solving it the same way we plan to solve Helper polling: implementing the backoff based on placing the job back into the queue with an exponential backoff on how quickly it can be re-acquired.

(Though this is off-topic from this issue, & optional.)
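
For reference, the queue-based backoff discussed above could compute the re-acquisition delay roughly as follows. This is a sketch only: `reacquire_delay`, its parameters, and the doubling schedule are assumptions for illustration, not existing Janus code.

```rust
use std::time::Duration;

/// Delay before a released job may be re-acquired: base * 2^attempt, capped at `max`.
/// `attempt` is the number of earlier attempts that ended without a result.
fn reacquire_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating at u32::MAX once the shift would overflow.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    // If the multiplication overflows Duration, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a base of 1 second and a cap of 60 seconds, attempts 0 through 3 wait 1, 2, 4, and 8 seconds, and later attempts settle at the cap. The same function could serve both helper polling and transient PUT retries if the attempt count is stored with the job.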