Open characode opened 1 year ago
Final correction - to help find classes which were not defined and to pause/resume when hitting memory watermark
dataPipeline.on('data', (x) => { ++dataCount; if (dataCount ==2) { // the second record has our core class definitions/schemas setupOutputClasses(x); resolve(); //main } thisType = (x.value["@class"]) if (thisType && !classesToIgnore.includes(thisType)) { newClassName=getNewClassName(x); if (dataCount%10000==0) //console.log("Processed: "+discoveryCounter); process.stdout.write('.'); if (outputClassSummary[newClassName]) { outputClassSummary[newClassName]++; } else { outputClassSummary[newClassName]=1; } if(typeof csvWriters[classMapping[thisType]] === 'undefined') { console.log("CSV Writer not found for: " + newClassName + JSON.stringify(x)); } let writer = csvWriters[classMapping[thisType]].write( getRow(x.value), err => { if (err) { console.error(err); } }); if(!writer) { console.log("csvWriter " + newClassName + ":" + dataCount + " csvWriter failed"); dataPipeline.pause(); csvWriters[classMapping[thisType]].once('drain', function resume(){ dataPipeline.resume(); console.log("csvWriter " + newClassName + ":" + dataCount + " datapipeline resumed"); }); } //evalRowHeaders(x); } })
During large export processing (>9GB), see uncontrolled high memory usage as not waiting for drain, modified the following to address:
let writer = csvWriters[classMapping[thisType]].write( getRow(x.value), err => { if (err) { console.error(err); } }); if(!writer) { console.log("csvWriter " + newClassName + ":" + dataCount + " csvWriter failed"); dataPipeline.pause(); csvWriters[classMapping[thisType]].once('drain', function resume(){ dataPipeline.resume(); console.log("csvWriter " + newClassName + ":" + dataCount + " datapipeline resumed"); }); }