dmanjunath / node-redshift

A simple collection of tools to help you get started with Amazon Redshift from node.js

Redshift and latency #12

Closed edtguy closed 7 years ago

edtguy commented 7 years ago

Hi, this is more a question than an issue. Is there anything to configure in node-redshift to control how long it takes for an insert or update to be reflected in the Redshift database? I have an application with database tables in Aurora that I am trying to mirror in Redshift. Most of the tables I bulk load via S3 and Lambda with good response time, but one table, insight, keeps track of daily statistics, which are polled at 15-minute intervals. If no record exists for the date, one is inserted; otherwise, the existing record is updated with the new values. This add/update decision doesn't lend itself to bulk loading.

I am posting a simplified version of the function that is invoked every 15 minutes for this table. What I am observing is that the 'insights:' log statement is recorded right on the 15-minute mark, but the 'queryString:' log statement can appear in the log much later, sometimes skipping several 15-minute periods and then logging all of the skipped periods at once. And when I check the AWS Redshift console for queries, there is even more latency before they finally appear there. Is there some sort of "flush" option? TIA, Ed

```js
/* jshint node: true */
'use strict';

var _ = require('lodash'); // missing from the original paste; _.each is used below
var Redshift = require('node-redshift');
var util = require('util');
var config = require(__dirname + '/' + 'configuration.json');

var client = {
  user: config.my_user,
  database: config.my_db,
  password: config.my_password,
  port: 5439,
  host: config.my_host + '.us-east-1.redshift.amazonaws.com'
};
var redshiftClient = new Redshift(client, {rawConnection: false});

function addUpdateRedshiftInsights(insights) {
  util.log('insights:', insights);
  _.each(insights, function(i) {
    redshiftClient.query('select * from insight where insight_date=\'' + i.insight_date + '\';', {}, function(err, data) {
      var queryString;
      if (err) {
        util.log('error encountered');
        throw err;
      } else {
        if (data.rows[0]) {
          queryString = 'update insight set impressions=' + i.impressions +
            ',impressions_unique=' + i.impressions_unique +
            ',impressions_paid=' + i.impressions_paid +
            ' where insight_date=\'' + i.insight_date + '\';';
        } else {
          queryString = 'insert into insight (insight_date,impressions,impressions_unique,impressions_paid) values (\'' +
            i.insight_date + '\',' + i.impressions + ',' + i.impressions_unique + ',' + i.impressions_paid + ');';
        }
        util.log('queryString:', queryString);
        redshiftClient.query(queryString, {}, function(err, data) {
          if (err) throw err;
        });
      }
    });
  });
}
```

dmanjunath commented 7 years ago

@edtguy Hey there! Two quick questions. How long is the insights array you're passing into the function? And what do the query times of the select queries look like in the Redshift console?

My initial gut feeling about what's happening: if you have multiple insights, the _.each fires the select query for each insight, so all of those queries run simultaneously. Unless you've modified your WLM settings, Redshift handles more than a handful of simultaneous queries pretty badly. One way to serialize them is sketched below.
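Not from the original thread, but a minimal sketch of serializing those calls with async.eachSeries (the same async package the test script below relies on); processInsight here is a hypothetical stand-in for the select-then-insert/update logic in addUpdateRedshiftInsights above.

```js
var async = require('async');
var util = require('util');

// processInsight is a hypothetical wrapper around the select-then-insert/update
// logic from the snippet above; it must call next(err) when its queries finish.
function addUpdateRedshiftInsightsSerial(insights, processInsight) {
  // eachSeries waits for each item's callback before starting the next one,
  // so only one statement is in flight against the cluster at a time.
  async.eachSeries(insights, function(i, next) {
    processInsight(i, next);
  }, function(err) {
    if (err) util.log('error encountered:', err);
    else util.log('all insights processed');
  });
}
```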

As far as a flush goes, if you mean flushing the data through to the Redshift console in AWS, I don't believe there's a way to do that. I looked and couldn't find an AWS API for it; that's something I could have used myself.

edtguy commented 7 years ago

Hi Dheeraj, Thanks for the reply. The actual process retrieves insights in four different categories, so there are four arrays. So far, the return value has always been a one-element array for the first category, empty arrays for the second and third categories, and a six-element array for the fourth, so in total there are seven records to either insert or update each time the process runs, which is every 15 minutes. Today is more than half over, and it still has not completed yesterday's updates.

I constructed a test script with a raw connection to see if that would behave differently, using a function built on the async package to serialize the open/query/close steps. It took over 16 minutes for 6 inserts. What's odd is that I can see these records in the database using the Aginity workbench, but when I look at the queries in the AWS Redshift console, they are not listed. I am copying both the code and the console output below. I'm beginning to think that either Redshift is not appropriate for this application, or I need a different approach.

Regards, Ed

Test script:

```js
/* jshint node: true */
'use strict';

var _ = require('lodash');
var util = require('util');
var async = require('async');
var Redshift = require('node-redshift');

var client = {
  user: 'user',
  database: 'db',
  password: 'password',
  port: 5439,
  host: 'host.us-east-1.redshift.amazonaws.com'
};

function redshiftRawQuery(queryString, callback, qData) {
  var redshiftClient = new Redshift(client, {rawConnection: true});
  async.series([
    function(callback) {
      redshiftClient.connect(function(err) {
        if (err) throw err;
        else callback(null, 'connected');
      });
    },
    function(callback) {
      redshiftClient.query(queryString, {}, function(err, data) {
        if (err) throw err;
        else callback(null, data);
      });
    },
    function(callback) {
      callback(null, qData);
    },
    function(callback) {
      redshiftClient.close();
      callback(null, 'closed');
    }
  ], callback);
}

var reactionFn = function(err, results) {
  var queryString;
  var data = results[1];
  var qData = results[2];
  if (err) {
    util.log('error encountered');
    throw err;
  } else {
    if (data.rows[0]) {
      queryString = 'update fb_reaction set value=' + qData.value +
        ' where insight_date=\'' + qData.insight_date + '\' and type=\'' + qData.type + '\';';
      queryString = queryString.replace('\'null\'', 'null');
    } else {
      queryString = 'insert into fb_reaction (insight_date,type,value) values (\'' +
        qData.insight_date + '\',\'' + qData.type + '\',' + qData.value + ');';
      queryString = queryString.replace('\'null\'', 'null');
    }
    util.log('queryString:', queryString);
    redshiftRawQuery(queryString, redshiftResults, null);
  }
};

var redshiftResults = function(err, results) {
  if (err) util.log('err:', err);
  else util.log('results:', results);
};

var reactions = [
  { insight_date: '2017-04-07', type: 'like', value: 11 },
  { insight_date: '2017-04-07', type: 'love', value: 1 },
  { insight_date: '2017-04-07', type: 'wow', value: 0 },
  { insight_date: '2017-04-07', type: 'haha', value: 0 },
  { insight_date: '2017-04-07', type: 'sorry', value: 0 },
  { insight_date: '2017-04-07', type: 'anger', value: 0 }
];

util.log('start test');

_.each(reactions, function(i) {
  redshiftRawQuery('select * from fb_reaction where insight_date=\'' + i.insight_date +
    '\' and type=\'' + i.type + '\';', reactionFn, i);
});

util.log('end test');
```

Console output:

```
7 Apr 13:55:55 - start test
7 Apr 13:55:55 - end test
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','wow',0);
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','love',1);
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','anger',0);
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','haha',0);
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','sorry',0);
7 Apr 13:55:56 - queryString: insert into fb_reaction (insight_date,type,value) values ('2017-04-07','like',11);
7 Apr 14:02:17 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
7 Apr 14:04:55 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
7 Apr 14:06:27 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
7 Apr 14:08:44 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
7 Apr 14:11:06 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
7 Apr 14:12:08 - results: [ 'connected', { command: 'INSERT', rowCount: 1, oid: 0, rows: [], fields: [], _parsers: [], RowCtor: null, rowAsArray: false, _getTypeParser: [Function: bound ] }, null, 'closed' ]
```

dmanjunath commented 7 years ago

My guess is your update queries are taking the bulk of the time. Redshift isn't designed to keep having rows updated like a relational database; its UPDATE is actually really, really slow (http://stackoverflow.com/questions/25751363/redshift-update-prohibitively-slow). I ran into a very similar situation where I had to keep the most up-to-date value of a table in Redshift, like you're doing with fb_reaction, and it was a horrible disaster with UPDATE.

What I ended up doing was testing two alternatives. The first was keeping the table in Postgres and updating it there; I would run the query against Postgres and then, on an interval, export the table and COPY it into Redshift. The other option (the one I went for) was a running log: I just kept inserting into the fb_reaction table for each time interval, and whenever I did a join to get the fb_reaction data I called SUM() to get the total. The reason I chose this was that I needed time-based data too, but I actually ended up using option one for a different table later on.
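A minimal sketch of that second, insert-only option, reusing the client setup from the test script above; the rollup query and the SUM()-vs-MAX() choice are assumptions about the data, not code from this thread.

```js
var Redshift = require('node-redshift');

var client = {user: 'user', database: 'db', password: 'password', port: 5439,
              host: 'host.us-east-1.redshift.amazonaws.com'};
var redshiftClient = new Redshift(client, {rawConnection: false});

// Every 15 minutes: append a row instead of select-then-update.
function logReaction(r, done) {
  var insert = 'insert into fb_reaction (insight_date,type,value) values (\'' +
    r.insight_date + '\',\'' + r.type + '\',' + r.value + ');';
  redshiftClient.query(insert, {}, done);
}

// At read time: collapse the log into one value per (date, type). SUM()
// assumes each row is a delta; if each poll writes a cumulative total,
// MAX() would be the aggregate to use instead.
function dailyTotals(done) {
  var rollup = 'select insight_date, type, sum(value) as value ' +
               'from fb_reaction group by insight_date, type;';
  redshiftClient.query(rollup, {}, done);
}
```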

As for Aginity displaying the queries, my guess is it queries the cluster directly for that information, which comes back faster than the AWS console because AWS probably does some processing first, who knows. I'd try querying that table directly if you haven't already: http://docs.aws.amazon.com/redshift/latest/dg/r_STV_RECENTS.html
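A hedged example of that direct check, reusing the redshiftClient and util from the sketches above; the column list follows the STV_RECENTS documentation linked here rather than anything in this thread.

```js
// Query STV_RECENTS straight from the cluster to see current and recent
// statements without waiting on the AWS console (columns per the linked docs).
redshiftClient.query(
  'select user_name, db_name, pid, status, starttime, query ' +
  'from stv_recents order by starttime desc;',
  {},
  function(err, data) {
    if (err) throw err;
    util.log('recent queries:', data.rows);
  }
);
```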


edtguy commented 7 years ago

Thanks for the update. I had a similar thought: continually log information throughout the day, select the max value, and then perhaps delete all but the last record of each day in a post-midnight process, if we decide to continue with Redshift.
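A sketch of that post-midnight cleanup, with loud assumptions: the fb_reaction table in this thread has no timestamp, so a hypothetical logged_at column is introduced here to make "last record of the day" well defined, and it assumes Redshift accepts this correlated-subquery DELETE form; redshiftClient is the one from the sketches above.

```js
// Hypothetical nightly compaction: keep only the newest row per
// (insight_date, type). logged_at is NOT a column the thread's table has.
function compactReactionLog(done) {
  var compact =
    'delete from fb_reaction ' +
    'where logged_at < (select max(f2.logged_at) from fb_reaction f2 ' +
    '                   where f2.insight_date = fb_reaction.insight_date ' +
    '                     and f2.type = fb_reaction.type);';
  redshiftClient.query(compact, {}, done);
}
```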