moleculerjs / moleculer

:rocket: Progressive microservices framework for Node.js
https://moleculer.services/
MIT License
6.11k stars 580 forks source link

API load testing results in application crash | Apache Benchmark #1275

Closed akshaykarode closed 5 months ago

akshaykarode commented 6 months ago

Current Behavior

I have a service named "OceanFreight". This service accepts bulk data and calls the "Queue" service to create background jobs using bee-queue, then responds to the caller immediately with an InProgress status. While load testing with Apache Benchmark, the Moleculer app crashes with a failure once the load passes a certain threshold.

Expected Behavior

When load tested with Apache Benchmark, the app should be able to handle the load without crashing.

Failure Information

Successful test: Number of requests: 25; Concurrency: 5; Payload size of each request: 30K records (~15 MB)

Failed test: Number of requests: 50; Concurrency: 10; Payload size of each request: 30K records (~15 MB)

Context

Body Parser Settings:

bodyParsers: {
        json: {
                strict: false,
                    limit: "250MB"
        },
        urlencoded: {
                extended: true,
                    limit: "1MB"
        }
},

Failure Logs

[2024-02-22T05:33:39.100Z] INFO  akshays-mbp.lan-26811/API                                        : <= 200 POST /api/v1/ocean-freight/platform/emission/add [+1.061 s]
setBatchJobData: 186 createAndSave: 363.654ms
[2024-02-22T05:33:39.123Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : setBatchJobData: 186 Done.
[2024-02-22T05:33:39.123Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : syncQueueJobData: 186
setBatchJobData: 188 createAndSave: 314.729ms
[2024-02-22T05:33:39.143Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : setBatchJobData: 188 Done.
[2024-02-22T05:33:39.143Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : syncQueueJobData: 188
[2024-02-22T05:33:39.144Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : syncQueueJobData: 186 Done.
[2024-02-22T05:33:39.144Z] INFO  akshays-mbp.lan-26811/V1.OCEAN-FREIGHT                           : platformAddOceanFreightEmissions, Job Sent to Queue.
[2024-02-22T05:33:39.144Z] INFO  akshays-mbp.lan-26811/API                                        : <= 200 POST /api/v1/ocean-freight/platform/emission/add [+1.117 s]
[2024-02-22T05:33:39.145Z] INFO  akshays-mbp.lan-26811/V1.BEE-REDIS-QUEUE                         : syncQueueJobData: 188 Done.
[2024-02-22T05:33:39.145Z] INFO  akshays-mbp.lan-26811/V1.OCEAN-FREIGHT                           : platformAddOceanFreightEmissions, Job Sent to Queue.
[2024-02-22T05:33:39.145Z] INFO  akshays-mbp.lan-26811/API                                        : <= 200 POST /api/v1/ocean-freight/platform/emission/add [+1.107 s]
[2024-02-22T05:33:39.147Z] ERROR akshays-mbp.lan-26811/API                                        :    Request error! MoleculerError : request aborted 
 MoleculerError: request aborted
    at Service.<anonymous> (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/moleculer-web/src/utils.js:98:20)
    at next (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/moleculer-web/src/utils.js:59:18)
    at next (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/moleculer-web/src/utils.js:70:6)
    at /Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/moleculer-web/src/utils.js:73:41
    at onfinished (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/body-parser/lib/read.js:102:9)
    at dump (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/body-parser/lib/read.js:200:5)
    at /Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/body-parser/lib/read.js:101:7
    at AsyncResource.runInAsyncScope (node:async_hooks:204:9)
    at invokeCallback (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/raw-body/index.js:231:16)
    at done (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/raw-body/index.js:220:7)
    at IncomingMessage.onAborted (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/raw-body/index.js:238:5)
    at IncomingMessage.emit (node:events:513:28)
    at IncomingMessage._destroy (node:_http_incoming:224:10)
    at _destroy (node:internal/streams/destroy:109:10)
    at IncomingMessage.destroy (node:internal/streams/destroy:71:5)
    at abortIncoming (node:_http_server:754:9) 
Data: undefined
[2024-02-22T05:33:39.148Z] INFO  akshays-mbp.lan-26811/API                                        : <= 500 POST /api/v1/ocean-freight/platform/emission/add [+1.058 s]

/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/@moleculer/lab/dist/index.js:1
"use strict";var require$$0$2=require("socket.io"),require$$1$1=require("kleur"),require$$2$1=require("crypto"),require$$0$1=require("lodash"),require$$2$2=require("zlib"),require$$3=require("util"),require$$0=require("moleculer"),require$$1=require("sqlite3"),require$$2=require("path"),require$$4$1=require("fs");function _interopDefaultLegacy(e){return e&&"object"==typeof e&&"default"in e?e:{default:e}}var require$$0__default$2=_interopDefaultLegacy(require$$0$2),require$$1__default$1=_interopDefaultLegacy(require$$1$1),require$$2__default$1=_interopDefaultLegacy(require$$2$1),require$$0__default$1=_interopDefaultLegacy(require$$0$1),require$$2__default$2=_interopDefaultLegacy(require$$2$2),require$$3__default=_interopDefaultLegacy(require$$3),require$$0__default=_interopDefaultLegacy(require$$0),require$$1__default=_interopDefaultLegacy(require$$1),require$$2__default=_interopDefaultLegacy(require$$2),require$$4__default=_interopDefaultLegacy(require$$4$1),require$$4={name:"@moleculer/lab",version:"0.6.4",main:"dist/index.js",scripts:{dev:'node --inspect="0.0.0.0:9229" node_modules/moleculer/bin/moleculer-runner.js --repl --hot --config example/moleculer.config.js example/services/agent.service.js example/services/api.service.js example/services/greeter.service.js example/services/helper.service.js',"dev:agent":'node --inspect="0.0.0.0:9229" node_modules/moleculer/bin/moleculer-runner.js --repl --hot --config example/moleculer.config.js example/services/agent.service.js',"dev:full":'node --inspect="0.0.0.0:9229" node_modules/moleculer/bin/moleculer-runner.js --repl --hot --config example/moleculer.config.js example/services/**.service.js',"dev:brutal":'node --inspect="0.0.0.0:9229" example/brutal.js',build:"rollup -c",deps:"npm-check -u",prepublishOnly:"npm run build",release:"npm publish --access 
public"},author:"MoleculerJS",dependencies:{kleur:"^4.1.5",lodash:"^4.17.21","socket.io":"^2.4.1",sqlite3:"^5.1.2"},devDependencies:{"@rollup/plugin-commonjs":"^20.0.0","@rollup/plugin-json":"^4.1.0",axios:"^0.21.3",eslint:"^7.32.0","eslint-config-prettier":"^8.5.0","eslint-plugin-node":"^11.1.0","eslint-plugin-prettier":"^4.2.1","eslint-plugin-promise":"^5.1.0","eslint-plugin-security":"^1.5.0",fakerator:"^0.3.6",ioredis:"^4.27.9",moleculer:"^0.14.24","moleculer-db":"^0.8.19","moleculer-repl":"^0.7.3","moleculer-web":"^0.10.4",nats:"^2.8.0","npm-check":"^5.9.2",prettier:"^2.7.1",rollup:"^2.56.3","rollup-plugin-terser":"^7.0.2"}};const _$7=require$$0__default$1.default,sqlite3=require$$1__default.default,path=require$$2__default.default,{makeDirs:makeDirs}=require$$0__default.default.Utils,fs=require$$4__default.default.promises;var abstractStore=class{constructor(service,opts){this.opts=_$7.defaultsDeep(opts,{persistent:!1,folder:null}),this.service=service,this.broker=this.service.broker,this.logger=service.logger,this.Promise=service.Promise,this.name=null,this.db=null,this.cleanTimer=null,this.queue=[],this.queueTimer=null,this.processing=!1}getDBName(){throw new Error("Abstract method")}createTablesSQL(){}async connect(project){this.project=project;const dbName=this.getDBName(project);let filename,db,shouldCreateTables=!1;if(this.opts.persistent){filename=path.join(this.opts.folder||".",dbName+".db"),this.logger.debug(`[${this.name}] Database file:`,filename),makeDirs(path.dirname(filename));try{await fs.access(filename)}catch(err){shouldCreateTables=!0}}if(this.opts.persistent||(shouldCreateTables=!0),this.logger.debug(`[${this.name}] Connecting to '${dbName}' DB...`),await new Promise(((resolve,reject)=>{db=new sqlite3.Database(this.opts.persistent?filename:":memory:",(err=>{if(err)return reject(err);resolve()}))})),this.logger.debug(`[${this.name}] Connected to '${dbName}' DB successfully.`),shouldCreateTables){this.logger.debug(`[${this.name}] Creating 
tables...`);const sql=this.createTablesSQL();sql&&await new Promise(((resolve,reject)=>{db.exec(sql,(err=>{if(err)return reject(err);resolve()}))}))}this.db=db,db.on("error",(err=>{this.logger.error("DB error",err)})),this.cleanTimer&&clearInterval(this.cleanTimer),this.cleanTimer=setInterval((()=>this.cleanup()),6e4),this.queueTimer&&clearInterval(this.queueTimer),this.queueTimer=setInterval((()=>this.queueTick()),1e3),await this.ready()}async ready(){}async close(){if(this.queueTimer&&clearInterval(this.queueTimer),this.cleanTimer&&clearInterval(this.cleanTimer),this.db)return this.logger.debug(`[${this.name}] Closing DB...`),new Promise((resolve=>{this.db.close((err=>{err&&this.logger.warn("Unable to close DB",err),this.db=null,resolve()}))}))}async getOne(sql,params){if(this.db)return new Promise(((resolve,reject)=>{this.db.get(sql,params,((err,row)=>{if(err)return reject(err);resolve(row)}))}))}async all(sql,params){if(this.db)return new Promise(((resolve,reject)=>{this.db.all(sql,params,((err,rows)=>{if(err)return reject(err);resolve(rows)}))}))}async run(sql,params){if(this.db)return new Promise(((resolve,reject)=>{this.db.run(sql,params,(function(err){if(err)return reject(err);resolve(this.changes)}))}))}prepare(sql){const stmt=this.db.prepare(sql);return{stmt:stmt,finalize:()=>new Promise(((resolve,reject)=>{stmt.finalize((function(err){if(err)return reject(err);resolve()}))})),run:params=>new Promise(((resolve,reject)=>{stmt.run(params,(function(err){if(err)return reject(err);resolve(this.changes)}))})),getOne:params=>new Promise(((resolve,reject)=>{stmt.get(params,(function(err,row){if(err)return reject(err);resolve(row)}))})),all:params=>new Promise(((resolve,reject)=>{stmt.all(params,(function(err,rows){if(err)return reject(err);resolve(rows)}))}))}}cleanup(){}async queueTick(){if(this.db&&!this.processing&&0!=this.queue.length){this.processing=!0;try{const items=this.queue.splice(0,100);this.logger.debug(`[${this.name}] Processing ${items.length} from 
queue (remaining: ${this.queue.length})...`),await this.Promise.mapSeries(items,(async item=>{try{await this.process(item)}catch(err){this.logger.error("Unable to process item from queue",err)}}))}finally{this.processing=!1}}}addToQueue(item){this.logger.debug(`[${this.name}] Add item to the queue (queue length: ${this.queue.length})`),this.queue.push(item)}async process(){}};const _$6=require$$0__default$1.default,AbstractStore$2=abstractStore;var metricsStore=class extends AbstractStore$2{constructor(service,opts){super(service,opts),this.opts=_$6.defaultsDeep(this.opts,{metrics:{retentionMins:30,timeWindow:10}}),this.name="MetricStore",this.statements={}}getDBName(){return"metrics"}createTablesSQL(){return"\n\t\t\tCREATE TABLE meta (\n\t\t\t\tkey TEXT PRIMARY KEY,\n\t\t\t\tvalue TEXT\n\t\t\t);\n\t\t\tINSERT INTO meta (key, value) VALUES ('version', '1');\n\n\t\t\tCREATE TABLE metrics (\n\t\t\t\tname TEXT PRIMARY KEY,\n\t\t\t\tjson TEXT NOT NULL\n\t\t\t);\n\n\t\t\tCREATE TABLE current (\n\t\t\t\tmetric TEXT NOT NULL,\n\t\t\t\tnodeID TEXT NOT NULL,\n\t\t\t\tkey TEXT NOT NULL,\n\t\t\t\ttimestamp INTEGER NOT NULL,\n\n\t\t\t\tservice TEXT,\n\t\t\t\taction TEXT,\n\t\t\t\tevent TEXT,\n\t\t\t\t'group' TEXT,\n\t\t\t\tcaller TEXT,\n\n\t\t\t\tlabels TEXT,\n\n\t\t\t\tvalue REAL,\n\t\t\t\trate REAL,\n\t\t\t\tcount INTEGER,\n\t\t\t\tsum REAL,\n\t\t\t\tmin REAL,\n\t\t\t\tmean REAL,\n\t\t\t\tmax REAL,\n\t\t\t\tq50 REAL,\n\t\t\t\tq90 REAL,\n\t\t\t\tq95 REAL,\n\t\t\t\tq99 REAL,\n\n\t\t\t\tPRIMARY KEY (metric, nodeID, key)\n\t\t\t);\n\n\t\t\tCREATE INDEX current_idx_nodeID ON current(nodeID);\n\t\t\tCREATE INDEX current_idx_metric_nodeID ON current(metric, nodeID);\n\t\t\tCREATE INDEX current_idx_timestamp ON current(timestamp, nodeID);\n\n\t\t\tCREATE TABLE history (\n\t\t\t\tid INTEGER PRIMARY KEY AUTOINCREMENT,\n\t\t\t\tmetric TEXT NOT NULL,\n\t\t\t\tkey TEXT NOT NULL,\n\t\t\t\tnodeID TEXT NOT NULL,\n\t\t\t\ttimestamp INTEGER NOT NULL,\n\n\t\t\t\tservice TEXT,\n\t\t\t\taction 
TEXT,\n\t\t\t\tevent TEXT,\n\t\t\t\t'group' TEXT,\n\t\t\t\tcaller TEXT,\n\n\t\t\t\tlabels TEXT,\n\n\t\t\t\tvalue REAL,\n\t\t\t\trate REAL,\n\t\t\t\tcount INTEGER,\n\t\t\t\tsum REAL,\n\t\t\t\tmin REAL,\n\t\t\t\tmean REAL,\n\t\t\t\tmax REAL,\n\t\t\t\tq50 REAL,\n\t\t\t\tq90 REAL,\n\t\t\t\tq95 REAL,\n\t\t\t\tq99 REAL\n\t\t\t);\n\n\t\t\tCREATE INDEX history_idx_metric_timestamp ON history(metric, timestamp);\n\t\t\tCREATE INDEX history_idx_timestamp ON history(timestamp);\n\t\t"}ready(){this.statements.addMetric=this.prepare("\n\t\t\tINSERT INTO metrics\n\t\t\t\t(name, json)\n\t\t\tVALUES\n\t\t\t\t($name, $json)\n\t\t"),this.statements.addGaugeHistory=this.prepare("\n\t\t\tINSERT INTO history\n\t\t\t\t(metric, key, nodeID, timestamp, service, action, event, 'group', caller, labels, value, rate)\n\t\t\tVALUES\n\t\t\t\t($metric, $key, $nodeID, $timestamp, $service, $action, $event, $group, $caller, $labels, $value, $rate)\n\t\t"),this.statements.addHistogramHistory=this.prepare("\n\t\t\tINSERT INTO history\n\t\t\t\t(metric, key, nodeID, timestamp, service, action, event, 'group', caller, labels, rate, count, sum, min, mean, max, q50, q90, q95, q99)\n\t\t\tVALUES\n\t\t\t\t($metric, $key, $nodeID, $timestamp, $service, $action, $event, $group, $caller, $labels, $rate, $count, $sum, $min, $mean, $max, $q50, $q90, $q95, $q99)\n\t\t"),this.statements.upsertGaugeCurrent=this.prepare("\n\t\t\tINSERT INTO current\n\t\t\t\t(metric, key, nodeID, timestamp, service, action, event, 'group', caller, labels, value, rate)\n\t\t\tVALUES\n\t\t\t\t($metric, $key, $nodeID, $timestamp, $service, $action, $event, $group, $caller, $labels, $value, $rate)\n\t\t\t\tON CONFLICT(metric, nodeID, key) DO\n\t\t\tUPDATE SET\n\t\t\t\ttimestamp = $timestamp, value = $value, rate = $rate\n\t\t"),this.statements.upsertHistogramCurrent=this.prepare("\n\t\t\tINSERT INTO current\n\t\t\t\t(metric, key, nodeID, timestamp, service, action, event, 'group', caller, labels, rate, count, sum, min, mean, max, q50, 
q90, q95, q99)\n\t\t\tVALUES\n\t\t\t\t($metric, $key, $nodeID, $timestamp, $service, $action, $event, $group, $caller, $labels, $rate, $count, $sum, $min, $mean, $max, $q50, $q90, $q95, $q99)\n\t\t\tON CONFLICT(metric, nodeID, key) DO\n\t\t\tUPDATE SET\n\t\t\t\ttimestamp = $timestamp,\n\t\t\t\trate = $rate,\n\t\t\t\tcount = $count,\n\t\t\t\tmin = $min,\n\t\t\t\tmean = $mean,\n\t\t\t\tmax = $max,\n\t\t\t\tq50 = $q50,\n\t\t\t\tq90 = $q90,\n\t\t\t\tq95 = $q95,\n\t\t\t\tq99 = $q99\n\t\t"),this.statements.selectMetricByName=this.prepare("SELECT json FROM metrics WHERE name = ?"),this.statements.selectLastValueByNameAndNodeID=this.prepare("SELECT * FROM current WHERE metric = ? AND nodeID = ? ORDER BY timestamp DESC LIMIT 1")}async close(){try{await Promise.all(Object.values(this.statements).map((stmt=>stmt.finalize())))}catch(err){this.logger.debug(`[${this.name}] Unable to finalize statements`)}await super.close()}roundToTimeWindow(ts,window=this.opts.metrics.timeWindow){const d=1e3*window;return Math.floor(ts/d)*d}async process({nodeID:nodeID,rows:rows}){if(!this.db)return;const startTime=Date.now();this.logger.debug(`[${this.name}] Processing item from '${nodeID}' node...`),await this.run("BEGIN TRANSACTION");try{const registeredMetricNames=(await this.all("SELECT name FROM metrics")).map((row=>row.name));await this.Promise.mapSeries(rows,(async row=>{try{const name=row.name;-1==registeredMetricNames.indexOf(name)&&await this.statements.addMetric.run({$name:name,$json:JSON.stringify(_$6.omit(row,["values"]))}),await Promise.mapSeries(Array.from(row.values.values()),(async valueRow=>{const 
ts=this.roundToTimeWindow(valueRow.timestamp),params={$metric:name,$key:valueRow.key,$nodeID:nodeID,$timestamp:ts,$service:valueRow.labels.service,$action:valueRow.labels.action,$caller:valueRow.labels.caller,$event:valueRow.labels.event,$group:valueRow.labels.group,$labels:JSON.stringify(valueRow.labels)};"histogram"==row.type?(params.$rate=valueRow.rate,params.$count=valueRow.count,params.$sum=valueRow.sum,params.$min=valueRow.min,params.$mean=valueRow.mean,params.$max=valueRow.max,valueRow.quantiles&&(params.$q50=valueRow.quantiles[.5],params.$q90=valueRow.quantiles[.9],params.$q95=valueRow.quantiles[.95],params.$q99=valueRow.quantiles[.99]),await this.statements.addHistogramHistory.run(params),await this.statements.upsertHistogramCurrent.run(params)):(params.$value=valueRow.value,params.$rate=valueRow.rate,await this.statements.addGaugeHistory.run(params),await this.statements.upsertGaugeCurrent.run(params))}))}catch(err){this.logger.warn(`[${this.name}] Unable to save metric`,row,err)}})),await this.run("COMMIT")}catch(err){await this.run("ROLLBACK"),this.logger.error(`[${this.name}] Rollback transaction`,err)}this.logger.debug(`[${this.name}] Processed ${rows.length} metrics in ${Date.now()-startTime}ms. Queue size: ${this.queue.length}. 
NodeID: ${nodeID}`),this.emitChanged()}async cleanup(){if(!this.db)return;const startTime=Date.now(),limitTs=Date.now()-60*this.opts.metrics.retentionMins*1e3;this.logger.debug(`[${this.name}] Cleaning up database...`,{limitTs:limitTs});try{const deletedHistory=await this.run("DELETE FROM history WHERE timestamp < ?",[limitTs]),allNodeIDs=await this.all("SELECT DISTINCT nodeID FROM current"),availableNodes=this.service.broker.registry.nodes.toArray().filter((node=>node.available)).map((node=>node.id)),removableNodeIDs=allNodeIDs.map((row=>row.nodeID)).filter((nodeID=>-1==availableNodes.indexOf(nodeID)));let deletedCurrent=0;removableNodeIDs.length>0&&(deletedCurrent=await this.run(`\n\t\t\t\t\tDELETE FROM\n\t\t\t\t\t\tcurrent\n\t\t\t\t\tWHERE\n\t\t\t\t\t\t\ttimestamp < ${limitTs}\n\t\t\t\t\t\tAND\n\t\t\t\t\t\t\tnodeID IN (${removableNodeIDs.map((s=>"'"+s+"'")).join(",")})\n\t\t\t\t`,[])),this.logger.debug(`[${this.name}] Cleanup: Removed ${deletedHistory} old metrics history and ${deletedCurrent} old metrics current value. 
Time: ${Date.now()-startTime}ms`),(deletedHistory>0||deletedCurrent>0)&&this.emitChanged()}catch(err){this.logger.error(`[${this.name}] Error occured during cleaning database`,err)}}async getMetricByName(name){if(!this.db)return null;const startTime=Date.now(),row=await this.statements.selectMetricByName.getOne([name]);if(row&&row.json)try{const res=JSON.parse(row.json);return this.logger.debug(`[${this.name}] 'getMetricByName' executed in ${Date.now()-startTime}ms`),res}catch(err){}return null}async getLastValue({metric:metric,nodeID:nodeID,prop:prop="value"}){if(!this.db)return;const startTime=Date.now(),row=await this.statements.selectLastValueByNameAndNodeID.getOne([metric,nodeID]);return row?(this.logger.debug(`[${this.name}] 'getLastValue' executed in ${Date.now()-startTime}ms`),{metric:row.metric,nodeID:row.nodeID,timestamp:row.timestamp,value:row[prop]}):null}async aggregate({metric:metric,groupBy:groupBy,prop:prop="value",aggFn:aggFn="sum",maxAge:maxAge}){const startTime=Date.now();if(!await this.getMetricByName(metric))return;let groupByFields,where="metric = ?",params=[metric];if(null!=maxAge){const minTS=this.roundToTimeWindow(Date.now()-60*maxAge*1e3);where+=" AND timestamp >= ?",params.push(minTS)}groupBy&&(groupByFields=(groupBy=Array.isArray(groupBy)?groupBy:[groupBy]).map((s=>"`"+s+"`")).join(", "));const sql=`\n\t\t\tSELECT\n\t\t\t\t${aggFn}(${prop}) as value\n\t\t\t\t${groupByFields?", "+groupByFields:""}\n\t\t\tFROM current\n\t\t\t${where?"WHERE "+where:""}\n\t\t\t${groupByFields?"GROUP BY "+groupByFields:""}\n\t\t`;this.logger.debug(`[${this.name}] Aggregate SQL`,{sql:sql,params:params});const rows=await this.all(sql,params);return this.logger.debug(`[${this.name}] 'aggregate' executed in ${Date.now()-startTime}ms`),rows}async aggregateHistory({metric:metric,nodeID:nodeID,labels:labels,prop:prop,count:count,aggFn:aggFn="sum"}){const startTime=Date.now();if(!await this.getMetricByName(metric))return;const 
minTS=count?this.roundToTimeWindow(Date.now()-count*this.opts.metrics.timeWindow*1e3):null;let where="metric = ? AND timestamp >= ?",params=[metric,minTS];null!=nodeID&&(where+=" AND nodeID = ?",params.push(nodeID)),labels&&Array.from(Object.entries(labels)).forEach((([key,value])=>{where+=" AND `"+key+"` = ?",params.push(value)}));const sql=`\n\t\tSELECT\n\t\t\ttimestamp,\n\t\t\t${aggFn}(${prop}) as value\n\t\tFROM history\n\t\t${where?"WHERE "+where:""}\n\t\tGROUP BY timestamp\n\t\t`;this.logger.debug(`[${this.name}] AggregateHistory SQL:`,{sql:sql,params:params});const rows=await this.all(sql,params);return this.logger.debug(`[${this.name}] 'aggregateHistory' executed in ${Date.now()-startTime}ms`),rows}emitChanged(){}};const _$5=require$$0__default$1.default,AbstractStore$1=abstractStore;var traceStore=class extends AbstractStore$1{constructor(service,opts){super(service,opts),this.opts=_$5.defaultsDeep(this.opts,{tracing:{retentionMins:30}}),this.name="TraceStore",this.statements={}}getDBName(){return"traces"}createTablesSQL(){return"\n\t\t\tCREATE TABLE meta (\n\t\t\t\tkey TEXT PRIMARY KEY,\n\t\t\t\tvalue TEXT\n\t\t\t);\n\t\t\tINSERT INTO meta (key, value) VALUES ('version', '1');\n\n\t\t\tCREATE TABLE traces (\n\t\t\t\tid TEXT PRIMARY KEY,\n\t\t\t\tname TEXT,\n\t\t\t\tstartTime INTEGER,\n\t\t\t\tmainSpan TEXT,\n\t\t\t\tspanCount INTEGER,\n\t\t\t\tduration REAL,\n\t\t\t\tdepth INTEGER,\n\t\t\t\tservices TEXT,\n\t\t\t\terror TEXT\n\t\t\t);\n\t\t\tCREATE INDEX traces_idx_startTime ON traces(startTime);\n\t\t\tCREATE INDEX traces_idx_name ON traces(name);\n\n\t\t\tCREATE TABLE spans (\n\t\t\t\tid TEXT PRIMARY KEY,\n\t\t\t\ttraceID TEXT NOT NULL,\n\t\t\t\tname TEXT,\n\t\t\t\tstartTime INTEGER,\n\t\t\t\terror INTEGER,\n\t\t\t\tjson TEXT\n\t\t\t);\n\n\t\t\tCREATE INDEX spans_idx_startTime ON spans(startTime);\n\t\t\tCREATE INDEX spans_idx_traceID ON spans(traceID);\n\t\t"}ready(){this.statements.insertTrace=this.prepare("\n\t\t\tINSERT INTO traces\n\t\t\t\t(id, 
name, startTime, mainSpan, spanCount, duration, depth, services, error)\n\t\t\tVALUES\n\t\t\t\t($id, $name, $startTime, $mainSpan, $spanCount, $duration, $depth, $services, $error)\n\t\t"),this.statements.updateTrace=this.prepare("\n\t\t\tUPDATE traces\n\t\t\tSET\n\t\t\t\tname = $name,\n\t\t\t\tstartTime = $startTime,\n\t\t\t\tmainSpan = $mainSpan,\n\t\t\t\tspanCount = $spanCount,\n\t\t\t\tduration = $duration,\n\t\t\t\tdepth = $depth,\n\t\t\t\tservices = $services,\n\t\t\t\terror = $error\n\t\t\tWHERE\n\t\t\t\tid = $id\n\t\t"),this.statements.insertSpan=this.prepare("\n\t\t\tINSERT INTO spans\n\t\t\t\t(id, traceID, name, startTime, error, json)\n\t\t\tVALUES\n\t\t\t\t($id, $traceID, $name, $startTime, $error, $json)\n\t\t"),this.statements.getLastTraces=this.prepare("SELECT * FROM traces ORDER BY startTime DESC LIMIT ?"),this.statements.selectTraceByID=this.prepare("SELECT * FROM traces WHERE id = ?"),this.statements.countAllTraces=this.prepare("SELECT count(id) as count FROM traces"),this.statements.selectSpansByTraceID=this.prepare("SELECT * FROM spans WHERE traceID = ?")}async close(){try{await Promise.all(Object.values(this.statements).map((stmt=>stmt.finalize())))}catch(err){this.logger.debug(`[${this.name}] Unable to finalize statements`)}await super.close()}async process(spans){if(!this.db)return;const startTime=Date.now();this.logger.debug(`[${this.name}] Processing item...`),await this.run("BEGIN TRANSACTION");try{await Promise.mapSeries(spans,(async span=>{try{const traceID=span.traceID,isMainSpan=null==span.parentID,serviceName=span.service?span.service.fullName||span.service.name:null,depth=span.tags&&null!=span.tags.callingLevel?span.tags.callingLevel:null;let trace=await this.statements.selectTraceByID.getOne([traceID]);if(trace){const services=JSON.parse(trace.services);services[serviceName]=(services[serviceName]||0)+1;const 
params={$id:traceID,$name:isMainSpan?span.name:trace.name,$startTime:isMainSpan?span.startTime:trace.startTime,$mainSpan:isMainSpan?span.id:trace.mainSpan,$spanCount:trace.spanCount+1,$duration:isMainSpan?span.duration:trace.duration,$depth:Math.max(depth,trace.depth),$services:JSON.stringify(services),$error:isMainSpan&&span.error?span.error.name:trace.error};trace=await this.statements.updateTrace.run(params)}else{this.logger.debug(`[${this.name}] Create new trace '${traceID}'.`);const params={$id:traceID,$name:isMainSpan?span.name:null,$startTime:isMainSpan?span.startTime:null,$mainSpan:isMainSpan?span.id:null,$spanCount:1,$duration:isMainSpan?span.duration:null,$depth:depth,$services:JSON.stringify({[serviceName]:1}),$error:isMainSpan&&span.error?span.error.name:null};trace=await this.statements.insertTrace.run(params)}await this.statements.insertSpan.run({$id:span.id,$traceID:span.traceID,$name:span.name,$startTime:span.startTime,$error:!!span.error,$json:JSON.stringify(span)})}catch(err){this.logger.warn(`[${this.name}] Unable to save tracing span`,span,err)}})),await this.run("COMMIT")}catch(err){await this.run("ROLLBACK"),this.logger.error(`[${this.name}] Rollback transaction`,err)}this.logger.debug(`[${this.name}] Processed ${spans.length} traces in ${Date.now()-startTime}ms. Queue size: ${this.queue.length}.`),this.emitChanged()}async cleanup(){if(!this.db)return;const startTime=Date.now(),limitTs=Date.now()-60*this.opts.tracing.retentionMins*1e3;this.logger.debug(`[${this.name}] Cleaning up database...`,{limitTs:limitTs});try{const deletedTraces=await this.run("DELETE FROM traces WHERE startTime < ?",[limitTs]),deletedSpans=await this.run("DELETE FROM spans WHERE startTime < ?",[limitTs]);this.logger.debug(`[${this.name}] Cleanup: Removed ${deletedTraces} old traces and ${deletedSpans} old spans. 
Time: ${Date.now()-startTime}ms`),(deletedTraces>0||deletedSpans>0)&&this.emitChanged()}catch(err){this.logger.error(`[${this.name}] Error occured during cleaning database`,err)}}async getLastTraces(params){const startTime=Date.now(),count=params.count||100;let res;return res=params.search?await this.all(`SELECT * FROM traces WHERE name LIKE '%${params.search}%' OR id LIKE '%${params.search}%' ORDER BY startTime DESC LIMIT ${count}`):await this.statements.getLastTraces.all([count]),this.logger.debug(`[${this.name}] 'getLastTraces' executed in ${Date.now()-startTime}ms`),res}async getTraceDetails(id){const startTime=Date.now(),trace=await this.statements.selectTraceByID.getOne([id]);if(trace){const spans=await this.statements.selectSpansByTraceID.all([id]);spans&&(trace.spans=spans.map((span=>JSON.parse(span.json))))}return this.logger.debug(`[${this.name}] 'getTraceDetails' executed in ${Date.now()-startTime}ms`),trace}async countAllTraces(){const startTime=Date.now(),res=await this.statements.countAllTraces.getOne();return res?(this.logger.debug(`[${this.name}] 'countAllTraces' executed in ${Date.now()-startTime}ms`),res.count):null}emitChanged(){}};const _$4=require$$0__default$1.default,AbstractStore=abstractStore;var logStore=class extends AbstractStore{constructor(service,opts){super(service,opts),this.opts=_$4.defaultsDeep(this.opts,{logging:{retentionMins:30}}),this.name="LogStore",this.statements={}}getDBName(){return"logs"}createTablesSQL(){return"\n\t\t\tCREATE TABLE meta (\n\t\t\t\tkey TEXT PRIMARY KEY,\n\t\t\t\tvalue TEXT\n\t\t\t);\n\t\t\tINSERT INTO meta (key, value) VALUES ('version', '1');\n\n\t\t\tCREATE TABLE entries (\n\t\t\t\tid INTEGER PRIMARY KEY AUTOINCREMENT,\n\t\t\t\ttimestamp INTEGER,\n\t\t\t\tnodeID TEXT,\n\t\t\t\tmodule TEXT,\n\t\t\t\tlevel TEXT,\n\t\t\t\tmessage TEXT,\n\t\t\t\targs TEXT\n\t\t\t);\n\t\t\tCREATE INDEX entries_idx_timestamp ON entries(timestamp);\n\t\t\tCREATE INDEX entries_idx_nodeID ON 
entries(nodeID);\n\t\t"}ready(){this.statements.insertEntry=this.prepare("\n\t\t\tINSERT INTO entries\n\t\t\t\t(timestamp, nodeID, module, level, message, args)\n\t\t\tVALUES\n\t\t\t\t($timestamp, $nodeID, $module, $level, $message, $args)\n\t\t")}async close(){try{await Promise.all(Object.values(this.statements).map((stmt=>stmt.finalize())))}catch(err){this.logger.debug(`[${this.name}] Unable to finalize statements`)}await super.close()}async process(entries){if(!this.db)return;const startTime=Date.now();this.logger.debug(`[${this.name}] Processing item...`),await this.run("BEGIN TRANSACTION");try{await Promise.mapSeries(entries,(async entry=>{try{await this.statements.insertEntry.run({$timestamp:entry.ts,$nodeID:entry.nodeID,$module:entry.mod,$level:entry.level,$message:entry.message,$args:null})}catch(err){this.logger.warn(`[${this.name}] Unable to save log entry`,entry,err)}})),await this.run("COMMIT")}catch(err){await this.run("ROLLBACK"),this.logger.error(`[${this.name}] Rollback transaction`,err)}this.logger.debug(`[${this.name}] Processed ${entries.length} entries in ${Date.now()-startTime}ms. Queue size: ${this.queue.length}.`),this.emitChanged()}async cleanup(){if(!this.db)return;const startTime=Date.now(),limitTs=Date.now()-60*this.opts.logging.retentionMins*1e3;this.logger.debug(`[${this.name}] Cleaning up database...`,{limitTs:limitTs});try{const deletedEntries=await this.run("DELETE FROM entries WHERE timestamp < ?",[limitTs]);this.logger.debug(`[${this.name}] Cleanup: Removed ${deletedEntries} old entries. 
Time: ${Date.now()-startTime}ms`),deletedEntries>0&&this.emitChanged()}catch(err){this.logger.error(`[${this.name}] Error occured during cleaning database`,err)}}async getLastEntries(params){const startTime=Date.now(),count=params.count||100,lastID=params.lastID;let wheres=[];params.search&&wheres.push(`(\n\t\t\t\tmessage LIKE '%${params.search}%'\n\t\t\t OR nodeID = '${params.search}'\n\t\t\t OR module = '${params.search.toLowerCase()}'\n\t\t\t OR level = '${params.search.toLowerCase()}'\n\t\t\t)`),null!=lastID&&wheres.push(`id > ${lastID}`);let sql="SELECT * FROM entries";wheres.length>0&&(sql+=" WHERE "+wheres.join(" AND ")),sql+=` ORDER BY timestamp DESC LIMIT ${count}`;const res=await this.all(sql);return this.logger.debug(`[${this.name}] 'getLastEntries' executed in ${Date.now()-startTime}ms`),res}emitChanged(){}};const IO=require$$0__default$2.default,kleur=require$$1__default$1.default,crypto=require$$2__default$1.default,_$3=require$$0__default$1.default,pkg=require$$4,zlib$3=require$$2__default$2.default,{promisify:promisify$3}=require$$3__default.default,{safetyObject:safetyObject$1}=require$$0__default.default.Utils,MetricStore=metricsStore,TraceStore=traceStore,LogStore=logStore,deflate$3=promisify$3(zlib$3.deflate),inflate=promisify$3(zlib$3.inflate);let generatedToken=!1;var service={name:"$lab",metadata:{$category:"lab",$description:"Laboratory Agent service",$official:!0,$package:{name:pkg.name,version:pkg.version,repo:null}},settings:{$secureSettings:["token","apiKey"],name:"Moleculer 
Project",port:process.env.LAB_PORT>0?process.env.LAB_PORT:3210,token:process.env.LAB_TOKEN,apiKey:process.env.LAB_APIKEY},events:{"$services.changed":{tracing:!1,handler(){this.sendRegistryUpdatedMessage()}},"$node.connected":{tracing:!1,handler(){this.sendRegistryUpdatedMessage()}},"$node.updated":{tracing:!1,handler(){this.sendRegistryUpdatedMessage()}},"$node.disconnected":{tracing:!1,handler(){this.sendRegistryUpdatedMessage()}},"$lab.metrics.changes":{context:!0,tracing:!1,async handler(ctx){if(!this.metricStore)return;this.hasMetrics||(this.hasMetrics=!0,this.projectFeaturesChanged());const nodeID=ctx.nodeID;if(!this.nodeIDList.has(nodeID))try{const metrics=await ctx.call("$node.metrics",null,{nodeID:nodeID});return this.nodeIDList.add(nodeID),void this.metricStore.addToQueue({nodeID:nodeID,rows:metrics})}catch(err){this.logger.warn(`Unable to collect metrics from node '${nodeID}'`)}const data=await this.unpack(ctx.params);data&&(this.metricStore.addToQueue({nodeID:ctx.nodeID,rows:data}),this.logger.debug(`Metrics info received from '${ctx.nodeID}'.`))}},"$lab.tracing.spans":{context:!0,tracing:!1,async handler(ctx){if(!this.traceStore)return;this.hasTracing||(this.hasTracing=!0,this.projectFeaturesChanged());const data=await this.unpack(ctx.params);data&&data.length>0&&(this.traceStore.addToQueue(data),this.logger.debug(`Tracing info received from '${ctx.nodeID}'. Spans: ${data.length}`))}},"$lab.log.entries":{context:!0,tracing:!1,async handler(ctx){if(!this.logStore)return;this.hasLogging||(this.hasLogging=!0,this.projectFeaturesChanged());const data=await this.unpack(ctx.params);data&&data.length>0&&(this.logStore.addToQueue(data),this.logger.debug(`Log entries received from '${ctx.nodeID}'. Size: ${data.length}`))}}},methods:{async pack(data){const json=JSON.stringify(data),res=Buffer.from(await deflate$3(json));return json&&this.logger.debug("Packing data. 
JSON:",json.length," Packed:",res.length," Rate:",Number(res.length/json.length*100).toFixed(2)+"%"),res},unpack:async data=>(_$3.isObject(data)&&"Buffer"==data.type&&(data=Buffer.from(data.data)),data instanceof ArrayBuffer||Buffer.isBuffer(data)?JSON.parse(await inflate(data)):data),projectFeaturesChanged(){return this.logger.debug("Project features has been changed. Sending to frontend..."),this.sendProjectInfo()},async sendProjectInfo(client){const payload={name:this.settings.name,apiKey:this.settings.apiKey,version:this.settings.version,agentVersion:pkg.version,moleculerVersion:this.broker.MOLECULER_VERSION,protocolVersion:this.broker.PROTOCOL_VERSION,namespace:this.broker.namespace,features:{metrics:this.hasMetrics,tracing:this.hasTracing,logging:this.hasLogging}},data=await this.pack(payload);this.logger.debug("Sending project info to clients..."),client?client.emit("project.info",data):this.io.emit("project.info",data)},async sendBrokerOptions(client){const opts=safetyObject$1(this.broker.options);try{opts.$classNames={},this.broker.transit&&(opts.$classNames.transporter=this.broker.getConstructorName(this.broker.transit.tx)),this.broker.cacher&&(opts.$classNames.cacher=this.broker.getConstructorName(this.broker.cacher)),this.broker.serializer&&(opts.$classNames.serializer=this.broker.getConstructorName(this.broker.serializer)),this.broker.validator&&(opts.$classNames.validator=this.broker.getConstructorName(this.broker.validator))}catch(err){this.logger.debug("Unable to collect module classnames.",err)}const data=await this.pack(opts);client?client.emit("broker.options",data):this.io.emit("broker.options",data)},sendRegistryUpdatedMessage:_$3.throttle((async function(){this.logger.debug("Registry updated. 
Sending new registry to clients..."),this.io.emit("service-registry.updated",await this.pack(this.getRegistryContent()))}),2e3),getRegistryContent(){return this.broker.registry.nodes.toArray().map((node=>{const res=_$3.pick(node,["id","local","available","hostname","ipList","instanceID","offlineSince","lastHeartbeatTime"]);return res.rawInfo=this.broker.registry.getNodeInfo(res.id),res}))},hashURL:str=>Buffer.from(str).toString("base64")},created(){this.nodeIDList=new Set,this.hasMetrics=!1,this.hasTracing=!1,this.hasLogging=!1,this.settings.token||(this.logger.debug("Token is not set. Generating a random token..."),generatedToken=!0,this.settings.token=Math.random().toString(36).substr(2,9))},async started(){this.logger.debug("Creating metric store..."),this.metricStore=new MetricStore(this,this.settings.store),await this.metricStore.connect(),this.logger.debug("Creating tracing store..."),this.traceStore=new TraceStore(this,this.settings.store),await this.traceStore.connect(),this.logger.debug("Creating logging store..."),this.logStore=new LogStore(this,this.settings.store),await this.logStore.connect();const origins=["http://lab.moleculer.services","https://lab.moleculer.services","http://localhost:8080"];this.logger.debug("Creating IO server..."),this.io=IO(this.settings.port,{serveClient:!1,origins:origins,allowRequest:(req,cb)=>{let token;this.logger.debug("Received client handshake");const auth=req.headers.authorization;if(auth&&auth.startsWith("Token ")&&(token=auth.slice(6)),token&&this.settings.token&&token.length==this.settings.token.length&&crypto.timingSafeEqual(Buffer.from(token),Buffer.from(this.settings.token)))return cb(null,!0);this.logger.debug("Invalid client token. 
Decline the connection",token),cb(4,!1)},handlePreflightRequest:(req,res)=>{const origin=req.headers.origin,validOrigin=-1!==origins.indexOf(origin);this.logger.debug("Handle CORS preflight request.",{origin:origin}),res.writeHead(200,{"Access-Control-Allow-Headers":"Authorization","Access-Control-Allow-Methods":"GET","Access-Control-Allow-Origin":validOrigin?req.headers.origin:origins[0],"Access-Control-Allow-Credentials":!0}),res.end()}}),this.io.on("connection",(async client=>{this.logger.debug(`Moleculer Lab Client connected (${client.conn.remoteAddress}).`),client.on("disconnect",(()=>{this.logger.debug(`Moleculer Lab Client disconnected (${client.conn.remoteAddress}).`)})),await this.sendProjectInfo(client),await this.sendBrokerOptions(client),client.emit("service-registry.updated",await this.pack(this.getRegistryContent())),client.on("callAction",(async(actionName,params,opts,callback)=>{try{this.logger.debug(`Client calls the '${actionName}' action`,{actionName:actionName,params:params,opts:opts});const response=await this.broker.call(actionName,params,opts);callback&&callback(null,await this.pack(response))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("emitEvent",(async(eventName,params,opts,callback)=>{try{this.logger.debug(`Client emits the '${eventName}' event`,{eventName:eventName,params:params,opts:opts}),await this.broker.emit(eventName,params,opts),callback&&callback(null,!0)}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("broadcastEvent",(async(eventName,params,opts,callback)=>{try{this.logger.debug(`Client broadcasts the '${eventName}' event`,{eventName:eventName,params:params,opts:opts}),await this.broker.broadcast(eventName,params,opts),callback&&callback(null,!0)}catch(err){const 
res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("metrics:getLastValue",(async(params,callback)=>{try{let res;this.logger.debug("Client requests data from metric store",{method:"getLastValue",params:params}),res=Array.isArray(params)?await Promise.all(params.map((p=>this.metricStore.getLastValue(p)))):await this.metricStore.getLastValue(params),callback&&callback(null,await this.pack(res))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("metrics:aggregate",(async(params,callback)=>{try{let res;this.logger.debug("Client requests data from metric store",{method:"aggregate",params:params}),res=Array.isArray(params)?await Promise.all(params.map((p=>this.metricStore.aggregate(p)))):await this.metricStore.aggregate(params),callback&&callback(null,await this.pack(res))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("metrics:aggregateHistory",(async(params,callback)=>{try{let res;this.logger.debug("Client requests data from metric store",{method:"aggregateHistory",params:params}),res=Array.isArray(params)?await Promise.all(params.map((p=>this.metricStore.aggregateHistory(p)))):await this.metricStore.aggregateHistory(params),callback&&callback(null,await this.pack(res))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("traces:getLastTraces",(async(params,callback)=>{try{this.logger.debug("Client requests data from tracing store",{method:"getLastTraces",params:params});const res={traces:await this.traceStore.getLastTraces(params),count:await this.traceStore.countAllTraces()};callback&&callback(null,await this.pack(res))}catch(err){const 
res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("traces:getTraceDetails",(async(params,callback)=>{try{this.logger.debug("Client requests data from tracing store",{method:"getTraceDetails",params:params});const res=await this.traceStore.getTraceDetails(params.id);callback&&callback(null,await this.pack(res))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}})),client.on("logs:getLastEntries",(async(params,callback)=>{try{this.logger.debug("Client requests data from log store",{method:"getLastEntries",params:params});const res=await this.logStore.getLastEntries(params);callback&&callback(null,await this.pack(res))}catch(err){const res={...err};res.name=err.name,res.message=err.message,res.stack=err.stack,callback&&callback(res)}}))}));const accessURL=`https://lab.moleculer.services/project/${this.hashURL("http://localhost:"+this.settings.port)}`;setTimeout((()=>{this.logger.info("***********************************************************"),this.logger.info(""),this.logger.info("  ✔ Moleculer Laboratory service started."),this.logger.info(""),this.logger.info(`  Token: ${kleur.grey("("+(generatedToken?"generated":"static")+")")}`),this.logger.info(`      ${kleur.bold().yellow(this.settings.token)}`),this.logger.info(""),this.logger.info("  Agent running on:"),this.logger.info(`      ${kleur.cyan("http://localhost:"+this.settings.port)}`),this.logger.info(""),this.logger.info("  Open Laboratory:"),this.logger.info(`      ${kleur.cyan(accessURL)}`),this.logger.info(""),this.logger.info("***********************************************************")}),1e3)},async stopped(){this.io&&this.io.close(),this.metricStore&&(await this.metricStore.close(),this.metricStore=null),this.traceStore&&(await this.traceStore.close(),this.traceStore=null),this.logStore&&(await this.logStore.close(),this.logStore=null)}};const 
// Continues the `const` declaration opened at the end of the previous line.
Reporters = require$$0__default.default.MetricReporters,
	_$2 = require$$0__default$1.default,
	zlib$2 = require$$2__default$2.default,
	{ promisify: promisify$2 } = require$$3__default.default,
	deflate$2 = promisify$2(zlib$2.deflate);

/**
 * Metric reporter that periodically publishes metric snapshots as
 * `$lab.metrics.changes` events.
 */
class LabEventReporter extends Reporters.Base {
	/**
	 * @param {Object} [opts]
	 * @param {Boolean} [opts.broadcast=true] Broadcast instead of emit.
	 * @param {Boolean} [opts.onlyChanges=true] Send only metrics changed since the last send.
	 * @param {Number} [opts.interval=10] Send interval in seconds; 0 disables the timer.
	 * @param {Boolean} [opts.compress=true] Deflate the JSON payload.
	 */
	constructor(opts) {
		super(opts);
		this.opts = _$2.defaultsDeep(this.opts, {
			broadcast: true,
			onlyChanges: true,
			interval: 10,
			compress: true
		});
		this.lastChanges = new Set();
	}

	init(registry) {
		super.init(registry);
		if (this.opts.interval > 0) {
			this.timer = setInterval(() => this.sendEvent(), 1000 * this.opts.interval);
			this.timer.unref();
		}
	}

	/**
	 * Collect the (changed) metrics and publish them. Runs from a timer, so
	 * every failure is logged here; a rejected promise would otherwise be an
	 * unhandled rejection.
	 */
	async sendEvent() {
		try {
			let list = this.registry.list({
				includes: this.opts.includes,
				excludes: this.opts.excludes
			});
			if (this.opts.onlyChanges) {
				list = list.filter(metric => this.lastChanges.has(metric.name));
			}
			this.lastChanges.clear();
			if (list.length === 0) return;

			let data;
			if (this.opts.compress) {
				const json = JSON.stringify(list);
				data = Buffer.from(await deflate$2(json));
			} else {
				data = list;
			}

			if (this.opts.broadcast) {
				this.logger.debug(`Send metrics.snapshot (${list.length} metrics) broadcast event.`);
				await this.broker.broadcast("$lab.metrics.changes", data, { groups: this.opts.groups });
			} else {
				this.logger.debug(`Send metrics.snapshot (${list.length} metrics) event.`);
				await this.broker.emit("$lab.metrics.changes", data, { groups: this.opts.groups });
			}
		} catch (err) {
			this.logger.warn("Unable to send metrics snapshot.", err);
		}
	}

	metricChanged(metric) {
		if (this.matchMetricName(metric.name)) {
			this.lastChanges.add(metric.name);
		}
	}
}

Reporters.Laboratory = LabEventReporter;
var metricReporter = LabEventReporter;

const Exporters = require$$0__default.default.TracerExporters;
const { safetyObject: safetyObject } = require$$0__default.default.Utils;
const _$1 = require$$0__default$1.default;
const zlib$1 = require$$2__default$2.default;
const { promisify: promisify$1 } = require$$3__default.default;
const deflate$1 = promisify$1(zlib$1.deflate);

/**
 * Trace exporter that queues finished spans and publishes them as
 * `$lab.tracing.spans` events.
 */
class LabEventExporter extends Exporters.Base {
	/**
	 * @param {Object} [opts]
	 * @param {Boolean} [opts.broadcast=true] Broadcast instead of emit.
	 * @param {Number} [opts.interval=10] Flush interval in seconds; 0 flushes on every span.
	 * @param {Boolean} [opts.compress=true] Deflate the JSON payload.
	 * @param {Object|Function|null} [opts.defaultTags=null]
	 */
	constructor(opts) {
		super(opts);
		this.opts = _$1.defaultsDeep(this.opts, {
			broadcast: true,
			interval: 10,
			compress: true,
			defaultTags: null
		});
		this.queue = [];
	}

	init(tracer) {
		super.init(tracer);
		if (this.opts.interval > 0) {
			this.timer = setInterval(() => this.flush(), 1000 * this.opts.interval);
			this.timer.unref();
		}
		this.defaultTags = _$1.isFunction(this.opts.defaultTags)
			? this.opts.defaultTags.call(this, tracer)
			: this.opts.defaultTags;
	}

	stop() {
		if (this.timer) {
			clearInterval(this.timer);
			this.timer = null;
		}
		return this.Promise.resolve();
	}

	spanFinished(span) {
		this.queue.push(span);
		// Without a timer there is no periodic flush, so send immediately.
		if (!this.timer) this.flush();
	}

	/**
	 * Send all queued spans.
	 *
	 * Called from a timer and from `spanFinished` without being awaited, so it
	 * must never reject: a serialisation failure (e.g. `RangeError: Invalid
	 * string length` for a very large queue) would otherwise become an
	 * unhandled rejection.
	 */
	async flush() {
		if (this.queue.length === 0) return;

		// Drain first, so a batch that cannot be sent is dropped rather than
		// retried (and grown) on every tick.
		const spans = this.queue.splice(0);
		try {
			await this.sendTracingData(this.generateTracingData(spans));
		} catch (err) {
			this.logger.warn(`Unable to send tracing spans. ${spans.length} spans are dropped.`, err);
		}
	}

	/**
	 * Publish one batch of span objects. If the batch is too large to
	 * serialise or compress in one piece, it is split in half and each half is
	 * sent separately.
	 *
	 * @param {Array<Object>} list Output of `generateTracingData`.
	 */
	async sendTracingData(list) {
		if (list.length === 0) return;

		let data;
		if (this.opts.compress) {
			try {
				const json = JSON.stringify(list);
				data = Buffer.from(await deflate$1(json));
			} catch (err) {
				if (!(err instanceof RangeError)) throw err;

				if (list.length === 1) {
					this.logger.warn("A tracing span is too large to send, it is dropped.", err.message);
					return;
				}
				const middle = Math.ceil(list.length / 2);
				await this.sendTracingData(list.slice(0, middle));
				await this.sendTracingData(list.slice(middle));
				return;
			}
		} else {
			data = list;
		}

		if (this.opts.broadcast) {
			this.logger.debug(`Send tracing spans (${list.length} spans) broadcast event.`);
			await this.broker.broadcast("$lab.tracing.spans", data, { groups: this.opts.groups });
		} else {
			this.logger.debug(`Send tracing spans (${list.length} spans) event.`);
			await this.broker.emit("$lab.tracing.spans", data, { groups: this.opts.groups });
		}
	}

	/**
	 * Convert spans to plain objects that are safe to serialise.
	 *
	 * @param {Array<Object>} [spans=this.queue] Spans to convert.
	 * @returns {Array<Object>}
	 */
	generateTracingData(spans = this.queue) {
		return Array.from(spans).map(span => {
			const newSpan = safetyObject(span);
			if (newSpan.error) {
				newSpan.error = this.errorToObject(span.error);
			}
			return newSpan;
		});
	}
}

Exporters.Laboratory = LabEventExporter;
var traceExporter = LabEventExporter;

const Loggers = require$$0__default.default.Loggers;
const _ = require$$0__default$1.default;
const zlib = require$$2__default$2.default;
const { promisify: promisify, inspect: inspect } = require$$3__default.default;
const deflate = promisify(zlib.deflate);

/**
 * Logger that queues log entries and publishes them as `$lab.log.entries`
 * events.
 */
class LabEventLogger extends Loggers.Base {
	/**
	 * @param {Object} [opts]
	 * @param {Boolean} [opts.broadcast=true] Broadcast instead of emit.
	 * @param {Number} [opts.interval=10] Send interval in seconds; 0 disables the timer.
	 * @param {Boolean} [opts.compress=true] Deflate the JSON payload.
	 * @param {Number} [opts.objectPrinterDepth=2] `util.inspect` depth for object arguments.
	 */
	constructor(opts) {
		super(opts);
		this.opts = _.defaultsDeep(this.opts, {
			broadcast: true,
			interval: 10,
			compress: true,
			objectPrinterDepth: 2
		});
		this.queue = [];
		this.timer = null;
		this.objectPrinter = o =>
			inspect(o, {
				showHidden: false,
				depth: this.opts.objectPrinterDepth,
				colors: false,
				breakLength: Number.POSITIVE_INFINITY
			});
	}

	init(loggerFactory) {
		super.init(loggerFactory);
		if (this.opts.interval > 0) {
			this.timer = setInterval(() => this.sendLogEntries(), 1000 * this.opts.interval);
			this.timer.unref();
		}
	}

	stop() {
		if (this.timer) {
			clearInterval(this.timer);
			this.timer = null;
		}
		return this.Promise.resolve();
	}

	printArgs(args) {
		return args.map(p => (_.isObject(p) || Array.isArray(p) ? this.objectPrinter(p) : p));
	}

	/**
	 * Build the log handler for a module.
	 *
	 * @param {Object} bindings
	 * @returns {Function|null} `(type, args) => void`, or null when no level is configured.
	 */
	getLogHandler(bindings) {
		const level = bindings ? this.getLogLevel(bindings.mod) : null;
		if (!level) return null;

		const levelIdx = Loggers.Formatted.LEVELS.indexOf(level);

		return (type, args) => {
			if (Loggers.Formatted.LEVELS.indexOf(type) > levelIdx) return;

			// Strip ANSI colour sequences from the joined message.
			const message = this.printArgs(args).join(" ").replace(/\u001b\[.*?m/g, "");
			this.queue.push({
				ts: Date.now(),
				...bindings,
				level: type,
				message: message
			});
		};
	}

	/**
	 * Publish and clear the queued log entries. Runs from a timer, so it must
	 * not reject.
	 */
	async sendLogEntries() {
		const list = this.queue.splice(0);
		if (list.length === 0) return;

		try {
			let data;
			if (this.opts.compress) {
				const json = JSON.stringify(list);
				data = Buffer.from(await deflate(json));
			} else {
				data = list;
			}

			if (this.opts.broadcast) {
				await this.broker.broadcast("$lab.log.entries", data, { groups: this.opts.groups });
			} else {
				await this.broker.emit("$lab.log.entries", data, { groups: this.opts.groups });
			}
		} catch (err) {
			// Reported on the console: logging through this logger would only
			// queue another entry on the failing path.
			console.warn(`Unable to send log entries to Laboratory. ${list.length} entries are dropped.`, err);
		}
	}
}

Loggers.Laboratory = LabEventLogger;

var src = {
	AgentService: service,
	MetricReporter: metricReporter,
	TraceExporter: traceExporter,
	EventLogger: LabEventLogger
};

module.exports = src;

RangeError: Invalid string length
    at JSON.stringify (<anonymous>)
    at LabEventExporter.flush (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/@moleculer/lab/dist/index.js:1:39978)
    at Timeout._onTimeout (/Users/akshay/Desktop/GovEVA/moleculer-goveva/node_modules/@moleculer/lab/dist/index.js:1:39498)
    at listOnTimeout (node:internal/timers:569:17)
    at process.processTimers (node:internal/timers:512:7)

Node.js v18.15.0
icebob commented 5 months ago

Based on the stack trace, the issue comes from the Laboratory Agent and not from the core modules. Don't use Laboratory for load testing, because it adds overhead.