bestlong / node-red-contrib-mssql-plus

A Node-RED node to read and write to Microsoft MS SQL Databases
MIT License
31 stars 18 forks source link

continuously getting ECONNCLOSED error #55

Closed maulikdoshi82 closed 2 years ago

maulikdoshi82 commented 2 years ago

Hi, I'm trying to parse a large CSV file (approx 10k records) and store it in SQL Server. As the SQL Server is remote or sometimes docker initialized, don't want to use bulk-insert.

Node-red is running in docker. data file is shared with it via volume mount.

Node-red can see the file (as I'm printing the entire payload as http response while debugging).

CSV is able to parse the whole file and share it per record basis.

My function before the MS-SQL invokes has below code:

exchCode = msg.payload["ExchangeCode"].trim()
exchange = msg.payload["Exchange"].trim()
indusry = msg.payload["Industry"].trim()
comp =  msg.payload["CompanyName"].trim()

pld =       "INSERT INTO [OCP_LOCAL].[ext].[Profile]"
pld = pld + "(CompanyName,ExchangeCode,Industry,Exchange,_seen) "
pld = pld + "VALUES ('" + comp + "','" + exchCode + "' ,'" + indusry + "','" + exchange + "',0)"

msg.payload = pld
return msg;

It is able to insert roughly 4 or 5 records in the database and for rest provides ECONNCLOSED. I believe the time taken to write one record is probably long hence provided below properties: ConnectionTimeout: 60000 requesttimeout: 0 canceltimeout: 0

Also added delay of 1 second upto 5 seconds after CSV parser but still no luck. And last idea tried was Max Pool Size to 100 though still no difference on the number of records being inserted in DB.

[{"id":"1bf07b4d6f4c42cf","type":"tab","label":"csvreader","disabled":false,"info":""},{"id":"40fe383c56a1a128","type":"http in","z":"1bf07b4d6f4c42cf","name":"http-in","url":"/invokecsv","method":"get","upload":false,"swaggerDoc":"","x":170,"y":160,"wires":[["b0b04a9607b7444b"]]},{"id":"b0b04a9607b7444b","type":"file in","z":"1bf07b4d6f4c42cf","name":"ProfileMaster","filename":"/data/Profile_202108121438.csv","format":"utf8","chunk":false,"sendError":false,"encoding":"utf8","allProps":false,"x":350,"y":160,"wires":[["8cf03da220982022","712817f5220aab38"]]},{"id":"830d889c02bf783e","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1310,"y":180,"wires":[]},{"id":"8cf03da220982022","type":"http response","z":"1bf07b4d6f4c42cf","name":"","statusCode":"200","headers":{},"x":540,"y":240,"wires":[]},{"id":"9752a07b71f0349e","type":"switch","z":"1bf07b4d6f4c42cf","name":"","property":"payload[\"ExchangeCode\"]","propertyType":"msg","rules":[{"t":"nnull"}],"checkall":"true","repair":false,"outputs":1,"x":890,"y":60,"wires":[["129d3222e252ea8f"]]},{"id":"aa88f3f752cb3e99","type":"MSSQL","z":"1bf07b4d6f4c42cf","mssqlCN":"df8c0b88.91b0a8","name":"MSSQL","outField":"payload","returnType":"0","throwErrors":"0","query":"","modeOpt":"","modeOptType":"query","queryOpt":"payload","queryOptType":"msg","paramsOpt":"","paramsOptType":"none","rows":"","rowsType":"msg","params":[],"x":1140,"y":120,"wires":[["830d889c02bf783e","b1a99a1f10fd2aec"]]},{"id":"129d3222e252ea8f","type":"function","z":"1bf07b4d6f4c42cf","name":"update-sql-profile","func":"exchCode = msg.payload[\"ExchangeCode\"]\nexchange = msg.payload[\"Exchange\"]\nindusry = msg.payload[\"Industry\"]\ncomp = msg.payload[\"CompanyName\"]\n\npld = \"INSERT INTO [OCP_LOCAL].[ext].[Profile]\"\npld = pld + \"(CompanyName,ExchangeCode,Industry,Exchange) \"\npld = pld + \"VALUES ('\" + comp + \"','\" + exchCode + \"' ,'\" + indusry + \"','\" + exchange + \"')\"\n\n//pld = \"BULK INSERT [OCP_LOCAL].[ext].[Profile]\"\n//pld = pld + \" FROM '/data/ProfileMaster.csv'\"\n//pld = pld + \" WITH (\"\n//pld = pld + \"FIRSTROW=2,\"\n//pld = pld + \"LASTROW=5,\"\n//pld = pld + \"FIELDTERMINATOR=',',\"\n//pld = pld + \"TABLOCK\"\n//pld = pld + \")\"\nmsg.payload = pld\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":950,"y":200,"wires":[["aa88f3f752cb3e99"]]},{"id":"b1a99a1f10fd2aec","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"error","targetType":"msg","statusVal":"","statusType":"auto","x":1310,"y":260,"wires":[]},{"id":"461303971aa45e84","type":"delay","z":"1bf07b4d6f4c42cf","name":"","pauseType":"delay","timeout":"30","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":740,"y":60,"wires":[["9752a07b71f0349e"]]},{"id":"712817f5220aab38","type":"csv","z":"1bf07b4d6f4c42cf","name":"csv-parse","sep":",","hdrin":true,"hdrout":"all","multi":"one","ret":"\\r\\n","temp":"CompanyName,ExchangeCode,Industry,Exchange,_seen","skip":"0","strings":true,"include_empty_strings":true,"include_null_values":true,"x":580,"y":100,"wires":[["461303971aa45e84"]]},{"id":"df8c0b88.91b0a8","type":"MSSQL-CN","tdsVersion":"7_4","name":"aws","server":"${SQL_SERVER_URL}","port":"${SQL_SERVER_PORT}","encyption":false,"trustServerCertificate":true,"database":"OCP_LOCAL","useUTC":false,"connectTimeout":"60000","requestTimeout":"0","cancelTimeout":"0","pool":"100","parseJSON":false,"enableArithAbort":true,"credentials":{}}]

Steve-Mcl commented 2 years ago

The delay node delays every message by the delay amount. In other words, if you send 8000 messages through a delay of 5s, you will delay EVERY messages by exactly 5s. In other words, all 8000 will be sent one after another in quick concession following a single 5sec delay!. This is evident if you put a debug node after the delay.

Instead, you should be setting the delay node to "rate limit" mode, sending 1msg per 5sec (or greater) to spread them out.

Ps, why don't you want to use bulk insert? It is designed for this scenario.

Other alternative solutions...

image

Flow 1

is very similar to yours except is uses preared statements. This will guard against bad data in the SQL INSERT (protects against SQL Injections - see the settings I entered in the MSSQL-PLUS node)

Flow 2

gathers all SQL INSERTS into one huge string and executes them all at once.

Flow 3

is the best solution - it uses BULK insert,

Please try all 3 and see where you get to.

FLOWS...

[{"id":"40fe383c56a1a128","type":"http in","z":"1bf07b4d6f4c42cf","name":"","url":"/invokecsv","method":"get","upload":false,"swaggerDoc":"","x":160,"y":80,"wires":[["b0b04a9607b7444b"]]},{"id":"b0b04a9607b7444b","type":"file in","z":"1bf07b4d6f4c42cf","name":"ProfileMaster","filename":"/data/Profile_202108121438.csv","format":"utf8","chunk":false,"sendError":false,"encoding":"utf8","allProps":false,"x":350,"y":80,"wires":[["8cf03da220982022","712817f5220aab38"]]},{"id":"830d889c02bf783e","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1250,"y":200,"wires":[]},{"id":"8cf03da220982022","type":"http response","z":"1bf07b4d6f4c42cf","name":"","statusCode":"","headers":{},"x":530,"y":80,"wires":[]},{"id":"9752a07b71f0349e","type":"switch","z":"1bf07b4d6f4c42cf","name":"ExchangeCode not null","property":"payload.ExchangeCode","propertyType":"msg","rules":[{"t":"nnull"}],"checkall":"true","repair":false,"outputs":1,"x":950,"y":140,"wires":[["129d3222e252ea8f"]]},{"id":"aa88f3f752cb3e99","type":"MSSQL","z":"1bf07b4d6f4c42cf","mssqlCN":"df8c0b88.91b0a8","name":"MSSQL","outField":"payload","returnType":"0","throwErrors":"1","query":"INSERT INTO [OCP_LOCAL].[ext].[Profile] (CompanyName,ExchangeCode,Industry,Exchange)\r\nVALUES (@CompanyName,@ExchangeCode,@Industry,@Exchange)","modeOpt":"","modeOptType":"query","queryOpt":"","queryOptType":"editor","paramsOpt":"","paramsOptType":"editor","rows":"","rowsType":"msg","params":[{"output":false,"name":"CompanyName","type":"VarChar","valueType":"msg","value":"payload.CompanyName","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"ExchangeCode","type":"VarChar","valueType":"msg","value":"payload.ExchangeCode","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"Industry","type":"VarChar","valueType":"msg","value":"payload.Industry","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"Exchange","type":"VarChar","valueType":"msg","value":"payload.Exchange","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}}],"x":1080,"y":200,"wires":[["830d889c02bf783e"]]},{"id":"129d3222e252ea8f","type":"function","z":"1bf07b4d6f4c42cf","name":"trim strings","func":"msg.payload.ExchangeCode = msg.payload.ExchangeCode.trim();\nmsg.payload.Exchange = msg.payload.Exchange.trim();\nmsg.payload.Industry = msg.payload.Industry.trim();\nmsg.payload.CompanyName = msg.payload.CompanyName.trim();\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":810,"y":200,"wires":[["3d8e9dd9ace627ed","aa88f3f752cb3e99"]]},{"id":"b1a99a1f10fd2aec","type":"debug","z":"1bf07b4d6f4c42cf","name":"Errors","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":330,"y":740,"wires":[]},{"id":"461303971aa45e84","type":"delay","z":"1bf07b4d6f4c42cf","name":"","pauseType":"rate","timeout":"30","timeoutUnits":"seconds","rate":"1","nbRateUnits":"5","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":720,"y":140,"wires":[["9752a07b71f0349e"]]},{"id":"712817f5220aab38","type":"csv","z":"1bf07b4d6f4c42cf","name":"csv-parse","sep":",","hdrin":true,"hdrout":"none","multi":"one","ret":"\\r\\n","temp":"CompanyName,ExchangeCode,Industry,Exchange,_seen","skip":"0","strings":true,"include_empty_strings":true,"include_null_values":true,"x":540,"y":140,"wires":[["461303971aa45e84"]]},{"id":"665acc80c11b07f8","type":"catch","z":"1bf07b4d6f4c42cf","name":"","scope":null,"uncaught":false,"x":190,"y":740,"wires":[["b1a99a1f10fd2aec"]]},{"id":"dbad3136f7eb9ecf","type":"http in","z":"1bf07b4d6f4c42cf","name":"","url":"/invokecsvTEST","method":"get","upload":false,"swaggerDoc":"","x":140,"y":120,"wires":[["a37f32c9d6d22c41"]]},{"id":"a37f32c9d6d22c41","type":"template","z":"1bf07b4d6f4c42cf","name":"fake CSV","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"CompanyName,ExchangeCode,Industry,Exchange,_seen\nmicrocode,mcod,technology,fexc,2021/08/11\ncodekingco,ckco,agrigulture,exez,2021/08/11\nhitechstuff,hits,technology,fexc,2021/08/11\n\nmoomoos,moos,agrigulture,exez,2021/08/11\nbarbartexh,bbxt,technology,fexc,2021/08/11\n\ncodafoo,cafo,technology,fexc,2021/08/11\nemailz,emlz,technology,fexc,2021/08/11\nbarnes,brnc,agrigulture,exez,2021/08/11","output":"str","x":340,"y":120,"wires":[["8cf03da220982022","712817f5220aab38"]]},{"id":"3d8e9dd9ace627ed","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":830,"y":240,"wires":[]},{"id":"1061567535ab517c","type":"inject","z":"1bf07b4d6f4c42cf","name":"test","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":190,"y":160,"wires":[["a37f32c9d6d22c41"]]},{"id":"7b622551a18c19f3","type":"csv","z":"1bf07b4d6f4c42cf","name":"csv-parse","sep":",","hdrin":true,"hdrout":"none","multi":"mult","ret":"\\r\\n","temp":"CompanyName,ExchangeCode,Industry,Exchange,_seen","skip":"0","strings":true,"include_empty_strings":true,"include_null_values":true,"x":560,"y":380,"wires":[["4a13a3ce891c3746"]]},{"id":"f2a8a055a3076276","type":"template","z":"1bf07b4d6f4c42cf","name":"fake CSV","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"CompanyName,ExchangeCode,Industry,Exchange,_seen\nmicrocode,mcod,technology,fexc,2021/08/11\ncodekingco,ckco,agrigulture,exez,2021/08/11\nhitechstuff,hits,technology,fexc,2021/08/11\n\nmoomoos,moos,agrigulture,exez,2021/08/11\nbarbartexh,bbxt,technology,fexc,2021/08/11\n\ncodafoo,cafo,technology,fexc,2021/08/11\nemailz,emlz,technology,fexc,2021/08/11\nbarnes,brnc,agrigulture,exez,2021/08/11","output":"str","x":360,"y":360,"wires":[["7b622551a18c19f3"]]},{"id":"956b7eaa6ab85a1c","type":"inject","z":"1bf07b4d6f4c42cf","name":"test","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":190,"y":360,"wires":[["f2a8a055a3076276"]]},{"id":"4a13a3ce891c3746","type":"function","z":"1bf07b4d6f4c42cf","name":"generate inserts","func":"\nconst data = msg.payload;\n\n//trim values and generate a SQL INSERT for each row of data\nconst queries = data.map(e => {\n    if (e.ExchangeCode) {\n        const ExchangeCode = e.ExchangeCode.trim();\n        const Exchange = e.Exchange.trim();\n        const Industry = e.Industry.trim();\n        const CompanyName = e.CompanyName.trim();\n        return `INSERT INTO [OCP_LOCAL].[ext].[Profile] (CompanyName,ExchangeCode,Industry,Exchange)\n                VALUES ('${CompanyName}','${ExchangeCode}','${Industry}','${Exchange}');`    \n    } else {\n        return '';\n    }\n})\n\n//join all queries into a single query separated by a new line\nmsg.payload = queries.join('\\n')\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":820,"y":380,"wires":[["df5b384cc27da25f","789d223713d1b972"]]},{"id":"161aa818be798a7e","type":"comment","z":"1bf07b4d6f4c42cf","name":"FLOW2 - make one huge single set of INSERTs","info":"","x":300,"y":320,"wires":[]},{"id":"4153c0674ec5f5a1","type":"comment","z":"1bf07b4d6f4c42cf","name":"FLOW1 - Rate limit INSERTS to 1 every 5s, use Prepared Statement in MSSQL-PLUS node to avoid sql injection hacks","info":"","x":540,"y":40,"wires":[]},{"id":"665308aed38aff30","type":"csv","z":"1bf07b4d6f4c42cf","name":"csv-parse","sep":",","hdrin":true,"hdrout":"none","multi":"mult","ret":"\\r\\n","temp":"CompanyName,ExchangeCode,Industry,Exchange,_seen","skip":"0","strings":true,"include_empty_strings":true,"include_null_values":true,"x":560,"y":580,"wires":[["c529f1923d724394","5695b520029408fb"]]},{"id":"4fc1b19c57aba430","type":"template","z":"1bf07b4d6f4c42cf","name":"fake CSV","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"CompanyName,ExchangeCode,Industry,Exchange,_seen\nmicrocode,mcod,technology,fexc,2021/08/11\ncodekingco,ckco,agrigulture,exez,2021/08/11\nhitechstuff,hits,technology,fexc,2021/08/11\n\nmoomoos,moos,agrigulture,exez,2021/08/11\nbarbartexh,bbxt,technology,fexc,2021/08/11\n\ncodafoo,cafo,technology,fexc,2021/08/11\nemailz,emlz,technology,fexc,2021/08/11\nbarnes,brnc,agrigulture,exez,2021/08/11","output":"str","x":360,"y":560,"wires":[["665308aed38aff30"]]},{"id":"162a14bb6c2fd4ae","type":"inject","z":"1bf07b4d6f4c42cf","name":"test","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":190,"y":560,"wires":[["4fc1b19c57aba430"]]},{"id":"c529f1923d724394","type":"function","z":"1bf07b4d6f4c42cf","name":"generate BULK payload","func":"\nconst data = msg.payload;\n\n//trim values and return only items of interest\nconst bulkRows = data.map(e => {\n    if (e.ExchangeCode) {\n        return {\n            ExchangeCode: e.ExchangeCode.trim(),\n            Exchange: e.Exchange.trim(),\n            Industry: e.Industry.trim(),\n            CompanyName: e.CompanyName.trim()\n        } \n    } else {\n        return null;\n    }\n})\n\n//remove null rows\nmsg.bulkRows = bulkRows.filter(e => e != null);\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":870,"y":580,"wires":[["c626944891bdc1b9","7f30eca84cfe35f1"]]},{"id":"01894026a6763d4c","type":"comment","z":"1bf07b4d6f4c42cf","name":"FLOW3 - bulk insert all data in one hit","info":"","x":270,"y":520,"wires":[]},{"id":"7f30eca84cfe35f1","type":"MSSQL","z":"1bf07b4d6f4c42cf","mssqlCN":"df8c0b88.91b0a8","name":"MSSQL","outField":"payload","returnType":"0","throwErrors":"1","query":"[OCP_LOCAL].[ext].[Profile]\r\n","modeOpt":"","modeOptType":"bulk","queryOpt":"","queryOptType":"editor","paramsOpt":"","paramsOptType":"editor","rows":"bulkRows","rowsType":"msg","params":[{"output":false,"name":"CompanyName","type":"VarChar","valueType":"msg","value":"payload.CompanyName","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"ExchangeCode","type":"VarChar","valueType":"msg","value":"payload.ExchangeCode","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"Industry","type":"VarChar","valueType":"msg","value":"payload.Industry","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}},{"output":false,"name":"Exchange","type":"VarChar","valueType":"msg","value":"payload.Exchange","options":{"nullable":true,"primary":false,"identity":false,"readOnly":false}}],"x":1080,"y":580,"wires":[["3e8b35b34ff3cb08"]]},{"id":"df5b384cc27da25f","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":850,"y":420,"wires":[]},{"id":"c626944891bdc1b9","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"bulkRows","targetType":"msg","statusVal":"","statusType":"auto","x":900,"y":620,"wires":[]},{"id":"5695b520029408fb","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":650,"y":640,"wires":[]},{"id":"789d223713d1b972","type":"MSSQL","z":"1bf07b4d6f4c42cf","mssqlCN":"df8c0b88.91b0a8","name":"MSSQL","outField":"payload","returnType":"0","throwErrors":"1","query":"","modeOpt":"","modeOptType":"query","queryOpt":"payload","queryOptType":"msg","paramsOpt":"","paramsOptType":"none","rows":"","rowsType":"msg","params":[],"x":1080,"y":380,"wires":[["5208c5275882193f"]]},{"id":"5208c5275882193f","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1250,"y":380,"wires":[]},{"id":"3e8b35b34ff3cb08","type":"debug","z":"1bf07b4d6f4c42cf","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1250,"y":580,"wires":[]},{"id":"df8c0b88.91b0a8","type":"MSSQL-CN","tdsVersion":"7_4","name":"aws","server":"${SQL_SERVER_URL}","port":"${SQL_SERVER_PORT}","encyption":false,"trustServerCertificate":true,"database":"OCP_LOCAL","useUTC":false,"connectTimeout":"60000","requestTimeout":"0","cancelTimeout":"0","pool":"100","parseJSON":false,"enableArithAbort":true}]

NOTE:

I dont have your schema so could not test the 3 flows.

Steve-Mcl commented 2 years ago

@maulikdoshi82 did you test the examples and potential solutions I provided?

Steve-Mcl commented 2 years ago

Ok. I'm going to assume @maulikdoshi82 found a solution and I'm going to close this issue.

Please reopen if necessary.

maulikdoshi82 commented 2 years ago

Hi Steve,

Thank you for your response and real apologies for not reverting you sooner. As we had to go for demo, we used the static file load at the time of starting DB and progressed to other places.

Back to bulk insert now and this looks pretty good. I'm generating an array of around 98000+ records and pushing it in sql-server via batch mode. Array does get created quite quickly though while pushing the data in sql, it's throwing: Invalid column type from bcp client for colid 2.

Which seems a bummer as I'm creating table along with the bulk insert, which means schema is exactly the same. Kept all my fields as nullable too so it shouldn't be the issue. Would you have any recommendation to look for in fixing this issue?