intelsdi-x / snap

The open telemetry framework
http://snap-telemetry.io
Apache License 2.0

Snap framework Resource exhausted issue. #1766

Open bhupeshpandey opened 5 years ago

bhupeshpandey commented 5 years ago

Hi,

  My scenario is as follows:

  1. There is one exchange of type direct with multiple routing keys, let's say "collectorexchange".
  2. A streaming collector fetches data from the collector exchange.
  3. Multiple processors process each message based on its routing key.
  4. Finally, one publisher publishes to an exchange, let's say "PublisherExchange".

  Workflow:

  1. There are multiple task files, each defining the workflow for one data type based on its routing key. Currently there are 6.
  2. One task file collects from the Publisher Exchange and assembles the data to publish to
      the final data exchange.

 Issue:

  When I run two task files, i.e. when I am working with a single data type, everything seems to
  work fine. But when I add another task file, i.e. another data type to work with, I start getting
  a resource exhausted error. One thing to note is that in this case the first task file, for the first
  data type, still works. What could be wrong with my workflow? How can I increase the gRPC message size?
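
  One rough way to reason about the size question: gRPC rejects a message larger than the receiver's configured maximum with a ResourceExhausted status, and in grpc-go that maximum is 4 MiB by default. The Python sketch below estimates whether one streamed batch could cross that limit. The helper name and the 8 KiB per-metric size are hypothetical, and whether Snap keeps the default limit is an assumption, not something confirmed in this issue.

```python
# Hypothetical estimate: does one streamed batch fit in a single gRPC message?
# 4 MiB is the grpc-go default receive limit; assumed (not confirmed) to apply here.
DEFAULT_GRPC_MAX_RECV_BYTES = 4 * 1024 * 1024


def batch_fits(max_metrics_buffer: int, avg_metric_bytes: int,
               limit_bytes: int = DEFAULT_GRPC_MAX_RECV_BYTES) -> bool:
    """Return True if a full batch stays within the message size limit."""
    return max_metrics_buffer * avg_metric_bytes <= limit_bytes


if __name__ == "__main__":
    # MaxMetricsBuffer values come from the task files; 8 KiB per metric is a guess.
    for buffer_size in (2, 1000):
        print(buffer_size, batch_fits(buffer_size, avg_metric_bytes=8 * 1024))
```

  If the estimate for a real payload comes out close to the limit, lowering MaxMetricsBuffer in the collector config would be the first thing to try before touching any gRPC setting.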

  Let me explain the task files below.

  The last two task files collect data from the exchange called DATA.
  Each one processes the data and then publishes the result to the RabbitMQ
  exchange called OpSignalData. The first task file then collects the data back from
  OpSignalData, assembles it, and publishes it to the exchange called OptimizerData.
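
  The flow described above can be checked with a small Python sketch. A direct exchange delivers a message only to bindings whose routing key matches exactly, so the sketch lists the (exchange, routing key) pairs from the three task files as posted and reports any publisher whose pair has no matching collector binding. The function name is hypothetical, and the keys may have been anonymized before posting, so a mismatch here is a hint to verify rather than a diagnosis.

```python
# Sketch of direct-exchange routing for the three task files in this issue.
# (exchange, routing key) pairs are copied from the posted JSON.
# The fanout publisher in task file 1 is omitted because fanout ignores keys.
PUBLISHED = {
    "task file 2": ("OpSignalData", "data1"),
    "task file 3": ("OpSignalData", "rpm"),
}
BOUND = {
    "task file 1": [("OpSignalData", "data1"), ("OpSignalData", "data2")],
    "task file 2": [("DATA", "data1")],
    "task file 3": [("DATA", "data2")],
}


def unrouted(published, bound):
    """Return publishers whose (exchange, key) pair matches no collector binding."""
    bindings = {pair for pairs in bound.values() for pair in pairs}
    return sorted(task for task, pair in published.items() if pair not in bindings)


if __name__ == "__main__":
    print(unrouted(PUBLISHED, BOUND))
```

  With the values as posted, task file 3 publishes with routing key "rpm" while task file 1 binds "data1" and "data2", so that pair is worth double-checking against the real configuration.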

Task file 1

{
  "version": 1,
  "name": "Collect and publish optimized messages",
  "schedule": { "type": "streaming" },
  "max-failures": -1,
  "workflow": {
    "collect": {
      "metrics": {
        "/signal/data/data2": {},
        "/signal/data/data1": {}
      },
      "config": {
        "/signal/data/data1": {
          "routingkey": "data1",
          "rabbitMqHost": "",
          "exchangename": "OpSignalData",
          "exchangetype": "direct",
          "durable": true,
          "MaxCollectDuration": "1s",
          "MaxMetricsBuffer": 2
        },
        "/signal/data/data2": {
          "routingkey": "data2",
          "rabbitMqHost": "",
          "exchangename": "OpSignalData",
          "exchangetype": "direct",
          "durable": true,
          "MaxCollectDuration": "1s",
          "MaxMetricsBuffer": 2
        }
      },
      "process": [{
        "plugin_name": "data-assembler",
        "publish": [{
          "plugin_name": "rabbitmq-publisher",
          "config": {
            "url": "127.0.0.1",
            "vhost": "vhost",
            "port": "5672",
            "exchangename": "OptimizerData",
            "exchangetype": "fanout",
            "routingkey": "",
            "durable": true,
            "user": "user",
            "password": "password",
            "MaxCollectDuration": "1s",
            "rabbitMqHost": ""
          }
        }]
      }]
    }
  }
}

Task file 2

{
  "version": 1,
  "name": "Collect and optimize data1 Messages",
  "schedule": { "type": "streaming" },
  "workflow": {
    "collect": {
      "metrics": {
        "/signal/data/data1": {}
      },
      "config": {
        "/signal/data/data1": {
          "routingkey": "data1",
          "rabbitMqHost": "",
          "exchangename": "DATA",
          "exchangetype": "direct",
          "durable": true,
          "MaxMetricsBuffer": 1000,
          "MaxCollectDuration": "1s"
        }
      },
      "process": [{
        "plugin_name": "minmax",
        "config": {
          "min": 0.1,
          "max": 1000000.1,
          "invalidvaluemode": "USEBOUNDS"
        },
        "process": [{
          "plugin_name": "data1-average",
          "publish": [{
            "plugin_name": "rabbitmq-publisher",
            "config": {
              "url": "127.0.0.1",
              "vhost": "vhost",
              "port": "5672",
              "exchangename": "OpSignalData",
              "exchangetype": "direct",
              "routingkey": "data1",
              "durable": true,
              "user": "user",
              "password": "pass",
              "MaxCollectDuration": "1s",
              "rabbitMqHost": ""
            }
          }]
        }]
      }]
    }
  }
}

Task file 3

{
  "version": 1,
  "name": "Collect and optimize data2 Messages",
  "schedule": { "type": "streaming" },
  "workflow": {
    "collect": {
      "metrics": {
        "/signal/data/data2": {}
      },
      "config": {
        "/signal/data/data2": {
          "routingkey": "data2",
          "rabbitMqHost": "",
          "exchangename": "DATA",
          "exchangetype": "direct",
          "durable": true,
          "MaxCollectDuration": "1s",
          "MaxMetricsBuffer": 1000
        }
      },
      "process": [{
        "plugin_name": "minmax",
        "config": {
          "min": 0.1,
          "max": 120.1,
          "invalidvaluemode": "USEBOUNDS"
        },
        "process": [{
          "plugin_name": "rpm-average",
          "publish": [{
            "plugin_name": "rabbitmq-publisher",
            "config": {
              "url": "127.0.0.1",
              "vhost": "vhost",
              "port": "5672",
              "exchangename": "OpSignalData",
              "exchangetype": "direct",
              "routingkey": "rpm",
              "durable": true,
              "user": "user",
              "password": "pass",
              "MaxCollectDuration": "1s",
              "rabbitMqHost": ""
            }
          }]
        }]
      }]
    }
  }
}