divviup / janus

Experimental implementation of the Distributed Aggregation Protocol (DAP) specification.
Mozilla Public License 2.0

Adapters for easy collection #2254

Closed: inahga closed this issue 1 week ago

inahga commented 10 months ago

Issue transferred from internal discussion, so that we can get community input.

Originally proposed by @branlwyd on Aug 21, 2023

We should provide some adapters to allow for easy collection of data into commonly-desired formats. This adapter would effectively act as a DAP Collector, and emit aggregates in a format desired by the operator of the adapter. Potential output formats include:

* CSV

* Prometheus/Grafana stack

* Other formats, based on customer needs/feedback

Note that a DAP Collector is a stateful service; the adapter will need some storage to track both outstanding collection jobs to be polled, as well as collected aggregates to be emitted.

We may want to consider running these adapters ourselves eventually, to provide a turnkey collection solution for low-complexity users. We should carefully consider the operational & privacy impacts to Divvi Up -- running these adapters ourselves would expose us to aggregates, which may not be acceptable to many subscribers. We would also be on the hook for configuring the adapter and storing its data, though taking on that work would simplify the deployment process for subscribers.

inahga commented 10 months ago

@branlwyd wrote on Aug 28, 2023


Some prior Slack discussion:

We decided to prioritize this work for a few reasons:

There's a design element to this work; I'm going to sketch a design in this issue next.

inahga commented 10 months ago

@branlwyd wrote on Aug 28, 2023


Design considerations:

Retrieving aggregates from the Aggregator:

Emitting values:

inahga commented 10 months ago

I'm still fleshing out a design, but a thought occurs to me: we may want to generalize what we want the outputs to be.

I think we get a lot of value by managing collection jobs and simply appending collection results to a SQL table. The resulting tool should end up more composable. Users of the software can hook into the table and directly perform queries, export to some desired format (e.g. CSV), and programmatically interact with the database with e.g. Jupyter.

This effectively makes the CSV idea follow the emitter pattern, like OTEL would. I also suspect there might be a use case for an AMQP emitter, for possible incorporation into existing data pipelines (although I need to do more research on what a typical pattern for this is).

In short, I suggest that data conversion would follow an interface like so:

#[async_trait]
pub trait Emitter<V: vdaf::Collector, Q: QueryType> {
    async fn emit(&self, collection: Collection<V::AggregateResult, Q>) -> Result<(), Error>;
}

struct SqlEmitter { /* database connection parameters */ }

/// Appends a Prio3Count collection result to a table.
/// We likely need to impl over the various VDAFs since storage of the aggregate results would be different.
/// For prio3, can macro this out to scalar and vector types I think...
/// This is generic over QueryType simply because the Collection type takes it--the query type is not actually
/// pertinent for storage.
impl<Q: QueryType> Emitter<Prio3Count, Q> for SqlEmitter { ... }
branlwyd commented 10 months ago

I think this is a tradeoff in ease-of-use -- e.g. if folks ultimately want a CSV result, rather than a SQL database of some form, this change would sign them up for additional work.

OTOH, generating CSV from a DB is pretty easy. And I really like the simplification here. So please consider this a 👍🏻 from me.

BTW, I suspect that the query type is relevant to the emitter -- it would determine the type of the batch identifier. And IMO we'd probably want to include that in the information we emit. I suppose it's less important that we record fixed-size's BatchIDs since they're opaque, but users will potentially get real value out of time-interval's, er, intervals. But I think we may also need to record BatchIDs for fixed-size tasks to ensure we avoid repeated collection.

inahga commented 10 months ago

This comment describes an emitter, which is the general form of a stateful DAP collector.

An emitter reliably manages many collection jobs to yield aggregate results, then sends the aggregate results to some other standard, durable storage.

Two implementations of emitter are proposed.

Common functionality is factored out into a library crate in Janus, which both implementations rely on.

Design Goals

Minimum functional requirements:

Minimum non-functional requirements:

QUESTION: Any other non-functional requirements to be conscious of?

Deployment

A standard emitter deployment roughly follows this topology:

flowchart LR
    subgraph Subscriber Controlled
        direction LR
        A{"`**Emitter**`"} <-- Operational\nState --> B[("Operational\nDatastore")]
        A -- Aggregate\nResults --> C[("Emitter\nBackend")]
        A -- Aggregate\nResults --> D[("Emitter\nBackend")]
    end
    subgraph Hosted Services
        direction LR
        J[DAP Leader\nAggregator] <-- Collection\nJobs --> A
    end

Emitter backends are robust datastores that may or may not already be part of a subscriber's data pipeline. Initial backend support is for a SQL database. It is possible to use the operational datastore as the emitter backend, to ease deployment complexity.

The emitter is a single process, which can be orchestrated by any standard process management tool, e.g. Kubernetes, systemd, Nomad. All persistent state is stored in the operational database.

It is safe to run multiple emitter processes sharing the same operational datastore, allowing HA and scaling.

Subscribers should not operate multiple emitters for a single DAP task. This would result in non-deterministic splitting of results between the emitters.

Configuration

Configuration is YAML-based, in a similar vein to the configuration files used by Janus.

The config schema is roughly like so:

# config.yaml

# Same as Janus.
database:
  url: postgres://postgres@127.0.0.1:5432/postgres
health_check_listen_address: 0.0.0.0:8003
logging_config:
  # ...
metrics_config:
  # ...

# Emitter-specific configuration
task_discovery_interval_secs: 60
emitters:
  # Name is mainly used to associate secret-containing environment variables.
  - name: sql-1
    sql:
      url: postgres://postgres@127.0.0.1:5432/aggregate_results
  - name: amqp-1
    amqp:
      url: amqps://amqp@127.0.0.1:10000/vhost

For Divvi Up Emitters, config.yaml is extended with account details:

divviup_api_url: https://api.divviup.org
divviup_account_id: abcd1234

For Janus Emitters, a tasks.yaml file is required:

- task_id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
  leader: "https://example.com/"
  query_type: !FixedSize
    schedule: "0 * * * *" # Every hour at minute 0.
  vdaf: !Prio3Sum
    bits: 16
  time_precision: 1800
  collector_credential: foo

- task_id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
  leader: "https://example.com/"
  query_type: !FixedSize
    schedule: "0 1 * * 1" # 01:00 on Monday.
  vdaf: !Prio3Count
  time_precision: 1800
  collector_credential: bar

Notice that fixed-size tasks contain a schedule given as a cron expression. This dictates the interval at which fixed-size queries are polled.
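
As a sketch of how the schedule might drive job creation, the next poll time for a fixed-size task could be computed roughly like this. This assumes the cron and chrono crates, which the design does not mandate.

// Sketch only: computing the next fixed-size poll time from a task's cron
// schedule. Assumes the `cron` and `chrono` crates.
use std::str::FromStr;

use chrono::{DateTime, Utc};
use cron::Schedule;

/// Returns the next time a CreatePrio3Collection job should run for a task,
/// or None if the schedule has no future occurrences.
fn next_collection_time(schedule_expr: &str) -> Option<DateTime<Utc>> {
    // Note: the `cron` crate expects a seconds field, so "0 0 * * * *" means
    // "every hour at minute 0, second 0".
    let schedule = Schedule::from_str(schedule_expr).ok()?;
    schedule.upcoming(Utc).next()
}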

tasks.yaml can be modified at runtime and will automatically be refreshed.

Secrets are configurable through environment variables. For example:

EMITTER_DATABASE_PASSWORD=password
EMITTER_SQL_PASSWORD_sql-1=password
EMITTER_AMQP_PASSWORD_amqp-1=password
EMITTER_COLLECTOR_CREDENTIAL_foo='{json body}' 
EMITTER_COLLECTOR_CREDENTIAL_bar='{json body}' 
EMITTER_DIVVIUP_API_TOKEN=token
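
As an illustration of the naming convention above, the emitter might resolve per-backend secrets roughly like this; the helper function is hypothetical.

// Sketch only: resolving the per-emitter secrets shown above. The helper name
// and error handling are hypothetical.
use std::env;

/// Looks up a secret of the form EMITTER_<KIND>_PASSWORD_<name>, e.g.
/// EMITTER_SQL_PASSWORD_sql-1.
fn emitter_secret(kind: &str, name: &str) -> Result<String, env::VarError> {
    env::var(format!("EMITTER_{kind}_PASSWORD_{name}"))
}

// Usage: let password = emitter_secret("SQL", "sql-1")?;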

PROBLEM: If secrets are provided through environment variables, it becomes cumbersome to introduce new collector credentials.

PROBLEM: The Divvi Up API does not provide any notion of a collection schedule. This might be a necessary addition--it is conceivable that all tasks for a given subscriber won't fit a one-size-fits-all schedule.

Jobs

An emitter is an asynchronous scheduled job processing server. Emitter processes poll for outstanding jobs, then drive them to completion. Jobs are idempotent, retryable, and interruptible, unless otherwise indicated.

Note the default HTTP retry logic of janus_collector is not used. If an upstream server fails with a retryable error, the job is rescheduled instead for some time in the future, following exponential backoff. This avoids keeping database transactions open or tying up a worker for too long.
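
As a sketch of the rescheduling behavior, the delay before a job's next attempt might be computed as below; the base and cap values are illustrative, not part of the design.

// Sketch only: capped exponential backoff for rescheduling a job.
use std::time::Duration;

fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // base * 2^attempt, saturating at the cap.
    let exp = base.saturating_mul(2u32.saturating_pow(attempt));
    exp.min(cap)
}

// e.g. backoff_delay(3, Duration::from_secs(30), Duration::from_secs(3600))
// yields 240 seconds; the job row would be scheduled for now + that delay.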

Job Types

TODO(inahga): Perhaps the job sequence is best presented as a diagram?

DiscoverTasks

Invoke an implementation-defined callback function that returns the list of relevant tasks from some source of truth. Reconcile the internal task table against this list. For each newly discovered task, schedule a CreatePrio3Collection job according to the task's schedule.

For a Janus Emitter, the source of truth is a configuration file. For a Divvi Up Emitter, the source of truth is the Divvi Up API.

Unsupported tasks are ignored.
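
A rough sketch of that callback interface, with hypothetical names; TaskConfig is a placeholder for whatever task metadata the emitter needs (task ID, leader URL, VDAF, schedule, collector credential).

// Sketch only: the implementation-defined task discovery callback invoked by
// DiscoverTasks. `TaskConfig` and `Error` are placeholder types.
#[async_trait]
pub trait TaskDiscovery {
    /// Returns the current set of tasks from the source of truth: tasks.yaml
    /// for a Janus Emitter, the Divvi Up API for a Divvi Up Emitter.
    async fn discover_tasks(&self) -> Result<Vec<TaskConfig>, Error>;
}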

CreatePrio3Collection

CreatePrio3Collection jobs are generated with a random collection job ID. This ID is persisted to the queue before the following request is sent.

Issue a PUT /tasks/{task_id}/collection_jobs/{collection_job_id} to the DAP leader. For fixed size queries, query for the current batch.

If the response is OK, schedule a PollPrio3Collection job for immediate completion. If the response is a non-retryable error (e.g. there are insufficient reports to satisfy the query), schedule a new CreatePrio3Collection job for the future.
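
A sketch of that ordering follows. Queue, DapClient, and their methods are hypothetical placeholders, and it assumes the collection job ID type can be sampled with rand::random.

// Sketch only: persist the random collection job ID before issuing the PUT,
// so that a retry after a crash reuses the same ID rather than minting a new
// one. `Queue`, `DapClient`, and their methods are hypothetical.
use rand::random;

async fn run_create_collection_job(
    queue: &Queue,
    dap: &DapClient,
    job: &CreatePrio3CollectionJob,
) -> Result<(), Error> {
    // Reuse a previously persisted ID if this job has already attempted the request.
    let collection_job_id = match queue.stored_collection_job_id(job).await? {
        Some(id) => id,
        None => {
            let id: CollectionJobId = random();
            queue.store_collection_job_id(job, &id).await?;
            id
        }
    };

    // PUT /tasks/{task_id}/collection_jobs/{collection_job_id}, querying the
    // current batch for fixed-size tasks.
    dap.put_collection_job(&job.task_id, &collection_job_id).await
}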

PollPrio3Collection

Poll a collection once. If the collection is ready, create an EmitPrio3Collection job for immediate completion for each configured EmitterBackend. For fixed size queries, schedule another CreatePrio3Collection job for immediate completion.

If the collection is not ready or the request fails with a retryable error, schedule another PollPrio3Collection job following exponential backoff.

NOTE: Ideally we would eagerly create collection jobs until all outstanding reports are associated with a batch, then poll each collection job individually. Thus, the responsibility of scheduling the next collection job falls onto CreatePrio3Collection or some other job scheduling job. However, we cannot do this because Janus can associate a collection job with a batch that is undergoing collection. That batch will not become unavailable for a new collection job until it is first polled. Thus we cannot eagerly create collection jobs. This will change in the next draft of DAP.

EmitPrio3Collection

Emit a collection to the configured [EmitterBackend]. If this fails with a retryable error, reschedule another EmitPrio3Collection job following exponential backoff.

Other Query Types

For time-interval, changes would consist of:

  1. Job scheduling: jobs would be scheduled on some multiple of the task's time precision, offset by some user-configurable delay to account for long-tail reports.
  2. Job state: the query would be over the interval [last_successful_interval, next_interval) (see the sketch after this list).
  3. Functionality for backfilling a time-interval task once it has been discovered. It is possible that a task has been receiving reports for a long time before it is discovered or the emitter is adopted.
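
A sketch of points 1 and 2 above, with illustrative names, showing how the next query interval might be derived from the task's time precision and a configurable long-tail delay:

// Sketch only: computing the next time-interval query, aligned to the task's
// time_precision and held back by a configurable delay for long-tail reports.
fn next_collection_interval(
    last_collected_up_to: u64, // end of the previous interval, seconds since epoch
    now: u64,                  // current time, seconds since epoch
    time_precision: u64,       // task's time_precision, seconds
    report_delay: u64,         // user-configurable long-tail allowance, seconds
) -> Option<(u64, u64)> {
    // Latest batch boundary that is old enough to collect.
    let next_interval_end = (now.saturating_sub(report_delay) / time_precision) * time_precision;
    if next_interval_end <= last_collected_up_to {
        return None; // nothing new to collect yet
    }
    // Query interval is [last_successful_interval, next_interval).
    Some((last_collected_up_to, next_interval_end))
}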

QUESTION: Anything else I'm missing?

For time-bucketed fixed-size, no changes are necessary. It is left to the user to combine aggregates for overlapping intervals.

Datastores

Operational Datastore

An emitter must have the same operational datastore support as a Janus aggregator helper, to allow sharing of the datastore instance for subscribers using Bring-Your-Own-Helper (BYOH). At the time of this writing, this is PostgreSQL 15.

The following state is stored:

SQLite support is considered a "nice to have". It is not targeted for initial implementation, but the code/interfaces should be written such that SQLite is an almost-drop-in replacement.

The emitter is well suited to use of an ORM. The set of SQL operations is small, the performance requirements are lighter than Janus's, and we desire portability across databases. We will use SeaORM, the ORM used in divviup-api.

NOTE: I haven't found any widely accepted job queue library for Rust+PostgreSQL. Indeed, we roll our own for both Janus and divviup-api. We can largely crib the queue implementation used in divviup-api.
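
For illustration, a queue row might look roughly like the following SeaORM entity. The table and column names are hypothetical, not a committed schema.

// Sketch only: one possible SeaORM entity for job queue rows in the
// operational datastore. Names and types are hypothetical.
use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "jobs")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i64,
    /// Job type discriminator, e.g. "CreatePrio3Collection".
    pub job_type: String,
    /// Serialized job-specific state (task ID, collection job ID, ...).
    pub state: Json,
    /// When the job becomes eligible to run; pushed out on retry.
    pub scheduled_at: DateTimeUtc,
    /// Number of attempts so far, used for exponential backoff.
    pub attempts: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}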

Emitter Backend

An EmitterBackend is the destination for aggregate results. It roughly follows this interface:

/// It's necessary for this to be generic over T because we may want to support VDAFs where the
/// aggregate result is not actually the desired output. For instance, Poplar1 prefix tree levels
/// are not very interesting to emit--we may want to perform some function over the complete tree
/// before emitting results.
#[async_trait]
pub trait EmitterBackend<T, Q: QueryType> {
    async fn emit(&self, collection: Collection<T, Q>) -> Result<(), Error>;
}

Each aggregate result will be supplied to the emit() method.
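
To show how the interface composes, here is a trivial hypothetical backend that just logs each collection, useful for local testing. It assumes Collection implements Debug; StdoutEmitterBackend is not part of the proposal.

// Sketch only: a backend that prints each collection to stdout.
use std::fmt::Debug;

pub struct StdoutEmitterBackend;

#[async_trait]
impl<T, Q> EmitterBackend<T, Q> for StdoutEmitterBackend
where
    T: Debug + Send + Sync + 'static,
    Q: QueryType,
{
    async fn emit(&self, collection: Collection<T, Q>) -> Result<(), Error> {
        println!("{collection:?}");
        Ok(())
    }
}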

We expect the most versatile implementation will be SqlEmitterBackend, which at minimum supports the same SQL databases that are used for operational state. It roughly follows this form:

struct SqlEmitterBackend { 
    /* connection parameters, table name, etc. */
}

/// Implementation for the aggregate result of Prio3Count. Ideally we would be able to use the
/// associated type `<Prio3Count as Vdaf>::AggregateResult` instead of raw `u64`, but there
/// appears to be a compiler bug when impl'ing with associated types.
/// https://github.com/rust-lang/rust/issues/99940
#[async_trait]
impl EmitterBackend<u64, FixedSize> for SqlEmitterBackend {
    async fn emit(&self, collection: Collection<u64, FixedSize>) -> Result<(), Error> {
        /* append the aggregate result to the desired table */
    }
}

For each received aggregate result, it writes a row to a target table. See SqlEmitterBackend Usage for details.

Each EmitterBackend places soft constraints on the proper usage of the resulting data, and dictates its own API stability guidelines. The EmitterBackend interface should be versatile enough to fit other storage backends, including but not limited to Prometheus (OpenTelemetry), AMQP-compatible services, data warehouses such as BigQuery, AWS S3, and perhaps even file/stdout.

An emitter can emit to multiple backends.

SqlEmitterBackend Usage

TODO(inahga): This schema is not so great. I think it would be more useful to have separate tables for each VDAF which FK into a main aggregate_results table. Need to iterate more.

The SqlEmitterBackend manages an additional schema that is roughly like so:

-- Note: schema is incomplete, some fields are omitted for brevity.

CREATE TABLE internal_prio3_aggregate_results (
    task_id          BYTEA UNIQUE NOT NULL,
    aggregate_result BIGINT[] NOT NULL, -- Scalar results (count, sum) are a 1-sized array,
                                        -- vectors (histogram, sumvec) are n-sized arrays.
                                        -- TODO(inahga): bigint isn't big enough--non-count VDAFs
                                        -- are u128.
    interval         TSRANGE NOT NULL,
    created_at       TIMESTAMP NOT NULL
);

CREATE VIEW prio3_aggregate_results_v1 AS
    SELECT task_id, aggregate_result, interval, created_at
    FROM internal_prio3_aggregate_results;

A separate database should be dedicated to aggregate results, but the database instance may be shared with the operational database.

The schema of internal_prio3_aggregate_results is an implementation detail and may change arbitrarily between releases. Users must not rely on the schema of any internal tables.

The supported and stable API boundary is the prio3_aggregate_results_v1 view. Users may query this view to retrieve results at will.

The returned columns of a versioned view are guaranteed to never change, even if the internal_prio3_aggregate_results table changes. The definition of the view may change arbitrarily between releases. If new columns are necessary, they will be introduced as prio3_aggregate_results_v2, thus preserving the functionality of any existing queries or programs. Users may migrate to prio3_aggregate_results_v2 at will.
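
For example, a user might query the view directly, or export it to CSV from psql (covering the original CSV use case); this is illustrative only.

-- Query recent results through the stable view:
SELECT task_id, aggregate_result, interval, created_at
FROM prio3_aggregate_results_v1
WHERE created_at > now() - '7 days'::interval;

-- CSV export from psql:
-- \copy (SELECT * FROM prio3_aggregate_results_v1) TO 'results.csv' WITH (FORMAT csv, HEADER)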

An additional SqlGarbageCollection job may be configured and run periodically to prune old results.

NOTE: We could make prio3_aggregate_results_v1 a materialized view. This has better performance characteristics in exchange for needing to manage REFRESH MATERIALIZED VIEW. It also lets users create arbitrary indices against the view. Materialized views are supported in PostgreSQL but not SQLite.

NOTE: The background of this design choice is that a CSV export feature would run into similar design problems. We would require some persistent SQL storage of aggregate results, and expose some API boundary to query them. That API boundary would be subject to feature requests, e.g. filter by date, filter by task ID, join against task metadata--which are best suited as SQL operations. We would also have similar problems around API boundary stability (i.e. needing a versioned CSV export API). We can cut out the middle man and allow maximum user flexibility by directly exposing a SQL interface, with some boundaries around which views are safe to use. If we desire the simplicity of a CSV export endpoint, we could expose a trivial /dump endpoint which is a CSV export of the table as-is.

Handling Different DAP Versions

The Janus Emitter follows the same DAP versioning strategy currently used in Janus. That is, a single instance of the emitter will not support multiple DAP versions.

QUESTION: Should the Divvi Up Emitter support multiple DAP versions? This could get really nasty and complex...

Aggregation Parameter

The Emitter will always assume the aggregation parameter is (), which is true for Prio3.

See Prefix tree VDAFs for more discussion.

Prefix tree VDAFs

NOTE: Most of this section is not relevant for the minimal implementation of an emitter, but I think at least some discussion is justified to avoid painting ourselves into a corner when it comes time for implementing Poplar1.

Collection for Poplar1 (and Mastic) first involves collection of all levels of a prefix tree, then evaluating some function based on the complete prefix tree. The known useful functions are heavy hitters and quantiles.

Given a VDAF construction of N bits, N collections must be made to fully construct the prefix tree for any given batch. Collection jobs for a given batch must be executed sequentially, with the aggregation parameter being the set of candidate prefixes of interest, determined by the last aggregate result.
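
To make the per-level dependency concrete, here is a rough sketch of that sequential flow. Every type and helper in it (PrefixTree, AggParam, collect_level, refine_prefixes, initial_prefixes) is a hypothetical placeholder, not an existing API.

// Sketch only: sequential prefix-tree collection for a single batch.
async fn collect_prefix_tree(bits: usize) -> Result<PrefixTree, Error> {
    let mut tree = PrefixTree::default();
    // Start with the one-bit prefixes as candidates.
    let mut candidate_prefixes = initial_prefixes();
    for level in 0..bits {
        // One DAP collection per level, using the candidate prefixes as the
        // aggregation parameter. Each collection must complete before the
        // next level's candidates can be chosen.
        let agg_param = AggParam::from_prefixes(&candidate_prefixes);
        let level_results = collect_level(level, &agg_param).await?;
        tree.insert_level(level, &level_results);
        // Keep only prefixes worth extending (e.g. above a count threshold
        // for heavy hitters), and extend each by one bit.
        candidate_prefixes = refine_prefixes(&candidate_prefixes, &level_results);
    }
    Ok(tree)
}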

I think further design on this is gated by the following questions:

QUESTION: What is the desired result of a Poplar1 collection? Options:

  1. Just emit each level of the prefix tree (i.e. pretty much do nothing and leave it to the user to construct the prefix tree).
  2. Emit the complete prefix tree, without evaluating any function. Leave it to the user to execute functions on the tree.
  3. Evaluate some function on the prefix tree, such as heavy hitters or quantiles, and emit the results.

QUESTION: What other functions of the prefix tree could be useful, besides heavy hitters and quantiles? Could we adopt both options 2 and 3, where we expose the convenient functions but also optionally leave it to the user to create their own functions over the prefix tree?

branlwyd commented 10 months ago

A few thoughts:

inahga commented 10 months ago

> Is the idea that the Divvi Up emitter configuration would allow the emitter to start collecting for all tasks for a given account? I think that may not be the right default, but we should certainly support it. Maybe either a whitelist of task IDs or a boolean flag saying "collect all tasks for account"?

Good point. I think there can be some kind of configuration like

# Maybe glob patterns? (by ID or name--name would be friendlier and more amenable to patterns)
divviup_task_selector:
  - "*"

divviup_task_selector:
  - aaaaaaaaaaaaaaaaa
  - bbbbbbbbbbbbbbbb

> I'm not sure a polling strategy based on a cron-like schedule works, at least not for a fixed-size.

Good point, I think exponential backoff should work better here, with some levers in the config to adjust the backoff parameters.

> The only thing missing from time-interval support would be the ability to "extend" collection intervals if the initial interval turns out to not have enough reports.

:+1: I meant for that to be captured in the discussion about job state changes, but I realize my wording doesn't make that clear.

> Why not reuse the same job?

We can and should. I'll adjust the wording.


> small but important nitpick: "That batch will not become unavailable for a new collection job until it is first polled." This is true in Janus, but in general, all the spec says about current-batch requests is "The Leader SHOULD select a batch which has not yet began collection." An aggregator is free to assign batches pretty much arbitrarily, and can ignore the spec's advice while remaining DAP-compliant since it's only a SHOULD -- a Collector implementation therefore must dedupe based on batch ID. My suggestion would be to just design/implement the Collector as if https://github.com/ietf-wg-ppm/draft-ietf-ppm-dap/issues/526 is landed, i.e. ignore the need to dedupe.

The problem is less about deduplication of reports and more about how aggressively we can create collection jobs.

The ideal collection flow for fixed-size would be to create as many jobs as possible, until job creation starts to fail because all outstanding batches are exhausted. That is, if there are 3 batches outstanding, I should be able to directly issue 3 PUT /tasks/{task_id}/collection_jobs/{collection_job_id} requests, without worrying about polling, and have the 4th one fail, signaling me to back off on creating new collection jobs.

In Janus, this doesn't work because new collection jobs keep being associated with the same batch until that batch has been polled. That is, if I adopt the job creation strategy above, I'll be in an infinite loop creating new collection jobs.

For Janus this is an easy enough fix (hopefully) and we should probably take it as a bugfix.

However, there may be an argument that the aforementioned ideal collector is not DAP-compliant. Could DAP saying "The Leader SHOULD select a batch which has not yet began collection" imply that the collector must not assume the aggregator will stop allowing new collection jobs once all outstanding batches are exhausted? The SHOULD -> MUST change removes this ambiguity.

I think reading too deeply into the letter of the law here makes the design less effective, and you're right that we should just operate as if the SHOULD -> MUST change has landed. We can just have it as an admonition somewhere: "if you're using a non-Janus aggregator, be warned...".

I will change the job flow to aggressively create collection jobs, and prioritize a fix for Janus' behavior.

inahga commented 5 months ago

I've not been actively working on this due to competing priorities.

inahga commented 1 week ago

This is not planned. While it's still necessary, we haven't gotten much interest in using it, and our current use cases use bespoke collector implementations.