DataJunction / dj

A metrics platform.
http://datajunction.io
MIT License

Support defining additional aggregations on top of metric nodes #272

Open shangyian opened 1 year ago

shangyian commented 1 year ago

For some of our metrics, we want to perform statistical inference downstream.

The available inference models for each metric are stored externally to DJ, with context communicated via API calls. In order to streamline the computation there, we need to provide another layer of aggregations on top of the raw metric data. This set of aggregations will typically compute sufficient statistics (i.e., sum, sum of squares etc), but the exact aggregations will vary depending on the inference model.
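To make "sufficient statistics" concrete: with just the sum, sum of squares, and count, a downstream model can recover the mean and variance without ever seeing the raw rows. A minimal sketch (illustrative only, not DJ code):

def mean_and_variance(total: float, sum_of_squares: float, count: int):
    """Recover mean and (population) variance from sufficient statistics."""
    mean = total / count
    variance = sum_of_squares / count - mean ** 2  # E[X^2] - E[X]^2
    return mean, variance

# e.g. rows [1, 2, 3] -> sum=6, sos=14, count=3 -> (2.0, 0.666...)
print(mean_and_variance(6.0, 14.0, 3))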

We also need to support two different types of source data: raw data, where DJ computes these extra aggregations itself, and pre-aggregated data, where the sufficient statistics have already been computed upstream.

Endpoint Changes:

samredai commented 1 year ago

Thanks for highlighting these use cases! It sounds like at a high level there are two things here so I'll reply to each one separately.

First thing: Adding pre-aggregated source nodes and defining metrics on top of them: For this, I wonder if we can infer that a metric is pre-aggregated by looking at the query in the metric node definition. Currently a metric node is defined by a highly restrictive query that must select a single column from a single node, i.e. SELECT SUM(foo) FROM some_node. Since we're parsing the SQL during the build step, we can detect if a metric node's query is defined without any aggregation, and I think it makes sense to make it a hard assumption that no aggregation function means the metric has been pre-aggregated. So for example a query SELECT foo FROM some_node would make DJ assume that foo must have been pre-aggregated (otherwise it's not a metric, right?). I think this gets us a mechanism for users to communicate that a metric is pre-aggregated without adding any additional attributes like "pre-aggregated": "true", for example.

One thing I would add though is I think (not sure) that we should force pre-aggregated metrics to come with all of the dimensions that were used to segment the aggregation. If we let them request the pre-aggregated metric with just half of the dimensions from the pre-aggregated table, there's no guarantee that it's correct for DJ to re-aggregate with the subset of the dimensions and still have valid numbers, not to mention we would need the user to also store the function that's supposed to be used in that re-aggregation. I know there are some ways we can determine when this can be allowed by having the user give us more information but I feel like it could get tricky and if they're bringing in a pre-aggregated table, they probably already have it grouped by the dimensions they're interested in. What we can still support though is if they tag one of the columns in the pre-aggregated table as a join key to an existing dimension node in DJ, we can automatically bring in all of those extra attributes that are in the dimension node (this would be different from a typical dimension tag on a column since we want the build to do just a left join, but no group by).
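To see why re-aggregating over a subset of dimensions can silently break, consider a toy example with a pre-aggregated average (hypothetical data; pandas used only for illustration):

import pandas

# A table pre-aggregated as an average over (country, device).
pre = pandas.DataFrame({
    "country": ["US", "US"],
    "device": ["tv", "phone"],
    "avg_hours": [10.0, 2.0],
    "rows": [1, 9],
})

# Naively re-averaging over country alone ignores the weights: (10 + 2) / 2 = 6.0
wrong = pre.groupby("country")["avg_hours"].mean()

# The valid re-aggregation needs the stored counts: (10*1 + 2*9) / (1 + 9) = 2.8
weighted = (pre["avg_hours"] * pre["rows"]).groupby(pre["country"]).sum() \
    / pre["rows"].groupby(pre["country"]).sum()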

If I'm not missing anything, then I think this option would:

Second thing: Requesting metrics and dimensions but "raw" meaning don't do the aggregation: I hadn't thought of this but I see now how it's a requirement for the statistical inference analyses that need to choose a set of metrics and dimensions and then receive the data before it's aggregated. I agree the solution is to include an optional flag for the GET /metrics/{node_id}/sql route, but I would suggest something shorter than include_extra_aggregations--maybe uncompressed that defaults to False or aggregate that defaults to True. It also makes sense to me that it would raise an error if a request comes in that asks for the unaggregated form of a pre-aggregated metric.

shangyian commented 1 year ago

For pre-agg data, several points:

  1. I'd rather that we be explicit about each assumption we make around the data than just assuming that if the SQL doesn't have aggregations, it must be pre-aggregated. It feels too similar to something like enforcing column naming suffixes to represent table metadata.
  2. We should aim to get all of the aggregations, whether pre-aggregated or computed from the raw data in one go. Therefore there would not be a single pre-aggregated metric column, but rather many.
  3. Agreed that we would still want to create the AdhocDimension or FrozenDimension node type for each of the dimensions that come from the pre-aggregated source table case.
  4. We don't need to pass in pre-aggregated: true. Instead, an approach like this could work (I think this is essentially what @agorajek mentioned on Slack):
POST /metrics
{
    "name": "days_paid",
    "query": "SELECT * FROM table",
    "extra_aggregations": {
        "sum": "days_paid|sum",
        "sum_of_squares": "days_paid|sos",
        "count": "days_paid|cnt"
    }
}

In the case of metrics with non-pre-aggregated data, we can instead pass in:

POST /metrics
{
    "name": "days_paid",
    "query": "SELECT days_paid FROM table",
    "extra_aggregations": {
        "sum": "SUM(days_paid)",
        "sum_of_squares": "SUM(days_paid * days_paid)",
        "count": "COUNT(days_paid)"
    }
}
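For the raw-data variant, the build step could expand these expressions directly into the projection; a rough sketch of that expansion (illustrative only, not the actual DJ builder; the days_paid_<stat> naming is an assumption here):

extra_aggregations = {
    "sum": "SUM(days_paid)",
    "sum_of_squares": "SUM(days_paid * days_paid)",
    "count": "COUNT(days_paid)",
}
# Expand each entry into an aliased expression in the SELECT list.
select_list = ",\n  ".join(
    f"{expr} AS days_paid_{name}" for name, expr in extra_aggregations.items()
)
query = f"SELECT\n  {select_list}\nFROM table"  # GROUP BY dimensions elided
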
agorajek commented 1 year ago

@shangyian I mostly agree with your previous comment, just need some clarification:

  1. What do you mean by the pipe syntax in, say, "days_paid|sum"?
  2. I think your first example should have the column name in the SELECT clause, rather than a star, right?
  3. We both thought that extra_aggregations can be called stats after all, right?
  4. I think the requirement of the expressions in the stats section should be that they always reference the main column from the metric, otherwise people will abuse it, right?
shangyian commented 1 year ago

@agorajek

  1. The pipe syntax is actually how it's represented in some of our tables now, but that's just a minor detail. (Well, except for the fact that we should move away from |, as special chars in column names can be painful, but that's a separate topic 😄 )
  2. The problem in this case is that the column doesn't have a name... as in, days_paid doesn't exist as a column on the pre-aggregated table.
  3. Yep +1 to stats
  4. Yeah, that seems reasonable. We could even be more restrictive if we wanted, as in, have a list of aggregation functions we support and disallow others.
agorajek commented 1 year ago

@shangyian

  1. yeah, ok
  2. hmmm, this is interesting... did I just accidentally spot a pattern that a PreAgg Metric will not have a "main" metric field at all?
shangyian commented 1 year ago

@agorajek Yep, there indeed won't be a main metric field in that case. For either retrieving SQL or data for this metric node, it would only return values when the request also specifies that it wants one or more items in stats. If someone tries to get metric data directly, it'll error out.

samredai commented 1 year ago

I had a chat earlier with @matthewwardrop and he helped me think through much of this. I want to make a proposal for adding two arguments that can be provided at request time to the /metrics/{name}/data endpoint, analysis_unit: List[str] and aggs: List[str]. The first contains one or more dimensions that determine an analysis unit of the metric that's relevant for compression, and the second contains a list of aggs to use in the compression (we'll start by allowing one or more of sum, count, and sos (sum of squares)).

@router.get("/metrics/{name}/data/", response_model=QueryWithResults)
async def read_metrics_data(
    name: str,
    database_name: Optional[str] = None,
    d: List[str] = Query([]),  # pylint: disable=invalid-name
    f: List[str] = Query([]),  # pylint: disable=invalid-name
    analysis_unit: List[str] = Query([]),  # Query() needed for list query params
    aggs: List[str] = Query([]),
    *,
    session: Session = Depends(get_session),
    settings: Settings = Depends(get_settings),
    response: Response,
    background_tasks: BackgroundTasks,
) -> QueryWithResults:
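For illustration, a client request using the proposed parameters might look like this (hypothetical host and metric name):

import requests

# Lists are passed as repeated query parameters, matching FastAPI's Query([]).
response = requests.get(
    "http://localhost:8000/metrics/foo/data/",
    params={
        "d": ["bar", "baz"],              # dimensions
        "analysis_unit": ["qux"],         # unit of analysis for compression
        "aggs": ["sum", "count", "sos"],  # second-stage aggregations
    },
)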

They would both be optional and for anyone who just wants to request a metric grouped by some number of dimensions, their API calls would not change at all. To highlight how these optional arguments would be used, I've listed out five scenarios.

Scenario 1

{"metric": "foo", "dimensions": ["bar", "baz", "qux"], "analysis_unit": [], "aggs": []}

Behavior: Standard DJ behavior that exists today. The metric is aggregated by the aggregation function defined in it and grouped by the dimensions specified in the dimensions field.

Scenario 2

{"metric": "foo", "dimensions": ["bar", "baz", "qux"], "analysis_unit": ["qux"], "aggs": ["sum", "count", "sos"]}

Behavior: After the initial aggregation to calculate metric foo across the dimensions bar, baz and qux, a second aggregation is performed. This second aggregation compresses the data by doing the aggregation over all of the dimensions except the analysis unit which is qux (example below).

SELECT
bar,
baz,
SUM(foo) as foo_sum,
COUNT(foo) as foo_count,
SUM(SQUARE(foo)) as foo_sos
FROM (...)
GROUP BY bar, baz

Scenario 3:

{"metric": "foo", "dimensions": ["bar", "baz"], "analysis_unit": ["qux"], "aggs": ["sum", "count", "sos"]}

Behavior: Very similar to scenario 2, except one small detail. The user didn't include the analysis unit in their dimensions. Since doing compression requires that the analysis unit was included in the first aggregation, we automatically include it and so produce the same output as Scenario 2.

Scenario 4:

{"metric": "foo", "dimensions": ["bar", "baz"], "analysis_unit": ["qux"], "aggs": []}

Behavior: The user didn't provide any additional aggs so no second aggregation is performed. In this scenario, it's treated as just another dimension and the result produced is exactly the same as Scenario 1. Note that the user could have left analysis unit empty and just provided qux as one of the dimensions in the list with the other two, but they decided to explicitly identify it as the analysis unit (not just a dimension used in segmentation). That little detail doesn't change the behavior in this scenario.

Scenario 5:

{"metric": "foo", "dimensions": ["bar", "baz", "qux"], "analysis_unit": [], "aggs": ["sum", "count", "sos"]}

Behavior: This raises an error because we can't do compression (a second aggregation) if we don't know the analysis unit. The error message would be something along the lines of "Cannot calculate compression: analysis_unit was not provided".

Let me know if this makes sense to everyone and if there are any objections!

shangyian commented 1 year ago

@samredai Nice, thanks for the list!

I think we should rename aggs to stats, as that makes it clear that we won't be enabling generic aggregations for users. We can limit the list of supported ones to just what's needed:

import enum

class Stats(str, enum.Enum):
    SUM = "sum"
    SOS = "sos"
    COUNT = "count"
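FastAPI would then validate the parameter against the enum for free; a trimmed-down signature for illustration (assuming the route proposed above):

from typing import List

from fastapi import Query

async def read_metrics_data(
    name: str,
    stats: List[Stats] = Query([]),  # e.g. ?stats=median is rejected with a 422
) -> None:
    ...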

--

Keep in mind that we still need to support the pre-aggregated case. I would also add these scenarios:

Scenario 6

The underlying metric data only contains these sufficient stats and not the raw data. When the user creates the metric, they should be able to indicate to us that this is the case. This means that in the metric creation stage, we need a way to declare which pre-computed stat lives in which existing column.

An example:

POST /metrics
{
    "name": "days_paid",
    "query": "SELECT days_paid_sum, days_paid_sos, days_paid_cnt FROM table",
    "included_stats": {
        "sum": "days_paid_sum",
        "sum_of_squares": "days_paid_sos",
        "count": "days_paid_cnt"
    }
}

Then we might query for:

POST /metrics/days_paid/sql
{"dimensions": ["bar", "baz", "qux"], "analysis_unit": ["qux"], "stats": ["sum", "count", "sos"]}

Which would then return:

SELECT
  bar,
  baz,
  SUM(days_paid_sum) AS days_paid_sum,
  SUM(days_paid_cnt) AS days_paid_count,
  SUM(days_paid_sos) AS days_paid_sos
FROM table
GROUP BY bar, baz

Scenario 7

The same setup as scenario 6, but if we asked for the data without stats:

{"dimensions": ["bar", "baz", "qux"], "analysis_unit": ["qux"], "stats": []}

It would error out, as it's not possible to produce the raw metric data without the second aggregation. A similar error would occur if we try to ask for aggregations that weren't in the list of ones already computed.
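The request-time check could be a simple subset test against the metric's included_stats; a rough sketch (function and names hypothetical):

def validate_stats_request(requested: set[str], included_stats: dict[str, str]) -> None:
    # Pre-aggregated metrics can't produce raw rows: some stats must be requested.
    if not requested:
        raise ValueError(
            "Metric is pre-aggregated; request one or more stats: "
            + ", ".join(included_stats)
        )
    # Only stats that were materialized upstream can be served.
    missing = requested - set(included_stats)
    if missing:
        raise ValueError(f"Stats not available on the pre-aggregated source: {sorted(missing)}")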

samredai commented 1 year ago

@shangyian ah right, the case of a pre-aggregated source node would likely have its own special path of handling logic, but we should definitely keep it a consistent experience for someone calling the API, the way you have it in your examples. One question about this one:

POST /metrics
{
    "name": "days_paid",
    "query": "SELECT days_paid_sum, days_paid_sos, days_paid_cnt FROM table",
    "included_stats": {
        "sum": "days_paid_sum",
        "sum_of_squares": "days_paid_sos",
        "count": "days_paid_cnt"
    }
}

Then we might query for:

POST /metrics/days_paid/sql
{"dimensions": ["bar", "baz", "qux"], "analysis_unit": ["qux"], "stats": ["sum", "count", "sos"]}

These included_stats would be defined on the table source node directly, right? Since we're going to add a way to differentiate a normal source node from a pre-aggregated source node, I'm imagining we could allow directly tagging those days_paid_sum, days_paid_sos, and days_paid_cnt columns on the source node. The dimensions/segmentation as well as the unit of analysis is also fixed in the pre-aggregation, so maybe all of that information should be part of the requirement to bring in a pre-aggregated+pre-compressed table into DJ.

pseudocode

table:
  columns:
    - name: qux
      type: FLOAT
      pre_agg_type: ANALYSIS_UNIT
    - name: bar
      type: STR
      pre_agg_type: DIMENSION
    - name: baz
      type: STR
      pre_agg_type: DIMENSION
    - name: days_paid_sum
      type: INT
      pre_agg_type: STATS
      metric: days_paid
      function: SUM
    - name: days_paid_sos
      type: INT
      pre_agg_type: STATS
      metric: days_paid
      function: SOS
    - name: days_paid_cnt
      type: INT
      pre_agg_type: STATS
      metric: days_paid
      function: COUNT

If the pre-aggregated table is brought in with the above metadata, then we have all the information we need to surface a metric days_paid and make it available via the API request you suggested for scenario 6, and we also know to fail for scenario 7. I also like that it puts the responsibility of defining all of this properly onto the person who's bringing in the pre-aggregated table, which I imagine is always the person who will know best what to set. We can also get away with the user not having to do the extra step of defining a placeholder metric node. Instead, we could just always include these metrics defined right on pre-aggregated source nodes in all places where we list "available metrics".
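Deriving the surfaced metrics from that column metadata is then a simple grouping pass; roughly (illustrative, mirroring the pseudocode above, not the actual DJ model):

from collections import defaultdict

def available_metrics(columns: list[dict]) -> dict[str, dict[str, str]]:
    """Collect STATS columns into a metric -> {function: column} mapping."""
    metrics: dict = defaultdict(dict)
    for col in columns:
        if col.get("pre_agg_type") == "STATS":
            metrics[col["metric"]][col["function"].lower()] = col["name"]
    return dict(metrics)

# For the table above this yields:
# {"days_paid": {"sum": "days_paid_sum", "sos": "days_paid_sos", "count": "days_paid_cnt"}}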

matthewwardrop commented 1 year ago

@samredai Thanks so much for summarising things here. There were a few deviations from my understanding of our chat in the scenarios you posted, which I've noted below. If you agree with them, feel free to edit your post above (and note you've done so that people aren't confused). Otherwise, we can chat about it a bit more :).

My edits are emboldened.

Scenario 2 (edited)

{"metric": "foo", "dimensions": ["bar", "baz", "qux"], "analysis_unit": ["qux"], "aggs": ["sum", "count", "sos"]}

Behavior: After the initial aggregation to calculate metric foo across the dimensions bar, baz and qux, a second aggregation is performed. This second aggregation compresses the data by doing the aggregation over all of the dimensions except the analysis unit which is qux (example below). For each group in the Cartesian product of the dimensions we compute the sum, count and sos for the rows. In this case these aggregations are taken over a single row, since the analysis unit is present already in the dimensions.

SELECT
bar,
baz,
qux,
SUM(foo) as foo_sum,
COUNT(foo) as foo_count,
SUM(SQUARE(foo)) as foo_sos
FROM (...)
GROUP BY bar, baz, qux

Where the ellipsis refers to the metric table where the standard metric aggregation has been computed on the data grouped by the nominated dimensions.

Note: This is a bit of a confusing scenario because the analysis_unit doesn't change the behaviour of the first aggregation, nor the dimensionality of the second.

Scenario 3:

{"metric": "foo", "dimensions": ["bar", "baz"], "analysis_unit": ["qux"], "aggs": ["sum", "count", "sos"]}

Behavior: Very similar to scenario 2, except one small detail. The user didn't include the analysis unit in their dimensions. Since doing compression requires that the analysis unit was included in the first aggregation, we automatically include it and so produce the same output as Scenario 2. This omission means that the first aggregation is grouped by the union of the dimensions and analysis_unit fields, but the second is only grouped by the dimensions. This allows the aggs to describe the distribution of the metric over the unit of analysis.
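In pandas terms, the two stages for this scenario would look roughly like this (a toy sketch with invented data, assuming foo is a SUM metric):

import pandas

df = pandas.DataFrame({
    "bar": ["a", "a", "a"], "baz": ["b", "b", "b"],
    "qux": [1, 1, 2], "foo": [3.0, 4.0, 5.0],
})

# Stage 1: the metric's own aggregation, grouped by the dimensions plus the
# (automatically included) analysis unit.
stage1 = df.groupby(["bar", "baz", "qux"], as_index=False)["foo"].sum()

# Stage 2: compression over the analysis unit, grouped by the dimensions only.
stage2 = stage1.groupby(["bar", "baz"])["foo"].agg(
    foo_sum="sum",
    foo_count="count",
    foo_sos=lambda s: (s ** 2).sum(),
)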

Scenario 4 was fine.

Scenario 5: You could compute the result by interpreting an empty analysis unit as referring to the groups formed by the first aggregation.

@shangyian Agreed that the pre-computed case needs more metadata attached to a "node" at some point. That metadata should just be the same information that we would have passed to the endpoint if the data was not compressed (since the pre-compressed data is basically a cached version of the compressed data). We'd just need to compare this metadata to the request to know whether the query can be served.

As an aside, we also need to identify whether a dimension is acting like a partition in the data, over which you cannot re-aggregate the compressed data. We can chat about that in more detail later/soon.

agorajek commented 1 year ago

@matthewwardrop I had a quick sync on this subject with @samredai and @shangyian and here is a scenario that we could support.

Some DJ client (e.g. an inference service) sends the following request to an appropriate DJ endpoint (say /query or /create-node) with the following content:

{
  "metrics": ["m1", "m2"], 
  "dimensions": ["dim1", "dim2", "dim3"], 
  "analysis_unit": ["dim4"], 
  "aggs": ["sum", "count", "sos"]
}

And let's assume that dim1 and dim2 are dimensions tied to the unit of analysis, and that dim3 is a dimension tied to the event measured by metrics m1 and m2. What's special about the above query is that it would tell DJ that dim4 is my unit of analysis, so make sure to "watch out" for it.

What would happen then is DJ would construct a query with 2 layers:

  1. the inner query would compute metrics m1 and m2 on the grain of data, where dim4 is included,
  2. the outer query would compute the requested aggs for each metric, grouped by dim1, dim2, and dim3 (i.e., everything except dim4).

After that DJ could respond with the following payload:

{
  "data": {
    # pointer to a table or data of the following shape (PK: "dim1", "dim2", "dim3"):
    "columns": ["dim1", "dim2", "dim3", "m1_sum", "m1_count", "m1_sos", "m2_sum", "m2_count", "m2_sos"],
    "rows": ...
  },
  "metadata": {
    "unrestricted_dimensions": ["dim1", "dim2"],
    "filter_only_dimensions": ["dim3"],
  }
}

Notice that the metadata section should give you the information on what you can/cannot do with the data you receive or run queries on. Whether the DJ client uses that information is up to them.
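Concretely, a client could interpret that metadata as follows (toy pandas sketch with invented data):

import pandas

df = pandas.DataFrame({
    "dim1": ["a", "a"], "dim2": ["x", "y"], "dim3": ["p", "q"],
    "m1_sum": [10.0, 2.0], "m1_count": [1, 9], "m1_sos": [100.0, 4.0],
})

# Unrestricted dimensions may be aggregated away by summing the sufficient stats.
collapsed = df.groupby("dim1")[["m1_sum", "m1_count", "m1_sos"]].sum()

# Filter-only dimensions may only be used to subset rows, never re-aggregated over.
subset = df[df["dim3"] == "p"]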

If the above makes sense, we can discuss using pre-rendered tables later, but I think we should first agree on the DJ-generated solution for the experimentation use case.

matthewwardrop commented 1 year ago

Hi @agorajek ,

Thanks for looping me in. A few quick comments:

Firstly:

inner query would compute metrics m1 and m2 on the grain of data, where dim4 is included,

This inner query should be over the unit of analysis, rather than the grain of the underlying data. Is that what you meant? I think it is given the rest of the post, but just wanted to make sure. The distinction is important because metric aggregations can be arbitrarily complex, and need to be evaluated at the unit of analysis for the next aggregation stage to make sense.

Secondly:

Regarding the payload, I like it. I find it really helpful to think of the "filter_only_dimensions" as "partitions", since in many ways it acts exactly like a partition of a dim table. Perhaps the metadata could then be simplified to: {"dimensions": [...], "partitions": [...]}.

Thirdly:

I'm not 100% sold on the naming scheme currently suggested for the final aggregations (e.g. "measure_sum"). It runs the risk of overlapping with the naming space of uncompressed features. At Netflix (and mensor, where the naming scheme was established), we use a special character to separate the statistic name from the measure/metric name, which allows the data to be parsed without ambiguities. How do people feel about continuing this pattern, and naming things like "measure|sum", "measure|sos", "measure|count", etc.? Some database backends don't support special characters, but given that we are returning this proxied dataset, I doubt that matters (the column aliasing could all be done in this return payload).

Alternatively, the payload could be further enriched to look something like:

{
  "data": {
    # pointer to a table or data of the following shape (PK: "dim1", "dim2", "dim3"):
    "columns": ["dim1", "dim2", "dim3", "m1_sum", "m1_count", "m1_sos", "m2_sum", "m2_count", "m2_sos"],
    "rows": ...
  },
  "metadata": {
    "metrics": {
        "m1": {
              "count": "m1_count",
              "sum": "m1_sum",
              ...,
              # If we include this metadata in uncompressed data also, we could report statistic as "raw", as we do at Netflix
         },
    },
    "dimensions": ["dim1", "dim2"],
    "partitions": ["dim3"],
  }
}
shangyian commented 1 year ago

@matthewwardrop I actually prefer the name filter_only_dimensions as it avoids overloading the term partitions, which already means something in the context of data storage that doesn't map onto its meaning here. Whereas filter_only_dimensions is very clear about its intended purpose.

re: naming scheme -- I'd prefer to avoid the use of |, which ultimately causes similar issues as the underscore naming system does around collisions; it just makes the problem rarer. Although we're returning a proxied dataset, it'll still likely be a pointer to a table elsewhere, and I'd like to avoid potential future issues with using special characters like | in column names.

@agorajek also proposed a system that uses maps, which I think is nice and intuitive:

{
  "data": {
    # pointer to table or direct data
    "columns": ["dim1", "dim2", "dim3", "m1_stats", "m2_stats"],
    "rows": [
        ["a", "b", "c", {"sum": 100, "count": 2, "sos": 5000}, {"sum": 100, "count": 2, "sos": 5000}],
        ["x", "y", "z", {"sum": 2, "count": 2, "sos": 2}, {"sum": 2, "count": 2, "sos": 2}],
        ...
    ]
  },
  "metadata": {
    "dimensions": ["dim1", "dim2"],
    "filter_only_dimensions": ["dim3"],
    "stats": ["sum", "count", "sos"]
  }
}

In the case of raw data, it would omit the stats list in metadata and just return m1, m2 without the maps:

{
  "data": {
    # pointer to table or direct data
    "columns": ["dim1", "dim2", "dim3", "m1", "m2"],
    "rows": [
        ["a", "b", "c", 10, 20,
        ["x", "y", "z", 30, 40],
        ...
    ]
  },
  "metadata": {
    "dimensions": ["dim1", "dim2"],
    "filter_only_dimensions": ["dim3"]
  }
}

For the downstream parsing of this dataset with pandas, I don't think the map structure will cause too much of an issue, as it looks like calling pandas.json_normalize(data, sep='|') can handle the transformation nicely into columns with | (i.e., m1|sos).
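For example, with illustrative data (json_normalize and its sep argument are real pandas API):

import pandas

records = [
    {"dim1": "a", "m1_stats": {"sum": 100, "count": 2, "sos": 5000}},
    {"dim1": "x", "m1_stats": {"sum": 2, "count": 2, "sos": 2}},
]
# Nested dicts are flattened into columns joined by the separator.
df = pandas.json_normalize(records, sep="|")
# Columns: dim1, m1_stats|sum, m1_stats|count, m1_stats|sos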

matthewwardrop commented 1 year ago

@shangyian

I actually prefer the name filter_only_dimensions as it avoids overloading the term partitions, which already means something in the context of data storage that doesn't map onto its meaning here. Whereas filter_only_dimensions is very clear about its intended purpose.

Yeah... I do/would have similar concerns about conflation with db notions of partitions; and I'd be content with the proposal in this respect, so any residual concerns I have shouldn't be agonised over too much. filter_only_dimensions seems to lack something, IMHO... but maybe it would grow on me in time. Given that this distinction appears due to the introduction of an analysis unit, I wonder if there's a natural link to the naming in the request; something like: {"dimensions": ["dim1", "dim2"], "unit_of_analysis_partitions": ["dim3"]}. Or perhaps some more direct reference to the property of the dimensions that needs to be maintained, e.g. nonaggregatable_dimensions or frozen_dimensions.

re: naming scheme -- I'd prefer to avoid the use of |, which ultimately causes similar issues as the underscore naming system does around collisions; it just makes the problem rarer. Although we're returning a proxied dataset, it'll still likely be a pointer to a table elsewhere, and I'd like to avoid potential future issues with using special characters like | in column names.

I agree that being able to avoid invalid names in the underlying data tables is preferable, especially since many databases don't support these characters and you'd have to do translations (as indeed we do at Netflix). And while the naming collisions can be completely avoided by preventing metrics from using the "|" character in their names [which I'd suggest is probably a sensible thing to do anyway (metric keys should perhaps be limited to a specific subset of characters, perhaps valid python identifiers or some such)], I agree that having some nested structure that explicitly indicates the kept statistics rather than basing it on column name would be preferred at this level in the stack.

However, I do have some concerns about non-columnar data representations. In some cases, the data resulting from these queries will be quite large, and I hope at that stage we are not using this row-based JSON intermediate representation. Ideally this would point to some s3 artifact or parquet/iceberg/whatever table, which store their data in memory-contiguous column based layouts (which allows for much more performant analyses); and furthermore this data should hopefully not have its values nested in maps (which increases the overhead of digging these out). To make things more consistent, it probably makes sense to have a JSON data representation that mimics this, something like:

{
    "data": {
        "dim1": ["a", "b", "c"],
        ...
        "m1_sum": [1,2,3],
        "m1_count": [1,2,3],
        ...
    },
    "metadata": {
        "dimensions": ["dim1", "dim2"],
        "filter_only_dimensions": ["dim3"],
        "metrics": {
            "m1": {"count": "m1_count", "sum": "m1_sum", ...},
            ...
        }
    }
}

@agorajek also proposed a system that uses maps, which I think is nice and intuitive:

As indicated above, I'm not a huge fan of this. Especially if this ends up being propagated to parquet files that we then have to post-process locally to extract out the relevant data.


Going through the above made me realise there's another consideration here. Hitherto I was dealing mostly with the framing of a single metric (since that was what was written into the above proposal)... but if you are going to be able to get multiple metrics with the same query, each metric could have its own set of statistics (depending on how they are going to be analysed). It could be very wasteful to take the superset of all of the required statistics across all metrics. Perhaps the original query should be of form:

{
    "metrics": ["foo", ...],
    "dimensions": ["bar", "baz", "qux"],
    "analysis_unit": ["dim1", "dim2"],
    "aggs": {  # if you want to specify metrics manually
        "foo": ["sum", "count", "sos"],
        ...
    },
    "aggs": ["sum", "count", "sos"], # if you want the same stats for every metric
}

which would then match quite nicely to the metadata proposal above. Perhaps the dimensions and analysis units would also need to be similarly multiplexed for full generality (which would be required for the Netflix use-cases; either client-side or server-side).

agorajek commented 1 year ago
  1. @matthewwardrop yes, the inner query would effectively be done at the unit-of-analysis dimension. But let me try an example to make sure we are on the same page. Given:

      {
        "metrics": ["m1", "m2"], 
        "dimensions": ["dim1", "dim2", "dim3"], 
        "analysis_unit": ["dim4"], 
        "aggs": ...
      }

    a) I assume that all the requested "regular" dimensions (excluding a time dimension) will be part of the unit of analysis (UOA) set of attributes; in other words, they don't change for any single UOA ID. Hence whether or not we include them in the inner query GROUP BY should give us the same number of output rows.
    b) The filter_only_dimensions are different: we need to include them in the inner query, otherwise we would lose some information.

    So, I think both of the following variants of the inner query should be equivalent wrt row counts and column values:

    • SELECT dim1, dim2, dim3, dim4, AGGS(m1), AGGS(m2) FROM ... GROUP BY dim1, dim2, dim3, dim4
    • SELECT dim3, dim4, AGGS(m1), AGGS(m2) FROM ... GROUP BY dim3, dim4

      But the first approach joins the UOA (unit of analysis) dimensions early, while the 2nd will require joining them in the next stage. TBH I am not sure which approach will perform better, so I wish to leave that decision for later.

  2. I made up the filter_only_dimensions name w/o much thinking, but I agree with @shangyian that partitions is too overloaded a term to use. I briefly considered a name relating to the MECE concept, e.g. non_mece_dimensions, but I think simple English will be better here.

  3. Thanks for pointing out the potential necessity of delivering different sets of stats for different metrics. Your last example of two forms of the aggs param makes sense. But I must push back on making up column names when delivering these stats-as-columns. I think map type columns will be best (considering the flexibility + ease of use together), but I would like to better understand your concern about parsing parquet files to make sure I am not missing something.

matthewwardrop commented 1 year ago

@agorajek Thanks for your reply :).

  1. Yes, looks good to me. I'm rooting for early join for performance, but agreed it is not guaranteed to be the case.
  2. nods. Naming is important, so worth thinking about a bit more before committing to API contracts, but I've said all I need to say here.
  3. Nesting values in maps makes lookups slow, uses more memory at rest, and then ~doubles the memory during inference (where we need to assemble these vectors back into single columns with contiguous memory for performance reasons during compute). I did a quick demo (see below) of the difference in performance using pandas, which demonstrates an orders-of-magnitude difference in latency (I didn't benchmark the memory usage separately). For large datasets, with large numbers of metrics and dimensions, this problem will compound. I'm not a huge fan of unnecessarily introducing this computational complexity.

Benchmarking performance of two approaches:

import numpy
import pandas
import time
import matplotlib.pyplot as plt

def benchmark(N):
    print(N)
    df = (
        pandas.DataFrame({"count": numpy.ones(N), "sum": numpy.ones(N), "sos": numpy.ones(N)})
        .assign(combined=lambda df: df.apply(lambda row: dict(row), axis=1))
    )
    # Flat-column approach: vectorized aggregation directly on the stat columns.
    start = time.time()
    var = df.sos.sum() / df['count'].sum() - (df['sum'].sum()/df['count'].sum())**2
    uncombined = time.time() - start

    # Map-column approach: each stat must first be extracted from the dicts.
    start = time.time()
    sos = df.combined.apply(lambda x: x['sos'])
    count = df.combined.apply(lambda x: x['count'])
    sum = df.combined.apply(lambda x: x['sum'])
    var = sos.sum()/count.sum() - (sum.sum()/count.sum())**2
    combined = time.time() - start

    return uncombined, combined

results = [
    benchmark(int(10**i))
    for i in numpy.arange(1, 7+1)
]

plt.semilogy(numpy.arange(1, 7+1), results)
plt.grid()
plt.xlabel("Order of magnitude of data")
plt.ylabel("Time (s)")

(Figure: benchmark results plotting time in seconds, log scale, against order of magnitude of data for the two approaches.)

CircArgs commented 1 year ago

Not sure how relevant this is for this issue, but I believe the "DJ SQL" queries on the API already support such things: https://github.com/DataJunction/dj/blob/dc24a0447df257e821866636613b89753fc6e17c/tests/api/query_test.py#L56-L65