tnightengale / dbt-activity-schema

A dbt-Core package for generating models from an activity stream.
GNU General Public License v3.0

Implement Aggregate Relationships #9

Closed: tnightengale closed this issue 1 year ago

tnightengale commented 1 year ago

Create the following relationship macros, following the example of the existing relationship macros:

Checklist:

bcodell commented 1 year ago

I was thinking about this issue more, and after looking at the current implementation of the dataset macro, I have a hunch there is a bug in the macro's aggregation query logic for datasets that append multiple activities with different cardinality and use aggregate relationships like sum or count. For example, suppose a dataset has first ever signed up as its primary activity, with count all before viewed page and count all before started session as appended activity-relationship pairs, and an entity has 4 viewed page occurrences and 2 started session occurrences. Because the current query logic joins all appended activities together in a single CTE, I believe the join will explode and the aggregations will count 8 and 8 of the appended activities, rather than 4 and 2.
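To make the fanout concrete, here is a simplified, hand-written query over a spec-shaped activity stream (this is not the SQL the dataset macro generates; table and alias names are illustrative):

```sql
-- Assume one customer with 1 "signed up" row, 4 "viewed page" rows, 2 "started session" rows.
select
    p.customer,
    count(v.activity_id) as count_all_before_viewed_page,      -- returns 8, expected 4
    count(s.activity_id) as count_all_before_started_session   -- returns 8, expected 2
from activity_stream as p
left join activity_stream as v
    on v.customer = p.customer
    and v.activity = 'viewed page'
    and v.ts < p.ts
left join activity_stream as s
    on s.customer = p.customer
    and s.activity = 'started session'
    and s.ts < p.ts
where p.activity = 'signed up'
    and p.activity_occurrence = 1
group by p.customer
-- Both appended activities are joined in the same pass, so the 4 viewed page rows
-- pair with the 2 started session rows: 4 x 2 = 8 rows per customer before grouping.
```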

I ran into a similar issue in my project, and this was the query logic that I ultimately implemented:

The last step is probably the most costly since activity streams aren't partitioned/clustered on activity_id. There may be a more efficient way to query the data than what I outlined, but I couldn't think of another approach and the Activity Schema spec docs don't include any example queries with multiple appended activities.

bcodell commented 1 year ago

I also added macros for aggregate_after and aggregate_all_ever to the issue reqs

tnightengale commented 1 year ago

Hmmm, let's open a branch and write a failing integration test to check. My thinking was that it would explode, but still condense to the correct count (e.g., 4 and 2 in your provided case), because we are not counting the primary activity_id but the appended one, which should be uncountable NULLs for all exploded records other than those from the appended activity itself.

bcodell commented 1 year ago

Sounds good. Do you still want to take this or can I take a stab at it?

tnightengale commented 1 year ago

@bcodell I'll take a stab at it and circle back if I end up in a dark forest without the right abstractions 😄

bcodell commented 1 year ago

Are you planning to include functionality to parse feature_json as part of this issue? Or for the time being are you expecting users to pass something like ["feature_json['feature_col']::datatype"] for the columns argument for the activity macro?

tnightengale commented 1 year ago

@bcodell I plan to enable basic aggregations (counts) of activities, as I feel that aligns with the expectations set forth in the V2 Spec.

However, if it is possible to cleanly pass a custom column expression, as you suggest, then I think that would be a good choice.

I was thinking of using the same f-string placeholder pattern leveraged by the additional_join_condition arg in the activity.sql "class".

I sort of prefer the flexibility of allowing users to pass their own expression, because they may want to wrap the json_path_extract() in other functions/expressions like coalesce() or case ... end.
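For example, a user-supplied expression might look something like this (the extraction function and cast syntax are warehouse-specific; names here are illustrative, not part of the package):

```sql
-- Hypothetical custom column expressions; json_extract_path_text stands in for
-- whatever JSON extraction function the target warehouse provides.
select
    activity_id,
    coalesce(cast(json_extract_path_text(feature_json, 'revenue_impact') as numeric), 0) as revenue_impact,
    case
        when json_extract_path_text(feature_json, 'referrer') = '' then null
        else json_extract_path_text(feature_json, 'referrer')
    end as referrer
from activity_stream
```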

bcodell commented 1 year ago

@tnightengale that's fair. I'm a little surprised there's no support for aggregations like sum for numeric columns in feature_json, but I'll save those thoughts for a different issue 😅

FWIW, there are explicit references in the spec for summing revenue_impact, and while it's not in the spec, I know Narrator does support list_agg and count(distinct) aggregations for individual features in feature_json (evidence here). Some future functionality worth considering (if you aren't already).

tnightengale commented 1 year ago

One straightforward solution could be to allow folks to unnest their columns first in an earlier CTE, and then use the dataset() macro, passing their CTE to the activity_schema_ref arg.
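A minimal sketch of that approach, assuming a staging model that flattens the needed features ahead of time so the dataset() macro can read it via activity_schema_ref (the extraction function and model/column names are illustrative):

```sql
-- stg_activity_stream_unnested.sql: pre-extract JSON features, then point the
-- dataset() macro's activity_schema_ref arg at this model instead of the raw stream.
with unnested as (

    select
        activity_id,
        customer,
        activity,
        ts,
        activity_occurrence,
        activity_repeated_at,
        cast(json_extract_path_text(feature_json, 'revenue_impact') as numeric) as revenue_impact

    from {{ ref('activity_stream') }}

)

select * from unnested
```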

Where possible, I am in favor of allowing for flexible use with the simplest interface possible.

tnightengale commented 1 year ago

@bcodell Let me know your thoughts on the draft, showing the aggregate_in_before working for counting activities (but not unpacking): https://github.com/tnightengale/dbt-activity-schema/pull/16

I couldn't figure out a way to pass a column expression to the override_columns arg without making some hefty changes to the columns interface. That's because the column name is used by the generate_appended_column_alias macro, which can't handle and parse arbitrary expressions.

The fanout can be solved without modifying dataset.sql using DISTINCT in the aggregation. Perhaps the approach here is to allow the aggregation to be an arg to the aggregation relationships, and thus folks could pass their own custom aggregation macros.
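For illustration, using the same simplified join shape as the earlier example, DISTINCT collapses the fanout for counts because each activity_id is unique in the stream (again, hand-written SQL, not the package's generated query):

```sql
-- DISTINCT de-duplicates the fanned-out rows for counting, though it would not
-- rescue aggregations like sum() over the exploded join.
select
    p.customer,
    count(distinct v.activity_id) as count_all_before_viewed_page,     -- 4
    count(distinct s.activity_id) as count_all_before_started_session  -- 2
from activity_stream as p
left join activity_stream as v
    on v.customer = p.customer and v.activity = 'viewed page' and v.ts < p.ts
left join activity_stream as s
    on s.customer = p.customer and s.activity = 'started session' and s.ts < p.ts
where p.activity = 'signed up' and p.activity_occurrence = 1
group by p.customer
```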

bcodell commented 1 year ago

I couldn't figure out a way to pass a column expression to the override_columns arg without making some hefty changes to the columns interface.

What if the interface didn't require users to pass column expressions? Can you bake an assumption into the code that if the column name passed isn't in the default set of columns from the spec and represented in the columns() registry, then the column must be a key in feature_json? You'd have to add some if/else logic here and here in the dataset macro to parse the feature_json, but then the generate_appended_column_alias macro should just work. If you really wanted to, you could create a column abstraction that returns a namespace with attributes for the column name as defined in the argument, the parsed selection syntax (or unparsed if it's a default column), and the alias.
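A rough Jinja sketch of that fallback, with a hypothetical macro name and a hardcoded spec column list standing in for the columns() registry (the JSON extraction function is warehouse-specific; this is not the package's actual implementation):

```sql
{# Hypothetical: select any requested column that is not a default spec column
   out of feature_json instead. #}
{% macro select_appended_column(column_name) %}
    {%- set default_columns = [
        "activity_id", "ts", "customer", "anonymous_customer_id", "activity",
        "feature_json", "revenue_impact", "link", "activity_occurrence", "activity_repeated_at"
    ] -%}
    {%- if column_name in default_columns -%}
        {{ column_name }}
    {%- else -%}
        json_extract_path_text(feature_json, '{{ column_name }}')
    {%- endif -%}
{% endmacro %}
```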

That said, knowing the data type of the column passed is essential, since data types aren't preserved when they're wrapped up in json objects in databases; otherwise custom aggregation functions may behave oddly. This is partly why, for my project, I built the activity materialization to look for a feature_json (I called it attributes) parameter, where developers would define their custom features and specify their data types (implementation here and example here for reference).

Got a little carried away there. Given your goal to keep this as simple as possible, it's probably best to forgo this functionality for now (at least in this PR). Can always come back to it down the road.

The fanout can be solved without modifying dataset.sql using DISTINCT in the aggregation.

I mentioned it in my PR comments, but I'll reiterate here - I have some hesitation as to how this will scale in larger Activity Streams. And the fanout will generate weird behavior for custom aggregation functions too, right?

Perhaps the approach here is to allow the aggregation to be an arg to the aggregation relationships, and thus folks could pass their own custom aggregation macros.

This is exactly what I had in mind for these types of relationships.

tnightengale commented 1 year ago

@bcodell

What if the interface didn't require users to pass column expressions? Can you bake an assumption into the code that if the column name passed isn't in the default set of columns from the spec and represented in the columns() registry, then the column must be a key in feature_json? You'd have to add some if/else logic here and here in the dataset macro to parse the feature_json, but then the generate_appended_column_alias macro should just work. If you really wanted to, you could create a column abstraction that returns a namespace with attributes for the column name as defined in the argument, the parsed selection syntax (or unparsed if it's a default column), and the alias.

That said, knowing the data type of the column passed is essential, since data types aren't preserved when they're wrapped up in json objects in databases; otherwise custom aggregation functions may behave oddly. This is partly why, for my project, I built the activity materialization to look for a feature_json (I called it attributes) parameter, where developers would define their custom features and specify their data types (implementation here and example here for reference).

I really like this whole idea, and think we should look at implementing after we get to a baseline by providing the aggregation relationships! 👍

I mentioned it in my PR comments, but I'll reiterate here - I have some hesitation as to how this will scale in larger Activity Streams. And the fanout will generate weird behavior for custom aggregation functions too, right?

I believe you are right: I've done a refactor of dataset.sql to ensure each appended activity is joined and aggregated in its own CTE, before recombining everything in a final CTE.
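Roughly, the new shape looks like this (hand-written sketch with illustrative names, not the actual generated SQL):

```sql
-- Each appended activity is aggregated against the primary activity in its own CTE,
-- so one appended activity can never fan out another.
with primary_activity as (
    select *
    from activity_stream
    where activity = 'signed up' and activity_occurrence = 1
),

viewed_page_agg as (
    select
        p.activity_id as primary_activity_id,
        count(v.activity_id) as count_all_before_viewed_page
    from primary_activity as p
    left join activity_stream as v
        on v.customer = p.customer and v.activity = 'viewed page' and v.ts < p.ts
    group by p.activity_id
),

started_session_agg as (
    select
        p.activity_id as primary_activity_id,
        count(s.activity_id) as count_all_before_started_session
    from primary_activity as p
    left join activity_stream as s
        on s.customer = p.customer and s.activity = 'started session' and s.ts < p.ts
    group by p.activity_id
)

select
    p.*,
    vp.count_all_before_viewed_page,
    ss.count_all_before_started_session
from primary_activity as p
left join viewed_page_agg as vp on vp.primary_activity_id = p.activity_id
left join started_session_agg as ss on ss.primary_activity_id = p.activity_id
```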

tnightengale commented 1 year ago

Closed via #16