ak--47 / mixpanel-import

mixpanel-import

🤨 wat.

stream data to mixpanel... quickly

stream events, users, and groups into mixpanel

mixpanel-import implements Mixpanel's /import, /engage, /groups, and /lookup APIs with best practices, providing a clean, configurable interface to stream JSON, NDJSON, or CSV files compliant with Mixpanel's data model through Mixpanel's ingestion pipeline.

by implementing interfaces as streams in node.js, high-throughput backfills are possible with no intermediate storage and a low memory footprint.
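
to illustrate why streaming keeps the memory footprint low, here is a minimal sketch of incremental NDJSON parsing: only the current chunk and one partial line are ever held in memory. createLineParser is a hypothetical helper written for this example; it is not part of mixpanel-import and not necessarily how the module works internally.

```javascript
// sketch only: incremental NDJSON parsing, one chunk at a time
// (createLineParser is a hypothetical helper, not part of mixpanel-import)
function createLineParser() {
  let remainder = ""; // holds a partial line between chunks

  return {
    // feed the next chunk of text; returns the records completed by it
    push(chunk) {
      const lines = (remainder + chunk).split("\n");
      remainder = lines.pop(); // the last piece may be an unfinished line
      return lines
        .filter((line) => line.trim() !== "")
        .map((line) => JSON.parse(line));
    },
    // call once at the end to parse a final line with no trailing newline
    flush() {
      const last = remainder.trim();
      remainder = "";
      return last ? [JSON.parse(last)] : [];
    },
  };
}

// a record split across two chunks is only emitted once it is complete
const parser = createLineParser();
const first = parser.push('{"event":"foo"}\n{"event":');
const second = parser.push('"bar"}\n{"event":"baz"}');
const rest = parser.flush();
console.log(first, second, rest);
```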

note: if you're trying to add real-time mixpanel tracking to a node.js web application, this module is NOT what you want; you want mixpanel-node, the official node.js SDK.

👔 tldr;

this module can be used in two ways:

as a CLI:

npx mixpanel-import file --options

as a module:

//for esm:
import mpStream from 'mixpanel-import'
//for cjs:
const mpStream = require('mixpanel-import')

const myImportedData = await mpStream(creds, data, options)

💻 CLI usage

npx --yes mixpanel-import@latest ./pathToData

when running as a CLI, pathToData can be a .json, .jsonl, .ndjson, .csv or .txt file OR a directory which contains said files.

when using the CLI, you will supply params to specify options of the form --option value, for example your project credentials:

npx --yes mixpanel-import ./data.ndjson --secret abc123

many other options are available; to see a full list of CLI params, use the --help option:

npx --yes mixpanel-import --help

alternatively, you may use an .env configuration file to provide your project credentials (and some other values).

the CLI will write response logs to a ./logs directory by default. you can specify a --where dir option as well if you prefer to put logs elsewhere.

🔌 module usage

install mixpanel-import as a dependency in your project

npm i mixpanel-import --save

then use it in code:

const mpStream = require("mixpanel-import");
const importedData = await mpStream(credentials, data, options);

console.log(importedData);
/*

{
    success: 5003,
    failed: 0,
    total: 5003,
    batches: 3,
    rps: 3,
    eps: 5000,
    recordType: "event",
    duration: 1.299,
    retries: 0,
    responses: [ ... ],
    errors: [ ... ]
}

*/

read more about credentials, data, and options below
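
as a small sketch of how the summary might be consumed, the helper below turns it into a pass/fail check and a one-line report. describeImport is a hypothetical name; the field names come from the sample output above, and the meaning of the other fields is not assumed here.

```javascript
// sketch: interpret the summary object returned by the import
// (describeImport is a hypothetical helper, not part of mixpanel-import)
function describeImport(summary) {
  const { success, failed, total, duration } = summary;
  const failureRate = total > 0 ? failed / total : 0;
  return {
    ok: failed === 0 && success === total,
    failureRate,
    line: `${success}/${total} records imported, ${failed} failed (duration: ${duration})`,
  };
}

const report = describeImport({ success: 5003, failed: 0, total: 5003, duration: 1.299 });
console.log(report.line);
```

a caller could, for example, exit with a non-zero status when report.ok is false.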

🗣️ arguments

when using mixpanel-import in code, you will pass in 3 arguments: credentials, data, and options

🔐 credentials

Mixpanel's ingestion APIs authenticate with service accounts OR API secrets; service accounts are the preferred authentication method.

🤖 service account:

const creds = {
  acct: `my-service-acct`, //service acct username
  pass: `my-service-secret`, //service acct secret
  project: `my-project-id`, //project id
};
const importedData = await mpStream(creds, data, options);

🙊 API secret:

const creds = {
  secret: `my-api-secret`, //api secret (deprecated auth)
};
const importedData = await mpStream(creds, data, options);

๐Ÿ“ profiles + tables:

if you are importing user profiles, group profiles, or lookup tables, you should also provide also provide the you project token and some other values in your creds configuration:

const creds = {
        token: `my-project-token`, //for user/group profiles
        groupKey: `my-group-key`, //for group profiles
        lookupTableId: `my-lookup-table-id`, //for lookup tables
    }

🤖 environment variables:

you can also delegate authentication to environment variables, using a .env file with MP_-prefixed keys of the form:

# if using service account auth; these 3 values are required:
MP_PROJECT={{your-mp-project}}
MP_ACCT={{your-service-acct}}
MP_PASS={{your-service-pass}}

# if using secret based auth; only this value is required
MP_SECRET={{your-api-secret}}

# type of records to import; valid options are event, user, group or table
MP_TYPE=event

# required for user profiles + group profiles
MP_TOKEN={{your-mp-token}}

# required for group profiles
MP_GROUP_KEY={{your-group-key}}

# required for lookup tables
MP_TABLE_ID={{your-lookup-id}}

note: pass null (or {}) as the creds to the module to use .env variables for authentication:

const importedData = await mpStream(null, data, options);
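
if you would rather build the creds object yourself (for example, to fail early when a variable is missing), the mapping from the MP_ variables to creds keys can be sketched as below. credsFromEnv is a hypothetical helper for illustration; the module reads .env on its own when creds is null, so this is optional.

```javascript
// sketch: build a creds object from MP_-prefixed variables
// (credsFromEnv is a hypothetical helper, not part of mixpanel-import)
function credsFromEnv(env = process.env) {
  const creds = {};
  if (env.MP_ACCT && env.MP_PASS && env.MP_PROJECT) {
    // service account auth (preferred): all three values are required
    creds.acct = env.MP_ACCT;
    creds.pass = env.MP_PASS;
    creds.project = env.MP_PROJECT;
  } else if (env.MP_SECRET) {
    // API secret auth: only the secret is required
    creds.secret = env.MP_SECRET;
  } else {
    throw new Error("set MP_ACCT + MP_PASS + MP_PROJECT, or MP_SECRET");
  }
  // extra values for profiles and lookup tables
  if (env.MP_TOKEN) creds.token = env.MP_TOKEN;
  if (env.MP_GROUP_KEY) creds.groupKey = env.MP_GROUP_KEY;
  if (env.MP_TABLE_ID) creds.lookupTableId = env.MP_TABLE_ID;
  return creds;
}

// passing a plain object instead of process.env keeps the example self-contained
const serviceCreds = credsFromEnv({ MP_ACCT: "acct", MP_PASS: "pass", MP_PROJECT: "123", MP_TOKEN: "tok" });
const secretCreds = credsFromEnv({ MP_SECRET: "abc123" });
console.log(serviceCreds, secretCreds);
```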

📈 data

the data param represents the data you wish to import; this might be events, user profiles, group profiles, or lookup tables

the value of data can be:

a path to a file which contains records:

const data = `./myEventsToImport.json`;
const importedData = await mpStream(creds, data, options);

a path to a directory which contains files of records:

const data = `./myEventsToImport/`; //has json files
const importedData = await mpStream(creds, data, options);

a list of file paths:

const data = [`./file1.jsonl`, `./file2.jsonl`]; //each file has records
const importedData = await mpStream(creds, data, options);

an array of objects (records), in memory:

const data = [{event: "foo"}, {event: "bar"}, {event: "baz"}]
const importedData = await mpStream(creds, data, options);

a stringified array of objects, in memory:

const records = [{event: "foo"}, {event: "bar"}, {event: "baz"}]
const data = JSON.stringify(records);
const importedData = await mpStream(creds, data, options);

a file readable stream:

const fs = require("fs");
const myStream = fs.createReadStream("./myData/lines.json");
const imported = await mpStream(creds, myStream, { streamFormat: `json` });

note: please specify streamFormat as json or jsonl in the options

an object mode stream:

const { createMpStream } = require('mixpanel-import');
const { Readable, PassThrough } = require('stream');

const mixpanelStream = createMpStream(creds, options, (results) => { ... })

const myStream = Readable.from(data, { objectMode: true });
const myOtherStream = new PassThrough()

myOtherStream.on('data', (response) => { ... });

myStream.pipe(mixpanelStream).pipe(myOtherStream)

note: object mode streams use a different named import: createMpStream(). the callback receives a summary of the import, and downstream consumers of the stream will receive API responses from Mixpanel.

you will use the options (below) to specify what type of records you are importing; event is the default type
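
for the default event type, a quick pre-flight check on record shape can catch problems before anything is sent. the sketch below assumes the field list i understand Mixpanel's /import endpoint to expect (event, plus time, distinct_id and $insert_id under properties); treat that list as an assumption and confirm it against Mixpanel's own documentation. missingEventFields is a hypothetical helper, not part of mixpanel-import.

```javascript
// sketch: report which commonly required fields an event record lacks
// (missingEventFields is hypothetical; the field list is an assumption
// based on Mixpanel's /import documentation)
function missingEventFields(record) {
  const props = record.properties || {};
  const missing = [];
  if (typeof record.event !== "string" || record.event === "") {
    missing.push("event");
  }
  for (const key of ["time", "distinct_id", "$insert_id"]) {
    if (props[key] === undefined || props[key] === null) {
      missing.push(`properties.${key}`);
    }
  }
  return missing;
}

const complete = {
  event: "button_click",
  properties: { time: 1684144664, distinct_id: "user-1", $insert_id: "abc" },
};
console.log(missingEventFields(complete)); // []
console.log(missingEventFields({ event: "foo" }));
```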

🎛 options

options is an object that allows you to configure the behavior of this module. there are LOTS of options for different types of import use cases. you can specify options as the third argument in module mode or as flags in CLI mode.

module options

all options are... optional... for a full list of what these do, see the type definition

export type Options = {
    recordType?: RecordType;
    vendor?: "amplitude" | "heap" | "mixpanel" | "ga4" | "adobe" | "pendo" | "mparticle";
    region?: Regions;
    streamFormat?: SupportedFormats;
    compress?: boolean;
    compressionLevel?: 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9;
    strict?: boolean;
    logs?: boolean;
    verbose?: boolean;
    fixData?: boolean;
    removeNulls?: boolean;
    abridged?: boolean;
    forceStream?: boolean;
    streamSize?: number;
    timeOffset?: number;
    recordsPerBatch?: number;
    bytesPerBatch?: number;
    maxRetries?: number;
    workers?: number;
    where?: string;
    transformFunc?: transFunc;
    parseErrorHandler?: transFunc;
    tags?: genericObj;
    aliases?: genericObj;
    epochStart?: number;
    epochEnd?: number;
    dedupe?: boolean;
    eventWhitelist?: string[];
    eventBlacklist?: string[];
    propKeyWhitelist?: string[];
    propKeyBlacklist?: string[];
    propValWhitelist?: string[];
    propValBlacklist?: string[];
    start?: string;
    end?: string;
};
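
as a rough illustration, an options object might look like the sketch below. it only uses fields named in the type definition, with values taken from the defaults and descriptions in the cli table that follows; which combination suits a given import is a judgment call, not a recommendation from the module.

```javascript
// sketch: an options object using only fields from the type definition
const options = {
  recordType: "event", // event, user, group or table
  region: "US", // US or EU
  streamFormat: "jsonl", // json or jsonl
  compress: true, // gzip on egress
  strict: true, // /import strict mode
  fixData: true, // fix common mistakes
  recordsPerBatch: 2000, // records in each request
  streamSize: 27, // exponent n of the 2^n highWaterMark
};

// streamSize is described as a 2^n value, so 27 corresponds to:
const highWaterMark = 2 ** options.streamSize; // 134217728
console.log(highWaterMark);
```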

cli options

use npx mixpanel-import --help to see the full list.

  option, alias             description                 default
----------------------------------------------------------------
  --type, --recordType      event/user/group/table      "event"
  --compress, --gzip        gzip on egress              false
  --strict                  /import strict mode         true
  --logs                    log import results to file  true
  --verbose                 show progress bar           true
  --streamFormat, --format  either json or jsonl        "jsonl"
  --region                  either US or EU             "US"
  --fixData                 fix common mistakes         false
  --streamSize              2^n value of highWaterMark  27
  --recordsPerBatch         # records in each request   2000
  --bytesPerBatch           max size of each request    2MB
  --where                   directory to put logs

note: the recordType param is very important; by default this module assumes you wish to import event records. change this value to user, group, or table if you are importing other entities.

added in 2.5.20: you can now specify certain vendors in the options, like amplitude or ga4, and mixpanel-import will apply the correct transform to the source data to bring it into mixpanel.
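
to make recordsPerBatch and bytesPerBatch concrete, here is a minimal sketch of batching by both limits at once. toBatches is a hypothetical helper, the 2MB limit is assumed to mean 2 * 1024 * 1024 bytes, and the module's real batching logic may differ.

```javascript
// sketch: group records into batches capped by count and by serialized size
// (toBatches is hypothetical; 2MB is assumed to be 2 * 1024 * 1024 bytes)
function toBatches(records, recordsPerBatch = 2000, bytesPerBatch = 2 * 1024 * 1024) {
  const batches = [];
  let current = [];
  let currentBytes = 0;

  for (const record of records) {
    const size = Buffer.byteLength(JSON.stringify(record));
    const full = current.length >= recordsPerBatch || currentBytes + size > bytesPerBatch;
    // close the current batch before adding a record that would exceed a limit
    if (full && current.length > 0) {
      batches.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(record);
    currentBytes += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}

// 5003 small events with the default limits
const events = Array.from({ length: 5003 }, (_, i) => ({ event: "foo", properties: { i } }));
const batches = toBatches(events);
console.log(batches.map((b) => b.length)); // [ 2000, 2000, 1003 ]
```

with small records the count limit decides the batch boundaries; with large records the byte limit takes over.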

👨‍🍳️ recipes

the transformFunc is useful because it can pre-process records in the pipeline using arbitrary javascript.

here are some examples:

add a token to all user profiles:

function addToken(user) {
  user.token = `{{my token}}`;
  return user;
}

const imported = await mpStream(creds, data, {
  transformFunc: addToken,
  recordType: "user",
});
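
add an $insert_id to each event:

const md5 = require('md5')

function addInsert(event) {
    // hash the serialized event; md5 expects a string or buffer, not an object
    const hash = md5(JSON.stringify(event));
    event.properties.$insert_id = hash;
    return event
}

const imported = await mpStream(creds, data, { transformFunc: addInsert })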

reshape flat user records into profiles:

function fixProfiles(user) {
  const mpUser = { $set: { ...user } };
  mpUser.$set.$distinct_id = user.uuid;
  return mpUser
}

const imported = await mpStream(creds, data, { transformFunc: fixProfiles, recordType: "user"});

only import events that have properties:

function onlyProps(event) {
    if (!event.properties) return {}; //don't send events without props
    return event;
}

const data = [{ event: "foo" }, {event: "bar"}, {event: "baz", properties: {}}]
const imported = await mpStream(creds, data, { transformFunc: onlyProps }); //imports only one event

turn one record into many by returning an array:

const data = [{ event: false }, {event: "foo"}]

// turns the "false" event into 100 events
function exploder(o) {
    if (!o.event) {
        const results = [];
        const template = { event: "explode!" };
        for (let i = 0; i < 100; i++) {
            results.push({ ...template }); // copy so each event is its own object
        }
        return results;
    }
    return o;
}

const imported = await mpStream(creds, data, { transformFunc: exploder }) //imports 101 events
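
the recipes above rely on two behaviors of transformFunc: an empty object is not sent, and a returned array becomes several records. the sketch below mimics those semantics in plain javascript so a transform can be tried locally before an import. applyTransform is a hypothetical stand-in, not the module's actual pipeline, and skipping null results is an extra assumption.

```javascript
// sketch: local stand-in for how transformFunc results are treated
// (applyTransform is hypothetical, not part of mixpanel-import)
function applyTransform(records, transformFunc) {
  const out = [];
  for (const record of records) {
    const result = transformFunc(record);
    // an array result yields many records; anything else yields one
    const items = Array.isArray(result) ? result : [result];
    for (const item of items) {
      // skip empty objects (and, by assumption, null or undefined)
      if (item && Object.keys(item).length > 0) out.push(item);
    }
  }
  return out;
}

const onlyProps = (event) => (event.properties ? event : {});
const explode = (o) =>
  o.event ? o : Array.from({ length: 100 }, () => ({ event: "explode!" }));

const kept = applyTransform([{ event: "foo" }, { event: "baz", properties: {} }], onlyProps);
const exploded = applyTransform([{ event: false }, { event: "foo" }], explode);
console.log(kept.length, exploded.length); // 1 101
```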

import a CSV file of events, using aliases to map columns to mixpanel fields:

const eventsCSV = './myEvents.csv'
/*
myEvents.csv looks like this:
row_id,uuid,timestamp,action,colorTheme,luckyNumber
a50b0a01b9df43e74707afb679132452aee00a1f,7e1dd089-8773-5fc9-a3bc-37ba5f186ffe,2023-05-15 09:57:44,button_click,yellow,43
09735b6f19fe5ee7be5cd5df59836e7165021374,7e1dd089-8773-5fc9-a3bc-37ba5f186ffe,2023-06-13 12:11:12,button_click,orange,7
*/
const imported = await mpStream(creds, eventsCSV, {
    streamFormat: "csv",
    aliases: {
        row_id: "$insert_id",
        uuid: "distinct_id",
        action: "event",
        timestamp: "time"
    }
});
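
conceptually, aliases rename keys on each parsed row. the sketch below shows that renaming on the first CSV row from the recipe; applyAliases is a hypothetical helper, and how the module then arranges the renamed keys into an event (for example, nesting them under properties) is not shown or assumed here.

```javascript
// sketch: rename the keys of one parsed CSV row using an aliases map
// (applyAliases is hypothetical, not part of mixpanel-import)
function applyAliases(row, aliases) {
  const renamed = {};
  for (const [key, value] of Object.entries(row)) {
    const hasAlias = Object.prototype.hasOwnProperty.call(aliases, key);
    renamed[hasAlias ? aliases[key] : key] = value;
  }
  return renamed;
}

const aliases = { row_id: "$insert_id", uuid: "distinct_id", action: "event", timestamp: "time" };
const row = {
  row_id: "a50b0a01b9df43e74707afb679132452aee00a1f",
  uuid: "7e1dd089-8773-5fc9-a3bc-37ba5f186ffe",
  timestamp: "2023-05-15 09:57:44",
  action: "button_click",
  colorTheme: "yellow",
  luckyNumber: "43",
};

const renamedRow = applyAliases(row, aliases);
console.log(Object.keys(renamedRow));
// [ '$insert_id', 'distinct_id', 'time', 'event', 'colorTheme', 'luckyNumber' ]
```

columns without an alias (colorTheme, luckyNumber) keep their original names.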

⚗️ test data

sometimes it's helpful to generate test data, so this module includes a separate utility to do that:

$ npm run generate

someTestData.json will be written to ./testData ... so you can then node index.js ./testData/someTestData.json

🤷 why?

because... i needed this and it didn't exist... so i made it.

then i made it public because i thought it would be useful to others. then it was, so i made some improvements.

found a bug? have an idea?

let me know