Datawheel / canon-stats

Reusable statistical and complexity calculations using NodeJS and Express
1 stars 1 forks source link

Add redis-cache for intermediate queries #23

Open cnavarreteliz opened 3 years ago

cnavarreteliz commented 3 years ago

In canon-stats, in the majority of cases we're using the same RCA matrix to do other calculations (Relatedness, ECI, ...). The idea is to implement Redis for storing these "intermediate" calculations.

davelandry commented 3 years ago

@cnavarreteliz please consult @jhmullen with this, because he had to do almost this EXACT same thing in Open Source Compass. He used bull for queueing, and redis for storage. It's a private repo owned by Deloitte, so I'm going to copy/paste the good bits here. I have a note on one thing that should be different in the canon-stats implementation:

First, he set up his connection to Redis in a cache file, cache/redis.js:

// cache/redis.js — builds and exports a single shared Redis client.
const redis = require("redis");
const yn = require("yn");

// Verbose connection logging is opt-in via an environment flag.
const debug = yn(process.env.CANON_CONST_LOGREDIS);

// Connection settings fall back to a local default instance.
const connection = {
  host: process.env.CANON_CONST_REDIS_HOST || "127.0.0.1",
  port: process.env.CANON_CONST_REDIS_PORT || 6379
};

const client = redis.createClient(connection);

client
  .on("connect", () => {
    if (debug) console.log("Redis connected successfully!");
  })
  .on("error", e => {
    if (debug) console.log("Redis connection error: ", e);
  });

// Export a factory so every caller receives the same shared client.
module.exports = () => client;

He then queues for different endpoints/operations, cache/queues.js:

const Queue = require("bull");
const yn = require("yn");

const host = process.env.CANON_CONST_REDIS_HOST || "127.0.0.1";
const port = process.env.CANON_CONST_REDIS_PORT || 6379;

const max = process.env.CANON_CONST_BULL_RATE_MAX || 10;
const duration = process.env.CANON_CONST_BULL_RATE_DURATION || 1000;

const redis = {host, port};
const limiter = {max, duration};

const domainQueue = new Queue("domains", {limiter, redis});
const projectQueue = new Queue("projects", {limiter, redis});
const languageQueue = new Queue("languages", {limiter, redis});
const locationQueue = new Queue("locations", {limiter, redis});

const debug = yn(process.env.CANON_CONST_LOGREDIS);

const queues = [domainQueue, projectQueue, languageQueue, locationQueue];

queues.forEach(queue => {
  queue.on("completed", job => {
    if (debug) console.log(`Job ${job.id} completed in ${queue.name} Queue - Key: ${job.data.key}`);  
  });
});

module.exports = function() {
  return {
    domainQueue,
    projectQueue,
    languageQueue,
    locationQueue
  };
};

He created an express middleware function called checkCache.js:

const yn = require("yn");
const debug = yn(process.env.CANON_CONST_LOGREDIS);

module.exports = function(redis) {
  return (req, res, next) => {
    const key = decodeURI(req.url);
    redis.get(key, (error, result) => {
      if (error) {
        if (debug) console.log("Redis Error!");
        throw error;
      }
      if (result) {
        if (result === "404") {
          if (debug) console.log("cache returned stored 404");
          return res.json({error: 404});
        }
        else {
          if (debug) console.log(`hitting cache for: ${key}`);
          return res.json(JSON.parse(result));
        }
      }
      else {
        if (debug) console.log(`${key} not in cache, moving to bull queue`);
        return next();
      }
    });
  }; 
};

And then his api/ endpoints looked like this:

const checkCache = require("../app/helpers/checkCache");
const yn = require("yn");

const debug = yn(process.env.CANON_CONST_LOGREDIS);
const timeout = process.env.CANON_CONST_BULL_TIMEOUT || 30000;

module.exports = function(app) {

  const {redis, queues} = app.settings.cache;
  const queue = queues.projectQueue;

  queue.process(15, async(job, done) => {

    if (debug) console.log("got in job", job.data);

    const returnData = {}; // do all your logic here to create the return object (all the axios request, etc)

    if (redis.connected) redis.set(redisKey, JSON.stringify(returnData));
    done();

  });

  // wrapper route to check redis before making main call
  app.get("/api/profile/project/:owner/:name", checkCache(redis), async(req, res) => {
    // prepare variables for queue
    const origin = `http${ req.connection.encrypted ? "s" : "" }://${ req.headers.host }`;
    const {owner, name} = req.params;
    const key = `/api/profile/project/${owner}/${name}`;
    if (debug) console.log("making a real request now");
    // check if this job is already in the queue
    const jobsPromise = queue.getJobs(["waiting", "active", "delayed"]);
    const jobs = await jobsPromise.then(resp => resp);
    jobs.sort((a, b) => a.id - b.id);
    const jobIndex = jobs.findIndex(j => j.data.key === key);
    // return 503 if the job is already in progress (give it a second!)
    if (jobIndex !== -1) {
      if (debug) console.log("job already exists in queue, exiting");
      res.json({error: 503, place: jobIndex + 1});
    }
    // otherwise, kick off the job and return a 503
    else {
      if (debug) console.log("new job, adding");
      queue.add({owner, name, origin, key}, {timeout});
      res.json({error: 503, place: jobs.length});
    }
  });
};