Hi Vadorequest,
the timeout is there just as an example, to keep the consumed read capacity at a baseline; the ConsumedCapacity returned by each scan page should be taken into consideration as well.
var async = require('async');
// DynamoDB is assumed to be an already configured @awspilot/dynamodb client
var messages = [];

async.waterfall([
  function( cb ) {
    (function recursive_call( $lastKey ) {
      DynamoDB
        .table('messages')
        .resume($lastKey)
        // add a limit if needed, depending on the item size:
        // 4 KB of data = 1 read, 1 MB of items would consume ~250/2 eventually consistent reads
        // .limit()
        .scan(function( err, data ) {
          if (err)
            return cb(err); // if throughput is exceeded you could retry with a recursive call instead
          console.log("scan", this.ConsumedCapacity);
          messages = messages.concat(data);
          // LastEvaluatedKey is null once the whole table has been scanned
          if (this.LastEvaluatedKey === null)
            return cb();
          var $this = this;
          setTimeout(function() {
            recursive_call($this.LastEvaluatedKey);
          }, 1000); // timeout should be based on ConsumedCapacity and allocated throughput
        });
    })(null);
  },
  // scan completed, continue with other stuff
  function( cb ) {
    console.log("messages", messages);
    cb();
  },
], function(err) {
  if (err)
    return console.error(err); // there is no 'cb' in the outer callback, just report the error
  console.log("messages", messages);
});
I did not really check the syntax of the above code :)
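As a side note, here is a minimal sketch of how that delay could be derived from the ConsumedCapacity of each page and the table's provisioned read throughput. The function name, provisionedReadCapacity and targetUtilization are purely illustrative (not something the library provides), and the ConsumedCapacity.CapacityUnits shape is an assumption based on what the DynamoDB API normally returns:
// Illustrative sketch: wait long enough that, averaged over the pause,
// the scan stays within a fraction of the table's provisioned read capacity.
function delayForNextPage(consumedUnits, provisionedReadCapacity, targetUtilization) {
  targetUtilization = targetUtilization || 0.5; // use at most ~half of the table's reads
  var budgetPerSecond = provisionedReadCapacity * targetUtilization;
  var seconds = consumedUnits / budgetPerSecond;
  return Math.max(0, Math.round(seconds * 1000)); // milliseconds for setTimeout()
}
// e.g. inside the scan callback, assuming this.ConsumedCapacity.CapacityUnits holds
// the units consumed by the page and the table has 10 provisioned read units:
// setTimeout(function() { recursive_call($this.LastEvaluatedKey); },
//            delayForNextPage(this.ConsumedCapacity.CapacityUnits, 10));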
Okay thanks @adrianpraja
Here is our implementation, which is working fine.
export function recursiveTableScan({ resolve, reject, tableName, lastKey = null, items = [], ...options }) {
  const {
    limit = null,
    fields = null
  } = options;

  let query = DynamoDB
    .table(tableName)
    .resume(lastKey);

  if (limit) {
    query.limit(limit);
  }
  if (fields) {
    query.select(...fields);
  }

  query.scan(function (err, data) {
    if (!err) {
      const { LastEvaluatedKey } = this;
      if (!LastEvaluatedKey) {
        console.log(this);
      }
      // concat current data with previous data
      const allItems = items.concat(data);
      // LastEvaluatedKey is null if we have queried all items in the current table
      if (LastEvaluatedKey === null || (limit && allItems.length >= limit)) {
        // Query done, prepare and return final results
        if (limit && allItems.length > limit) {
          resolve(allItems.slice(0, limit));
        }
        resolve(allItems);
        return;
      }
      setTimeout(function () {
        recursiveTableScan({ resolve, reject, tableName, lastKey: LastEvaluatedKey, items: allItems, ...options });
      }, 1000);
    } else {
      throw err;
    }
  });
}

export async function scanAll(tableName, { ...props } = {}) {
  return await new Promise(
    (resolve, reject) => recursiveTableScan({ resolve, reject, tableName, ...props }),
  );
}
Usage:
const parameters = event.queryStringParameters || {};
const {
  output = 'json',
  limit = null
} = parameters;

const data = await scanAll(process.env.TABLE_NAME_SIMULATIONS, {
  limit: limit,
  fields: [
    'id',
    'userAgent',
    'date',
    'requestInfo.hostname',
    'requestInfo.remoteAddress',
    'deviceId',
    'simulation.values.institution',
    'simulation.values.amount',
  ]
});
Feel free to add it to the doc; I believe it's much easier to get started with this kind of example! ;)
I think select() is expecting item attributes as arguments, not as an array.
Try with
this.select.apply( this, fields_array )
or
this.select( field1, field2 )
or
fields_array.map(function( attribute ) {
  $this.addSelect( attribute )
})
I do not understand the use of:
if (limit && allItems.length > limit) {
  resolve(allItems.slice(0, limit));
}
It seems you are using limit both as a per-.scan() limit and also as a global limit applied to all the items returned.
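If both behaviours are actually wanted, one way to keep them apart would be two separate options; pageLimit and maxItems below are purely illustrative names, not part of the library:
// sketch only: 'pageLimit' caps the items fetched per scan page,
// 'maxItems' caps the total accumulated across pages
const { pageLimit = null, maxItems = null } = options;
let query = DynamoDB.table(tableName).resume(lastKey);
if (pageLimit) query.limit(pageLimit); // per .scan() page
// ...then, after concatenating the page into allItems:
// if (maxItems && allItems.length >= maxItems)
//   return resolve(allItems.slice(0, maxItems)); // global cap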
Also, there is no return after resolve:
if (limit && allItems.length > limit) {
  resolve(allItems.slice(0, limit));
  return; // there should be a return here too
}
resolve(allItems);
return;
Also:
if (!err) {
  // ...
} else {
  // if the err code is ProvisionedThroughputExceededException then
  // you might wanna add a delay and retry the call
  throw err;
}
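A rough sketch of what that retry could look like, reusing the parameters already in scope inside recursiveTableScan; the err.code property is an assumption (it is how aws-sdk errors expose the error name) and the 3-second delay is arbitrary:
} else {
  if (err.code === 'ProvisionedThroughputExceededException') {
    // back off and retry the same page with the same lastKey
    setTimeout(function () {
      recursiveTableScan({ resolve, reject, tableName, lastKey, items, ...options });
    }, 3000); // ideally this delay would grow with each consecutive throttle
    return;
  }
  // for any other error, reject the surrounding Promise instead of throwing inside the callback
  reject(err);
}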
@adrianpraja
I think select() is expecting item attributes as arguments not as an array;
Exactly, I guess you saw my first version; I edited it to query.select(...fields), which does exactly that, soon after posting it.
It seems you are using limit both as a per .scan() limit and also as a global limit applied to all items returned
Exactly, because I'm running a scan operation and there is no way to limit the results returned by a scan operation (AFAIK), so I basically limit the number of returned results my own way. Clearly not perfect, especially considering there is no particular ordering applied.
And you're right about the missing return, thanks! (doesn't seem to change anything though)
Good thing it's working 👍
I'm closing this now 😃
I'm trying to perform a recursive scan, following the documentation at https://awspilot.github.io/dynamodb-oop/pages/scan/ (tab N°2).
Why is there a 1-second timeout? That's huge; what does it really do? How does this example return the content of the table? The calls are recursive, but I don't see any variable holding the dataset or updating it. I don't understand how it works.
Could the doc provide a better example with an actual use case? Like recursively scanning a table and then displaying the results, using a reusable function, for instance?