ak--47 / dwh-mixpanel

🏭 reverse-ETL from your DWH to mixpanel
MIT License

🏭 dwh-mixpanel

Stream queries from your data warehouse to events, profiles, groups, or lookup tables in Mixpanel... rETL style 💫.

No intermediate staging/storage required.

Supported data warehouses: BigQuery, Snowflake, Athena, Azure SQL, and Salesforce.

👔 tldr;

run the module, and provide a configuration file as the first argument:

npx dwh-mixpanel ./myConfig.json

for help building a configuration file, run the module with no arguments:

npx dwh-mixpanel

what next?

🍿 demo

bigquery to mixpanel demo

πŸ‘¨β€πŸ’»οΈ cli

as stated in the tldr, if you run dwh-mixpanel with no arguments you get a CLI which helps you build a configuration file:

npx dwh-mixpanel

it looks like this:

cli walkthrough

at the end of this walkthrough, a JSON file will be saved to your current working directory; this JSON will contain the configuration of what you typed into the CLI. the CLI will then ask you if you'd like to trigger a run:

confirmation screen

once you have a configuration file, you can run (and re-run) that job by passing the configuration file as the first argument to this module:

npx dwh-mixpanel snowflake-mixpanel.json

as it runs, you'll get some console output as to the status of your job:

demo

once the pipeline is complete, it will stash logs in the current working directory, and you can see your data in mixpanel!

note: if you use this module frequently, consider a global install:

npm install --global dwh-mixpanel

and then you don't need the npx:

dwh-mixpanel ./myConfig.json

🔄 module

dwh-mixpanel can also be used as an ES module (ESM) inside any node.js environment. this is useful for automated cloud ingestion pipelines and scheduled syncs with your warehouse.

install it from npm:

npm install dwh-mixpanel

and then you use it as you would any other dependency:

import dwhMp from "dwh-mixpanel";

dwh-mixpanel exports a single function, which takes a single parameter: a configuration object.

this is the entry-point for the whole module:

const myConfig = {
  dwh: "bigquery",
  sql: "SELECT * FROM EVENTS",
  //etc...
};

const bqToMpSummary = await dwhMp(myConfig);

the module returns a summary of the entire pipeline, with statistics and logs about how many records were processed, throughput, and metadata from the warehouse.

{
  mixpanel: {
    success: 100000,
    failed: 0,
    total: 100000,
    requests: 50,
    retries: 0,    
    responses: [{},{},{}],
    errors: [{},{},{}],
  },
  bigquery: {
    job: {
      //job infos
    },
    metadata: {
      //query metadata
    },
    schema: {
      //schema
    }
  }
}
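if you're running this inside an automated pipeline, you'll probably want to act on that summary (for example, fail the job when records are rejected). here's a minimal sketch, assuming only the mixpanel fields shown in the sample above; describeRun is a hypothetical helper, not something dwh-mixpanel exports:

```javascript
// hypothetical helper (not part of dwh-mixpanel): condenses the summary returned by dwhMp().
// assumes only the mixpanel fields shown in the sample summary above.
function describeRun(summary) {
  const { success = 0, failed = 0, total = 0, requests = 0 } = summary.mixpanel ?? {};
  return {
    // true only when every record was accepted
    ok: failed === 0 && success === total,
    successRate: total > 0 ? success / total : 0,
    // average number of records sent per HTTP request to mixpanel
    recordsPerRequest: requests > 0 ? total / requests : 0,
  };
}
```

for the sample summary above, this reports ok: true and 2,000 records per request. in a scheduled job you could throw when ok is false, so your scheduler marks the run as failed.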

πŸ‘¨β€πŸ”§οΈ configuration

your configuration is an object (or JSON) with the following structure:

{
    dwh: "",        // warehouse name
    auth: {},       // warehouse auth details
    sql: "",        // a SQL query
    mappings: {},   // col headers → mixpanel fields
    mixpanel: {},   // mixpanel auth
    options: {},    // job options
    tags: {}        // arbitrary tags
}

you can find examples in the repo for different warehouses. additionally, the module is typed using JSDoc, so your IDE can offer autocompletion and inline hints as you write a configuration:

developer experience

here's what each of those keys (and their values) means:

dwh

a string representing the data warehouse you're connecting to. supported values are:

bigquery, athena, snowflake, azure, salesforce
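if you generate configurations programmatically, a quick pre-flight check can catch typos before a warehouse job starts. this is a minimal sketch that only mirrors the rules described in this README; checkConfig and SUPPORTED_DWHS are hypothetical names, and the module performs its own validation:

```javascript
// hypothetical pre-flight check (not part of dwh-mixpanel): catches obvious mistakes
// in a configuration object before starting a warehouse job
const SUPPORTED_DWHS = ["bigquery", "athena", "snowflake", "azure", "salesforce"];

function checkConfig(config) {
  const problems = [];
  if (!SUPPORTED_DWHS.includes(config.dwh)) {
    problems.push(`unsupported dwh: ${JSON.stringify(config.dwh)}`);
  }
  if (typeof config.sql !== "string" || config.sql.trim() === "") {
    problems.push("sql must be a non-empty string");
  }
  if (config.mappings === null || typeof config.mappings !== "object") {
    problems.push("mappings must be an object");
  }
  // auth may be omitted when it comes from the DWH_AUTH environment variable
  if (config.auth !== undefined && (config.auth === null || typeof config.auth !== "object")) {
    problems.push("auth, if present, must be an object");
  }
  return problems;
}
```

an empty array means nothing obviously wrong was found; it does not guarantee the credentials or the query are valid.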

auth

an object {} containing the service account/credentials to use when authenticating with the data warehouse.

each cloud warehouse has its own method(s) of authenticating API calls, which usually consist of a username and a password, secret, or private key.

to read more about the ways you can authenticate with a supported data warehouse, see warehouse details

sql

a valid SQL query to run (as a job) in your data warehouse; this is the "model" for the data that will be sent to mixpanel. your SQL query will usually be a SELECT {fields} or SELECT * statement:

SELECT
  eventName,
  user_id,
  timestamp,
  prop_a,
  prop_b,
  prop_c,
  rowId AS insert_id
FROM
  myProject.myDB.myTable
WHERE
  env = 'prod'

your SQL query should produce a flat, non-nested table that has the fields and records you wish to stream to mixpanel. your column headers can have any title; you will provide a mappings dictionary (detailed below) to describe how mixpanel should receive the fields.

note: most field labels can be retitled post-ingestion using lexicon, mixpanel's data governance suite.

mappings

an object {} containing mapping assignments of COLUMN headers in the warehouse table to JSON property keys in mixpanel's data model.

providing mappings is not a tedious task: mixpanel is schemaless and designed for semi-structured data, so any column present in the table that is not explicitly mapped will become an event/user property, keyed by its column header.

therefore, your SQL query should select all the fields you want in mixpanel, and the only fields you must provide mappings for depend on the required fields for the type of data you're importing:

event mappings:

NOTE: this module supports both original id merge AND simplified id merge. there's an FAQ which explains the differences, but simplified id merge is usually the best fit for rETL.

{
  // REQUIRED
  'event_name_col': '',   // column for event name
  'time_col': '',         // column for event time

  // REQUIRED FOR ORIGINAL ID MERGE
  'distinct_id_col': '',  // column for distinct_id

  // REQUIRED FOR SIMPLIFIED ID MERGE
  'user_id_col': '',      // column for user_id / canonical_id
  'device_id_col': '',    // column for device_id / anon_id

  // OPTIONAL
  'insert_id_col': ''     // column for row id (deduplication)
}

note: insert_id_col is required when using strict mode

user or group profile mappings:

{
  // REQUIRED
  'distinct_id_col': '',  // column for unique user (or group) id

  // OPTIONAL
  'profileOperation': '', // the $set style operation to use
  'name_col': '',         // column for $name on the user/group profile
  'email_col': '',        // column for $email on the user/group profile
  'avatar_col': '',       // column for $avatar on the user/group profile
  'created_col': '',      // column for $created (timestamp)
  'phone_col': '',        // column for $phone on the user/group profile
  'latitude_col': '',     // column for $latitude on the user/group profile
  'longitude_col': '',    // column for $longitude on the user/group profile
  'ip_col': ''            // column for $ip on the user/group profile
}
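to make the profile mappings concrete, here's a minimal sketch of how one warehouse row could become a mixpanel user profile update. it assumes mixpanel's engage format ($token, $distinct_id, and an operation such as $set) and covers only a subset of the mapping keys; rowToProfile is hypothetical and not the module's actual code:

```javascript
// hypothetical sketch (not the module's actual code): one warehouse row → one
// mixpanel user profile update, using a subset of the mapping keys documented above
const RESERVED_PROFILE_PROPS = {
  name_col: "$name",
  email_col: "$email",
  avatar_col: "$avatar",
  created_col: "$created",
  phone_col: "$phone",
};

function rowToProfile(row, mappings, token) {
  const operation = mappings.profileOperation || "$set";
  const idCol = mappings.distinct_id_col;
  const props = {};
  const used = new Set([idCol]);

  // mapped columns are renamed to mixpanel's reserved profile properties
  for (const [mappingKey, reservedProp] of Object.entries(RESERVED_PROFILE_PROPS)) {
    const col = mappings[mappingKey];
    if (col && col in row) {
      props[reservedProp] = row[col];
      used.add(col);
    }
  }
  // every other column becomes a profile property under its own header
  for (const [col, value] of Object.entries(row)) {
    if (!used.has(col)) props[col] = value;
  }
  return { $token: token, $distinct_id: String(row[idCol]), [operation]: props };
}
```

group profile updates follow the same idea, but identify the profile with $group_key and $group_id instead of $distinct_id.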

lookup table mappings:

{
// REQUIRED
 'lookup_col' : '' //the "join" column for the lookup table
//hint: ^ this is usually the first column in the table
}
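mixpanel lookup tables are CSV files whose first column is the join key, which is why lookup_col matters. as a sketch of that idea (rowsToLookupCsv is a hypothetical helper, not the module's code), here's how rows could be serialized with the lookup column moved to the front:

```javascript
// hypothetical sketch: serialize rows as CSV with lookup_col first,
// since mixpanel lookup tables use the first column as the join key
function toCsvField(value) {
  const s = value === null || value === undefined ? "" : String(value);
  // quote fields containing commas, quotes, or newlines; double any embedded quotes
  return /[",\r\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
}

function rowsToLookupCsv(rows, lookupCol) {
  if (rows.length === 0) return "";
  const others = Object.keys(rows[0]).filter((col) => col !== lookupCol);
  const header = [lookupCol, ...others];
  const lines = [header.map(toCsvField).join(",")];
  for (const row of rows) {
    lines.push(header.map((col) => toCsvField(row[col])).join(","));
  }
  return lines.join("\n");
}
```

this assumes every row has the same columns as the first one, which is what a flat SQL result set gives you.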

the key thing to remember about mappings is that you are giving the module a guide for mapping fields from your warehouse to the required fields of each mixpanel data type.

here's an example:

SELECT
    insert_id, timestamp, action, uuid, theme, class
FROM
    mydnd.campaign.db

which produces this table:

insert_id   timestamp   action   uuid    theme   class
abc-123     4:19 PM     attack   ak      dark    cleric
xyz-345     4:20 PM     defend   alice   light   bard
cba-678     4:20 PM     attack   bob     light   paladin
zyx-901     4:21 PM     sneak    eve     dark    rogue

with this mapping:

{
    event_name_col: 'action',
    distinct_id_col: 'uuid',
    time_col: 'timestamp',
    insert_id_col: 'insert_id'
}

the module produces these events in mixpanel:

[
  {
    event: "attack",
    properties: {
      distinct_id: "ak",
      time: 1234567890,
      $insert_id: "abc-123",
      theme: "dark",
      class: "cleric",
    },
  },
  {
    event: "defend",
    properties: {
      distinct_id: "alice",
      time: 1234577891,
      $insert_id: "xyz-345",
      theme: "light",
      class: "bard",
    },
  },
  //etc...
];
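the transformation in the example above can be sketched in a few lines: mapped columns become mixpanel's special fields, and every other column passes through as an event property. this is a minimal sketch under stated assumptions (numeric times are already epoch seconds; strings are parsed with Date.parse); rowToEvent is hypothetical, and the module's real transform may differ:

```javascript
// hypothetical sketch (not the module's actual code): one row + event mappings → one
// mixpanel event. mapped columns become mixpanel's special fields; every other
// column is passed through as an event property.
function toEpochSeconds(value) {
  if (typeof value === "number") return value; // assume already epoch seconds
  const ms = value instanceof Date ? value.getTime() : Date.parse(value);
  return Math.floor(ms / 1000);
}

function rowToEvent(row, mappings) {
  const { event_name_col, time_col, distinct_id_col, user_id_col, device_id_col, insert_id_col } = mappings;
  const special = new Set(
    [event_name_col, time_col, distinct_id_col, user_id_col, device_id_col, insert_id_col].filter(Boolean)
  );

  const properties = { time: toEpochSeconds(row[time_col]) };
  // original id merge: a single distinct_id column
  if (distinct_id_col) properties.distinct_id = String(row[distinct_id_col]);
  // simplified id merge: $user_id and/or $device_id; null values are skipped
  if (user_id_col && row[user_id_col] != null) properties.$user_id = String(row[user_id_col]);
  if (device_id_col && row[device_id_col] != null) properties.$device_id = String(row[device_id_col]);
  if (insert_id_col) properties.$insert_id = String(row[insert_id_col]);

  // every column not used above becomes an event property, unchanged
  for (const [col, value] of Object.entries(row)) {
    if (!special.has(col)) properties[col] = value;
  }
  return { event: String(row[event_name_col]), properties };
}
```

fed the first row of the table (with an epoch-seconds timestamp) and the mapping above, this yields the "attack" event shown, with theme and class carried over as plain properties.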

for more info on mixpanel's data structure, see this deep-dive

mixpanel

an object {} containing authentication details used to connect to your mixpanel project.

{
    project_id: '',             //your mixpanel project id
    type: 'event',              //type of record to import
    region: 'US',               //or EU

    //either service account details OR an api secret is required
    service_account: '',            //service account user name
    service_secret: '',             //service account secret

    api_secret: '',             //project api secret [deprecated]

    //required for profiles
    token: '',              //mixpanel project token

    //required for groups
    groupKey: '',               //the group key for this group

    //required for lookup tables
    lookupTableId: ''           //the lookup table to replace
}

note: you can find most of these values in your mixpanel project's settings.
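for context on how these values are used: mixpanel's ingestion APIs authenticate with HTTP Basic auth (service account user + secret, or the legacy api secret as the username with an empty password), and EU projects use the api-eu.mixpanel.com host. the sketch below derives an event-import URL and header from the config above; mixpanelImportTarget is a hypothetical helper and not the module's internals:

```javascript
// sketch (not the module's actual code): deriving the event-import URL and auth header
// from the mixpanel config above. mixpanel's ingestion APIs use HTTP Basic auth, either
// service_account:service_secret, or the (deprecated) api secret as the username.
function mixpanelImportTarget(mp) {
  const isEU = String(mp.region || "US").toUpperCase() === "EU";
  const host = isEU ? "api-eu.mixpanel.com" : "api.mixpanel.com";

  let credentials;
  if (mp.service_account && mp.service_secret) {
    credentials = `${mp.service_account}:${mp.service_secret}`;
  } else if (mp.api_secret) {
    credentials = `${mp.api_secret}:`; // empty password
  } else {
    throw new Error("provide service_account + service_secret, or api_secret");
  }

  return {
    url: `https://${host}/import?project_id=${encodeURIComponent(mp.project_id)}`,
    // Buffer is a node.js global
    authorization: "Basic " + Buffer.from(credentials).toString("base64"),
  };
}
```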

options

an object {} containing various options for the job.

{
    logFile: 'myLog.txt', // local path to write log files to
    verbose: true,  // display verbose console output
    strict: false, // use strict mode when sending data to mixpanel
    compress: false,  // gzip data before egress
    workers: 20 // number of concurrent workers to make requests to mixpanel
}

the workers option is important because it governs concurrency, which can greatly affect throughput. the best results are observed with 10 to 20 workers.
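to illustrate what a worker count means, here's a minimal sketch of a fixed-size worker pool, assuming each task is an async function that sends one batch to mixpanel. runPool is hypothetical; it shows the general technique, not how dwh-mixpanel schedules its requests:

```javascript
// minimal sketch of a fixed-size worker pool: at most `workers` tasks run at once.
// each task is assumed to be an async function (e.g. one request to mixpanel).
async function runPool(tasks, workers) {
  const results = new Array(tasks.length);
  let next = 0;

  async function worker() {
    // each worker claims the next unstarted task until none remain;
    // next++ is safe because javascript runs this code on a single thread
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const pool = Array.from({ length: Math.min(workers, tasks.length) }, () => worker());
  await Promise.all(pool);
  return results; // same order as `tasks`
}
```

because each worker pulls the next task as soon as it finishes one, the pool keeps exactly `workers` requests in flight until the queue drains, rather than waiting for a whole group to finish before starting the next.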

tags (optional)

an object {} containing arbitrary key:value string pairs that will be used to tag the data. this is particularly useful if this module is being used as part of an automated pipeline, and you wish to tag the data with runIds or some other reference value.

{
    mixpanel: {
        type: "event"
    },
    tags: {
        "foo": "bar"
        // every event in mixpanel will have a {foo: 'bar'} prop
    }
}

this works on all record types:

{
    mixpanel: {
        type: "user"
    },
    tags: {
        "baz": "qux"
        // every user profile updated will have a {baz: 'qux'} prop
    }
}
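conceptually, tagging is a merge of the tags object into every outgoing record. here's a minimal sketch, assuming events receive tags as event properties and profile updates receive them inside the update operation (e.g. $set); applyTags is hypothetical and not the module's code:

```javascript
// hypothetical sketch (not the module's actual code): merging `tags` into outgoing records.
// events get tags as event properties; profile updates get them inside the update operation.
function applyTags(record, tags, type, operation = "$set") {
  if (!tags || Object.keys(tags).length === 0) return record;
  if (type === "event") {
    // tags are spread last, so in this sketch they overwrite a same-named column
    return { ...record, properties: { ...record.properties, ...tags } };
  }
  // user and group profiles
  return { ...record, [operation]: { ...record[operation], ...tags } };
}
```

this version returns new objects instead of mutating its input, which keeps the original rows untouched if you need them for logging.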

πŸ“ warehouse details

the data warehouse connectors used by this module are implemented as middleware, and each one has its own authentication strategy with different field requirements.

in most cases, dwh-mixpanel wraps the vendor SDKs of each warehouse with its own API, so when passing auth params in your configuration, you can use any values that are supported by your warehouse, provided those credentials have the appropriate permissions.

below are details on the authentication strategies and permissions required for each supported warehouse. if you need an auth method or strategy that is not supported for your warehouse, please file an issue.

BigQuery

most bigquery jobs will be authenticated with GCP service accounts

the service account will need the following permissions in bigquery AND on the specific dataset being queried:

in my experience, the BigQuery Data Viewer and BigQuery Job User roles together satisfy these requirements; if a required permission is missing, the error output will tell you which one.

the typical fields used for auth are project_id, private_key, and client_email; location will need to be added manually based on the region of your bigquery instance:

{
    dwh: "bigquery",
    auth : {
            "project_id": "my-gcp-project", //GCP project
            "client_email": "serviceAccount@email.com", //service acct email
            "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n", // service account private key
            "location": "US" //bigquery location.. this is required!
        }
}

in most cases, you can drop your exported JSON keys into the auth param, and it will work. here's a video where i go through the service account setup, start to finish.
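since an exported service-account key contains project_id, client_email, and private_key but no location, the one thing you must add yourself is the location. a minimal sketch, assuming the key is available as a JSON string or object; bigqueryAuthFromKey is a hypothetical helper, not part of the module:

```javascript
// hypothetical sketch: building the bigquery `auth` object from an exported
// service-account key. the exported JSON has no location, so it is added explicitly.
function bigqueryAuthFromKey(keyJson, location) {
  const key = typeof keyJson === "string" ? JSON.parse(keyJson) : keyJson;
  for (const field of ["project_id", "client_email", "private_key"]) {
    if (!key[field]) throw new Error(`service account key is missing ${field}`);
  }
  if (!location) throw new Error("location is required for bigquery");
  // keep every field from the key and add the location
  return { ...key, location };
}
```

in a real script you might pass it the contents of your key file, e.g. from fs.readFileSync("./key.json", "utf8"), along with your dataset's region.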

ADC Authentication

if you are running dwh-mixpanel locally and you don't have IAM access in GCP to create service accounts, but you do have user-level access to the datasets in BigQuery, you can use Application Default Credentials (ADC), which use your local GCP account to authenticate with BigQuery's APIs.

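the general steps here are:

1. initialize the gcloud CLI and log in:

gcloud init

2. create application default credentials from your user account:

gcloud auth application-default login

3. write a configuration file with an empty auth object:

{
    "dwh": "bigquery",
    "auth": {},
    "sql": "SELECT * FROM FOO"
}

4. run the job with that configuration file:

npx dwh-mixpanel ./bigquery-mixpanel

Snowflake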

snowflake jobs authenticate with a username + password; you may also use key pair auth.

the fields used for auth are:

most of these values can be found in the UI or the SQL console.

{
    dwh: "snowflake",
    auth : {
            // EITHER user + pass auth
            "username": "",
            "password": "",

            // OR key pair auth
            "username": "",
            "privateKey": "./path/to/rsa_key.p8",
            "passphrase": "my-passphrase", // for encrypted keys only

            // always required
            "account": "foobar.us-central1.gcp", // your snowflake identifier
            "database": "PROD1", // database to use
            "schema": "PUBLIC", // schema to use
            "warehouse": "COMPUTE_WH" // warehouse to use
        }
}

no special permissions are required for snowflake; the user only needs to be able to view and query the dataset.

note: 2FA or SSO auth for Snowflake is not currently supported in this module.

Athena

to query athena from this module, your user account (or service account) will need permission to take the following actions in athena:

since athena depends on S3, your account will also need access to the following actions in S3:

the S3 bucket you assign permissions to should be the same one athena uses for storage; you can see this in the athena UI:

athena storage location

note: all queries made to athena are stored as CSV files in S3; this module uses the DeleteObject action to delete the materialized CSV after the data is imported into mixpanel.

most AWS accounts can be set up for programmatic access using an accessKeyId and a secretAccessKey; you'll also need to add the region of your S3 bucket.

{
    dwh: "athena",
    auth : {
            "accessKeyId": "",
            "secretAccessKey": "",
            "region": "us-east-2" //note this is important!
        }
}

Azure

Azure SQL servers (managed and on-premises) use usernames and passwords for authentication; no special permissions are required beyond the user you authenticate as being able to run the query.

there are two common ways to enter credentials: connection strings and JSON. they carry essentially the same information in different formats. connection strings look like this:

Server=tcp:my-sql-server.database.windows.net,1433;Database=database;User Id=username;Password=password;Encrypt=true
Driver=msnodesqlv8;Server=(local)\INSTANCE;Database=database;UID=DOMAIN\username;PWD=password;Encrypt=true

you can input your connection string into the auth object with the key connection_string:

{
    dwh: "azure",
    auth: {
        connection_string: "my-sql-server.database.windows.net,1433; etc..."
    }
}

if your database is hosted in Azure Cloud, you can find your connection strings in the Azure SQL UI; this module uses the ADO.NET syntax:

azure cloud screenshot

make sure to choose the connection string version that your database supports (hint: not all Azure DBs are set up with Active Directory).

if you wish, you may also pass your credentials as JSON; the parameters are very similar to what's encoded in the connection string. they look like this:

{
    dwh: "azure",
    auth: {
        user: "",
        password: "",
        server: "",
        port: 1433, //default
        domain: "",
        database: ""
    }
}

you can also pass other pool configuration options to the auth object... see the full list of params
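to see how the two formats line up, here's a sketch that parses an ADO.NET-style connection string into the JSON shape above. it's illustrative only (the module accepts connection_string directly), parseConnectionString is a hypothetical name, and it handles only simple key=value; pairs without quoted values:

```javascript
// hypothetical sketch: maps an ADO.NET-style connection string onto the JSON auth fields.
// handles only simple `key=value;` pairs (no quoted values containing semicolons).
const KEY_ALIASES = {
  server: "server",
  "data source": "server",
  database: "database",
  "initial catalog": "database",
  "user id": "user",
  uid: "user",
  password: "password",
  pwd: "password",
};

function parseConnectionString(connectionString) {
  const auth = {};
  for (const pair of connectionString.split(";")) {
    const eq = pair.indexOf("=");
    if (eq === -1) continue;
    const key = pair.slice(0, eq).trim().toLowerCase();
    const target = KEY_ALIASES[key];
    if (target) auth[target] = pair.slice(eq + 1).trim(); // unknown keys (Encrypt, Driver) are ignored
  }
  // "tcp:host,port" → server + port
  if (auth.server) {
    let server = auth.server.replace(/^tcp:/i, "");
    const comma = server.lastIndexOf(",");
    if (comma !== -1) {
      auth.port = Number(server.slice(comma + 1));
      server = server.slice(0, comma);
    }
    auth.server = server;
  }
  // "DOMAIN\username" → domain + user
  if (auth.user && auth.user.includes("\\")) {
    const [domain, user] = auth.user.split("\\");
    auth.domain = domain;
    auth.user = user;
  }
  return auth;
}
```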

salesforce

while salesforce may not typically be thought of as a "data warehouse" (great for transactions; not so great for analysis), it can be queried with SQL. salesforce uses SOQL, a variant of SQL for sObjects.

therefore, you can model events, profiles, groups or lookup tables as SOQL queries, and pipe them directly to mixpanel!

the data model for salesforce is quite unique, and therefore this module works differently when interfacing with salesforce than it does with other warehouses.

for authentication you need only add a username and password for the auth object:

{
    dwh: "salesforce",
    auth: {
        user: "", //your username
        password: "", // your password + security token
        version: "51.0" // API version to use; 51 is default
    }
}

dwh-mixpanel uses salesforce's SOAP login API for authentication, which means your API password is your normal salesforce password with your security token appended. if your UI password is foo and your security token is bar-bazqux, your API password is foobar-bazqux.

if you do not have a security token, here are the steps to reset it

this module provides additional options which govern the transformation of salesforce records into usable mixpanel entities; these options are all turned on by default and can be changed in the auth section of the configuration file if desired:

{
    dwh: "salesforce",
    auth: {
        user: "",
        password: "",
        "resolve_field_names": true, // "Renewal_Date__c" → "Contract Renewal Date"
        "rename_primary_id" : true, // "Id" → "Account.Id"
        "add_sfdc_links": true // add salesforce URLs to all records
    }
}

profiles + tables

for user profiles, group profiles, and lookup tables, you must provide column mappings to mixpanel objects. relationship fields using the dot . syntax allow you to traverse objects, and are easy to reference:

for example:

SELECT
    Account.Id,
    Account.Name,
    Account.Owner.Name, -- relationships
    Account.CSM__r.Name,
    Account.ARR__c,
    Account.NPS__c,
    Account.plan_type__r.monthly_spend__c -- custom fields
FROM
    Account

would (likely) use the following mappings:

{
    dwh: "salesforce",
    mappings: {
        distinct_id_col: "Account.Id",
        name_col: "Account.Name",
        email_col: "Account.Owner.Name",
    },
    mixpanel: {
        type : "group",
        groupKey: "Account.Id" //important!
    }
}

this will produce group profiles for every account, using each account's Id as the $group_id

in some instances it can be tedious to list every single field that you may wish to include on a salesforce object. salesforce does not support SELECT * queries, but if you want to bring over all flattened properties from an object, dwh-mixpanel does support this syntax.

for example, here is a query + mappings which will create user profiles for all of your salesforce end users and bring in every field on the User object in salesforce:

SELECT * FROM User

and the corresponding mappings:

 "mappings": {
        "distinct_id_col": "Id",
        "name_col": "Name",
        "email_col": "Email",        
        "profileOperation": "$set"
    },

please use this feature with care, as it does not traverse relationship fields.

events: field history

modeling events from salesforce in mixpanel is a bit different; in most cases, you will want to model field history objects as events in mixpanel. these objects contain the "change tables" that describe the lifecycle of a salesforce object: when an object is created, or when its field values change (e.g. an Opportunity changing Stage), its field history is updated.

to properly use this module, every SOQL query modeling history MUST contain the fields:

Field, NewValue, OldValue, CreatedDate

these fields are used internally to determine the event name, event time, and $insert_id; therefore, any mappings supplied to event_name_col, time_col, or insert_id_col will be ignored.

you will need to supply a mapping for distinct_id_col (usually the Id of the primary object being queried)

for example, we might query the OpportunityFieldHistory to get events for each "field change" on each opportunity:

SELECT
    Id, DataType, Field, NewValue, OldValue, CreatedDate,
    Opportunity.Id, Opportunity.Name, Opportunity.Amount,
    Opportunity.Owner.Name, Opportunity.Account.Name
FROM
    OpportunityFieldHistory

and we might use the following mapping:

{
    dwh: "salesforce",
    mappings: {
        event_name_col: "", // ignored!
        insert_id_col: "", // ignored!
        time_col: "",   // ignored!
        distinct_id_col: "Opportunity.Id", //important!

    },
    mixpanel: {
        type : "event"
    }
}

this would model all field changes to any opportunities as events in mixpanel, using the opportunity's Id as the distinct_id in mixpanel.
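here's a rough sketch of that idea for a single field history record. the event-name format and the use of the history record's own Id as $insert_id are assumptions for illustration (the module derives its own), and historyRowToEvent and getPath are hypothetical helpers; relationship fields are assumed to arrive as nested objects:

```javascript
// hypothetical sketch (not the module's actual code): one *FieldHistory record → one event.
// the event-name format and the use of the history record's Id as $insert_id are assumptions.
function getPath(obj, path) {
  // follows dot-separated relationship paths like "Opportunity.Id"
  return path.split(".").reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

function toEpochSeconds(sfdcDate) {
  // salesforce returns offsets like "+0000"; insert the colon so Date.parse accepts it
  const iso = String(sfdcDate).replace(/([+-]\d{2})(\d{2})$/, "$1:$2");
  return Math.floor(Date.parse(iso) / 1000);
}

const REQUIRED_HISTORY_FIELDS = ["Field", "NewValue", "OldValue", "CreatedDate"];

function historyRowToEvent(row, mappings, sObjectName) {
  for (const field of REQUIRED_HISTORY_FIELDS) {
    if (!(field in row)) throw new Error(`field history queries must select ${field}`);
  }
  return {
    event: `${sObjectName}: ${row.Field} changed`, // hypothetical naming scheme
    properties: {
      distinct_id: String(getPath(row, mappings.distinct_id_col)),
      time: toEpochSeconds(row.CreatedDate),
      $insert_id: String(row.Id),
      field: row.Field,
      old_value: row.OldValue,
      new_value: row.NewValue,
    },
  };
}
```

other selected fields (Opportunity.Name, Opportunity.Amount, etc.) would normally be flattened into the event properties too; they're omitted here for brevity.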

events: no history

you can also model events from objects that do not have history tables; keep in mind that you'll need to choose a time_col mapping (usually CreatedDate or LastModifiedDate), which determines when the event occurred.

in these cases, you are effectively modeling an event which describes when a particular salesforce object was created or modified; keep in mind that the field values always reflect the most current version of that object's properties, not the values at the time the object was created.

this is still quite useful, since many salesforce objects are created once and never modified; for example, we might model new Tasks or Calls as follows:

-- modeling Tasks object as events
SELECT
    Id, Who.Id, Who.Name, Who.RecordType.SobjectType,
    What.Id, What.Name, What.RecordType.SobjectType,
    AccountId, CreatedDate, CreatedBy.Id, CreatedBy.Name
FROM
    Task
WHERE
    LastModifiedDate = Today

we need to supply the time_col, insert_id_col, and distinct_id_col:

{
    dwh: "salesforce",
    mappings: {
        event_name_col: "", // optional!
        insert_id_col: "Id", // important!
        time_col: "CreatedDate", //important!
        distinct_id_col: "Who.Id", //important!

    },
    mixpanel: {
        type : "event"
    }
}

the event_name_col is optional; if it is supplied, its value is used as a hardcoded event name in mixpanel:

{
    mappings: {
        event_name_col: "task was created!"
    }
}

would produce:

{
    event: "task was created!",
    properties: {}
}

if event_name_col is not supplied, event names will be created automatically based on the sObject that is being queried (in this case, Task).
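putting the non-history rules together: time, $insert_id, and distinct_id come from the mappings, a supplied event_name_col is used literally, and otherwise the sObject name is used. this is a minimal sketch; objectRowToEvent is hypothetical, and the exact auto-generated name the module uses may differ from the plain sObject name shown here:

```javascript
// hypothetical sketch (not the module's actual code): a non-history sObject row → one event.
// a supplied event_name_col is used literally; otherwise this falls back to the sObject name.
function getPath(obj, path) {
  // follows dot-separated relationship paths like "Who.Id"
  return path.split(".").reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

function objectRowToEvent(row, mappings, sObjectName) {
  // salesforce returns offsets like "+0000"; insert the colon so Date.parse accepts it
  const iso = String(getPath(row, mappings.time_col)).replace(/([+-]\d{2})(\d{2})$/, "$1:$2");
  return {
    event: mappings.event_name_col || sObjectName,
    properties: {
      distinct_id: String(getPath(row, mappings.distinct_id_col)),
      time: Math.floor(Date.parse(iso) / 1000),
      $insert_id: String(getPath(row, mappings.insert_id_col)),
    },
  };
}
```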

note: nested SOQL subqueries are not currently supported for salesforce rETLs.

💾 environment variables

if you would prefer to store your authentication details as environment variables or in an .env file, you may do so.

this module will find those values provided they are correctly named.

here is a sample of how environment variables can be used for mixpanel:

MP_SERVICE_ACCOUNT=myServiceAcct
MP_SERVICE_SECRET=myServiceSecret
MP_API_SECRET=myAPISecret
MP_TOKEN=myToken
MP_LOOKUP_TABLE=myLookupTableId

for the warehouse auth, use the key DWH_AUTH and for the value use stringified JSON that you would pass to auth in the configuration file:

DWH_AUTH='{
        "project_id": "ak-internal-tool-1613096051700",
        "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "client_email": "mySerAcct@iam.google.com",
        "location": "US"
    }'
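to show how those variables map onto the configuration keys, here's a minimal sketch that folds them into a config object. applyEnv is hypothetical, and the precedence it uses (values in the config file win over the environment) is an assumption of this sketch, not documented module behavior:

```javascript
// hypothetical sketch: folding the environment variables above into a configuration.
// assumption: explicit values in the config win over the environment.
function applyEnv(config, env = process.env) {
  const mixpanel = { ...config.mixpanel };
  const fromEnv = {
    service_account: env.MP_SERVICE_ACCOUNT,
    service_secret: env.MP_SERVICE_SECRET,
    api_secret: env.MP_API_SECRET,
    token: env.MP_TOKEN,
    lookupTableId: env.MP_LOOKUP_TABLE,
  };
  for (const [key, value] of Object.entries(fromEnv)) {
    if (value !== undefined && mixpanel[key] === undefined) mixpanel[key] = value;
  }
  // DWH_AUTH holds stringified JSON; only used when the config has no auth of its own
  const hasAuth = config.auth && Object.keys(config.auth).length > 0;
  const auth = hasAuth || !env.DWH_AUTH ? config.auth : JSON.parse(env.DWH_AUTH);
  return { ...config, mixpanel, auth };
}
```

note that JSON.parse turns the literal \n sequences in the private key back into real newlines, which is what the warehouse SDK expects.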

that's it for now. have fun!