cube-js / cube

📊 Cube — The Semantic Layer for Building Data Applications
https://cube.dev
Other
17.94k stars 1.78k forks source link

Pre-Aggregations #8191

Open A-Lasso opened 6 months ago

A-Lasso commented 6 months ago

Hello, I'm having some slow queries with dynamic tables, so I wanted to add some generic pre-aggregations. I'm a little lost finding good examples, and the ones I created are not working. I think it is because of the cube.js file we have, since we add an account_uuid filter so that each database only has the data related to its own account (we use Metabase Cloud).

Tools:

Related Cube.js schema

const {memoizedFetchAccountPassword} = require("./sql-auth")

/**
 * Decodes a base64-encoded string.
 *
 * @param {string} data - Base64 text to decode.
 * @returns {string} The decoded bytes interpreted with Node's 'ascii' encoding.
 */
function decodeBase64(data) {
    return Buffer.from(data, 'base64').toString('ascii');
}

/**
 * Splits a base64-encoded global id into its parts.
 *
 * The id is decoded with decodeBase64 and split on ":"; the first segment is
 * returned as `node` and the second as `uuid`. Any further segments are ignored.
 *
 * @param {string} globalId - Base64-encoded id.
 * @returns {{node: string, uuid: (string|undefined)}} `uuid` is undefined when
 *   the decoded text contains no ":".
 */
const deconstructGlobalId = (globalId) => {
    const [node, uuid] = decodeBase64(globalId).split(":");
    return { node, uuid };
};

/**
 * Builds the application id string for a request context.
 *
 * NOTE(review): only `context.cacct` is read here, whereas queryRewrite in this
 * file also falls back to `securityContext.cacct`. Contexts without a top-level
 * `cacct` therefore all yield "CUBEJS_APP_undefined" - confirm that is intended.
 *
 * @param {{cacct: *}} context - Request context.
 * @returns {string} "CUBEJS_APP_" followed by the context's `cacct` value.
 */
const contextToAppId = (context) => {
    const accountId = context.cacct;
    return `CUBEJS_APP_${accountId}`;
};

/**
 * Derives extra request context from the `cacct` HTTP header.
 *
 * Requests without headers or without a `cacct` header get no extension; per
 * the original note, those are SQL API requests whose context is handled by
 * checkSqlAuth instead.
 *
 * @param {{headers: (Object|undefined)}} req - Incoming request.
 * @returns {{cacct: (string|undefined)}|undefined} The account uuid taken from
 *   the decoded header, or undefined when the header is absent or decoding throws.
 */
const extendContext = (req) => {
    const { headers } = req;
    const hasAccountHeader = headers !== undefined && headers.cacct !== undefined;
    if (!hasAccountHeader) {
        return undefined;
    }

    try {
        const { uuid } = deconstructGlobalId(headers.cacct);
        return { cacct: uuid };
    } catch (error) {
        // Best effort: a failure is logged and the request continues without cacct.
        console.log("Error extending context: " + error);
        return undefined;
    }
};

/**
 * Extracts the cube name from a query member reference.
 *
 * @param {string|{cubeName: string}|*} member - Either a "Cube.member" path or
 *   an object carrying a `cubeName` property.
 * @returns {string|undefined} The cube name, or undefined when none can be determined.
 */
const cubeNameOfMember = (member) => {
    if (typeof member === "string") {
        const dotIndex = member.indexOf(".");
        return dotIndex > 0 ? member.substring(0, dotIndex) : undefined;
    }
    if (member !== null && typeof member === "object" && typeof member.cubeName === "string") {
        return member.cubeName;
    }
    return undefined;
};

/**
 * Scopes a query to the caller's account by appending an
 * `<cube>.accountUuid equals <account id>` filter.
 *
 * The account id is `request.cacct` when defined, otherwise
 * `request.securityContext.cacct`. The cube is taken from the first member of
 * the first non-empty list among dimensions, measures and timeDimensions
 * (in that order).
 *
 * NOTE(review): only one cube is filtered; a query spanning several cubes gets
 * a single filter - confirm joined cubes are scoped elsewhere.
 *
 * @param {Object} query - Query to rewrite; its `filters` array is mutated
 *   (and created when missing).
 * @param {Object} request - Request context carrying `cacct` or `securityContext.cacct`.
 * @returns {Object} The same `query` object with the account filter appended.
 * @throws {Error} When no account id or no cube name can be determined, so an
 *   unscoped or malformed filter is never emitted.
 */
const queryRewrite = (query, request) => {
    console.log("Rewriting for", JSON.stringify(request));

    const accountId = request.cacct !== undefined
        ? request.cacct
        : (request.securityContext || {}).cacct;
    if (accountId === undefined || accountId === null) {
        throw new Error("queryRewrite: no account id (cacct) found in the request context");
    }

    const dimensions = query.dimensions || [];
    const measures = query.measures || [];
    const timeDimensions = query.timeDimensions || [];

    let firstMember;
    if (dimensions.length > 0) {
        firstMember = dimensions[0];
    } else if (measures.length > 0) {
        firstMember = measures[0];
    } else if (timeDimensions.length > 0) {
        // The time dimension's own member is used here; the previous fallback
        // read measures[0], which is always empty in this branch.
        const firstTimeDimension = timeDimensions[0];
        firstMember = firstTimeDimension == null ? undefined : firstTimeDimension.dimension;
    }

    const cubeName = cubeNameOfMember(firstMember);
    if (!cubeName) {
        console.log("Using account id:", accountId);
        console.log("The query:", query);
        throw new Error("queryRewrite: cannot determine which cube to scope by account");
    }

    console.log("Using account id:", accountId);

    if (!Array.isArray(query.filters)) {
        query.filters = [];
    }
    query.filters.push({
        member: `${cubeName}.accountUuid`,
        operator: 'equals',
        values: [accountId],
    });

    return query;
};

/**
 * Resolves SQL API credentials for a user name that is a base64 global id.
 *
 * @param {Object} req - Incoming request (not used by this implementation).
 * @param {string} username - Base64-encoded global id; its uuid part becomes
 *   the security context's `cacct`.
 * @returns {Promise<{password: string, securityContext: {cacct: (string|undefined)}}|undefined>}
 *   The expected password plus security context, or undefined when no password
 *   is available.
 */
const checkSqlAuth = async (req, username) => {
    const { uuid } = deconstructGlobalId(username);

    // FIXME: the password is hard-coded and identical for every account; the
    // per-account lookup below is disabled.
    // const password = await memoizedFetchAccountPassword(uuid);
    const password = "stagingaccess";

    // Kept for when the lookup is restored and may yield null.
    if (password === null) {
        return undefined;
    }

    return {
        password,
        securityContext: { cacct: uuid },
    };
};

// To test sql auth locally, uncomment the following line.
// checkSqlAuth({}, "QWNjb3VudDo4NDUyZjg4MS0wYWM1LTRmMjYtOWFhYi0xNmUzN2IyZWM0ZjI=").then((ctx) => console.log("X>", ctx));

// Queue settings shared by the query cache and the pre-aggregations queue
// (both reference this same object in module.exports below).
// The timing values are presumably in seconds, per the Cube orchestrator
// options documentation - TODO confirm for the deployed Cube version.
// NOTE(review): heartBeatInterval is set equal to orphanedTimeout (both 120).
// If a query is treated as orphaned after orphanedTimeout without a heartbeat,
// a heartbeat that fires only every 120 may arrive too late - confirm these
// two values are intended to match.
const queueOptions = {
  concurrency: 2,
  executionTimeout: 600,
  orphanedTimeout: 120,
  heartBeatInterval: 120,
};

// Configuration object exported from this file: HTTP/CORS settings, the
// auth/context hooks defined above, orchestrator queue settings and the
// contexts used for scheduled refresh.
module.exports = {
    http: {
        cors: {
            // Any origin is accepted; the custom `cacct` header read by
            // extendContext is explicitly allowed alongside the standard ones.
            origin: '*',
            methods: 'GET,HEAD,PUT,PATCH,POST,DELETE',
            preflightContinue: false,
            allowedHeaders: ['Content-Type', 'Authorization', 'cacct'],
            optionsSuccessStatus: 204,
        },
    },
    // Presumably the scheduled refresh interval in seconds - TODO confirm.
    scheduledRefreshTimer: 120,
    checkSqlAuth,
    contextToAppId,
    extendContext,
    queryRewrite,
    orchestratorOptions: {
        queryCacheOptions: {
            refreshKeyRenewalThreshold: 120,
            backgroundRenew: true,
            queueOptions,
        },
        // The pre-aggregations queue uses the same queueOptions object as the query cache.
        preAggregationsOptions: {queueOptions},
    },
    // Figure out ScheduledRefreshContexts later! (probably needed for QueryRewrite)
    // Placeholder to prevent the error message:
    // Returns a single context whose account id is the all-zero UUID.
    // NOTE(review): contextToAppId reads `context.cacct`, not
    // `securityContext.cacct`, so this context presumably maps to
    // "CUBEJS_APP_undefined" - confirm that is the intended app id.
    scheduledRefreshContexts: () => [
        {
            securityContext: {
                cacct: '00000000-0000-0000-0000-000000000000'
            }
        }
    ],
};

So this is one of the dynamic files we have:

{#
  Dynamic data model template: emits one cube per entry of
  load_interaction_tags()["cubes"], using each entry's name and sql.

  For a cube that has dimensions, these pre-aggregations are generated:
    - <cube>_table            : all dimensions
    - <cube>_table2           : all dimensions except accountUuid and interactionUuid
    - <dimension>_filter      : one per dimension, that dimension only
    - <dimension>_by_distinctUuid : one per dimension, with the distinctUuid measure
    - <dimension>uuid_by_month / _by_day / _by_year : only for dimensions whose
      type equals "time"; distinctUuid rolled up by that time dimension
  The first four kinds refresh every 12 hours; the time rollups set no refresh_key.

  Joins, measures and dimensions are emitted only when the cube entry provides
  them. A dimension flagged primaryKey is also marked public.

  NOTE(review): the per-dimension and time pre-aggregations reference a measure
  named distinctUuid without checking that the cube defines it - confirm every
  generated cube has that measure.
  NOTE(review): queryRewrite in cube.js adds an equals filter on accountUuid to
  every query. Presumably a pre-aggregation can only serve a query when it
  contains every member the query filters on, so the pre-aggregations that
  omit accountUuid may never be selected - confirm against the Cube
  pre-aggregation matching documentation.
#}
{% set account = "accountUuid" %}
{% set interaction = "interactionUuid" %}
{% set time = "time" %}
cubes:
  {%- for cube in load_interaction_tags()["cubes"] %}

  - name: {{ cube.name }}
    sql: {{ cube.sql }}
    dataSource: default

  {%- if cube.dimensions is not none and cube.dimensions|length > 0 %}
    pre_aggregations:

      - name: {{cube.name + "_table" }} 
        dimensions:
          {%- for dimension in cube.dimensions %} 
          - {{ dimension.name }}
          {%- endfor %}
        refresh_key:
          every: 12 hour

      - name: {{cube.name + "_table2" }} 
        dimensions:
          {%- for dimension in cube.dimensions %} 
          {%- if dimension.name != account and  dimension.name != interaction %}
          - {{ dimension.name }}
          {%- endif %}
          {%- endfor %}
        refresh_key:
          every: 12 hour

      {%- for dimension in cube.dimensions %}

      - name: {{ dimension.name + "_filter" }} 
        dimensions:
          - {{ dimension.name }}
        refresh_key:
          every: 12 hour

      - name: {{ dimension.name + "_by_distinctUuid" }} 
        dimensions:
          - {{ dimension.name }}
        measures:
          - distinctUuid 
        refresh_key:
          every: 12 hour

      {%- if dimension.type == time %}
      - name: {{ dimension.name + "uuid_by_month" }} 
        measures:
          - distinctUuid
        time_dimension: {{ dimension.name }}
        granularity: month

      - name: {{ dimension.name + "uuid_by_day" }} 
        measures:
          - distinctUuid
        time_dimension: {{ dimension.name }}
        granularity: day

      - name: {{ dimension.name + "uuid_by_year" }} 
        measures:
          - distinctUuid
        time_dimension: {{ dimension.name }}
        granularity: year

      {%- endif %}
      {%- endfor %}
  {%- endif %}

  {%- if cube.joins is not none and cube.joins|length > 0 %}
    joins:
      {%- for join in cube.joins %}
      - name: {{ join.name }}
        relationship: {{ join.relationship }}
        sql: {{ join.sql }}
      {%- endfor %}
  {%- endif %}

  {%- if cube.measures is not none and cube.measures|length > 0 %}
    measures:
      {%- for measure in cube.measures %}
      - name: {{ measure.name }}
        type: {{ measure.type }}
      {%- if measure.sql %}
        sql: {{ measure.sql }}
      {%- endif %}
      {%- endfor %}
  {%- endif %}

  {%- if cube.dimensions is not none and cube.dimensions|length > 0 %}
    dimensions:
      {%- for dimension in cube.dimensions %}
      - name: {{ dimension.name }}
        sql: {{ dimension.sql }}
        type: {{ dimension.type }}
        {% if dimension.primaryKey == True -%}
        primaryKey: true
        public: true
        {% endif -%}
      {%- endfor %}
  {%- endif %}
  {%- endfor %}

As you can see, I already created pre-aggregations for every table and column, plus some test ones for date columns. My first problem is the following. This is how the table arrives in Metabase:

image

And this is the query generated in query history:

-- Metabase:: userID: 1 queryType: MBQL queryHash: de78bb40680846bcd73c5909057caa9eda003637ecc1a97df41551608e5e1b9e
SELECT
  "public"."IT_Daily_IPS_CheckIn"."interactionUuid" AS "interactionUuid",
  "public"."IT_Daily_IPS_CheckIn"."accountUuid" AS "accountUuid",
  "public"."IT_Daily_IPS_CheckIn"."Denied" AS "Denied",
  "public"."IT_Daily_IPS_CheckIn"."Duplicates" AS "Duplicates",
  "public"."IT_Daily_IPS_CheckIn"."Emailed_for_Approval" AS "Emailed_for_Approval",
  "public"."IT_Daily_IPS_CheckIn"."Failed_Posts" AS "Failed_Posts",
  "public"."IT_Daily_IPS_CheckIn"."Pending_Approval" AS "Pending_Approval",
  "public"."IT_Daily_IPS_CheckIn"."Questions" AS "Questions",
  "public"."IT_Daily_IPS_CheckIn"."Second_Email_Approval" AS "Second_Email_Approval",
  "public"."IT_Daily_IPS_CheckIn"."__user" AS "__user",
  "public"."IT_Daily_IPS_CheckIn"."__cubeJoinField" AS "__cubeJoinField"
FROM
  "public"."IT_Daily_IPS_CheckIn"
LIMIT
  2000

image And as you can see, it doesn't detect the pre-aggregation. I don't know if it's because the query includes the __user and __cubeJoinField columns, or maybe because we have too many pre-aggregations?

The thing is, I can't find how to define a pre-aggregation with joins and filters, and all these dynamic tables are primarily used joined with two other tables. I want to do something like the first two pre-aggregations, but with the joins to those tables and a "not null" filter on a different column for each pre-aggregation (this is likely the most generally useful pre-aggregation I can create).

Videos and documentation are always welcome. I did all this with what I could find, though I cannot say I read it all in depth, even though I tried.

A-Lasso commented 6 months ago

Is it possible that it got stuck in a bad state? The dataset shows as created and last modified on the 22nd, but I have been creating and deleting pre-aggregations since then: image image

It also seems like I have fewer than 100 pre-aggregations from about 5-10 tables, but I should have more.

I also see this here all the time: image

A-Lasso commented 6 months ago

I disabled these two options, and it seems that the pre-aggregations are now used when Metabase runs those queries. I don't know if it was too much for the warm-up; it seemed to be stuck and was not creating the new pre-aggregations, but now it seems to be working.

image

If I have new updates I'll share them.