pgreenland opened this issue 6 years ago
Hi Phil,
I was working on a similar solution. After toiling over it for a few weeks I was finally able to come up with one. My scenario was that I had one direct exchange in RabbitMQ with multiple routing keys, and I had to create a queue to consume messages for each routing key. Having multiple queues meant multiple consumers, and that caused a problem for me: a blocking loop meant only one consumer's data was ever processed. I got around that problem in my code and now consume around 4000 messages per second without blocking. I hope we can connect, as I think I'd be able to help.
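The essence of the workaround is to give each queue its own goroutine feeding a shared channel, rather than servicing the queues from one blocking loop. Below is a minimal sketch using the streadway/amqp client; the broker URL, exchange name, routing keys and queue naming are placeholders, the exchange is assumed to already exist, and error handling is trimmed, so treat it as the shape of the approach rather than my actual code:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

// consumeRoutingKey declares a queue for one routing key, binds it to the
// direct exchange, and drains it into a shared channel from its own goroutine,
// so no single key can block the others.
func consumeRoutingKey(ch *amqp.Channel, exchange, key string, out chan<- amqp.Delivery) error {
	q, err := ch.QueueDeclare("queue."+key, true, false, false, false, nil)
	if err != nil {
		return err
	}
	if err := ch.QueueBind(q.Name, key, exchange, false, nil); err != nil {
		return err
	}
	// Auto-ack keeps the example short; a real consumer would ack explicitly.
	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		return err
	}
	go func() {
		for d := range msgs {
			out <- d
		}
	}()
	return nil
}

func main() {
	// Placeholder URL, exchange and routing keys - adjust for your broker.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	out := make(chan amqp.Delivery, 1000)
	for _, key := range []string{"key.a", "key.b", "key.c"} {
		if err := consumeRoutingKey(ch, "my-direct-exchange", key, out); err != nil {
			log.Fatal(err)
		}
	}

	// All deliveries funnel into one place without any queue starving the rest.
	for d := range out {
		log.Printf("routing key %s: %d bytes", d.RoutingKey, len(d.Body))
	}
}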
Hey @bhupeshpandey,
Thanks for the reply. At the time I started encountering this problem I hadn't begun work on the AMQP consumer yet.
To me it seems to be an issue with the snap framework itself, which can be reproduced with the files attached to my first post. These implement simple counting metrics (no AMQP connection required), writing their results to a file.
Regards,
Phil
Hi All,
I'm writing a new streaming collector plugin to collect messages from AMQP brokers and process them into snap metrics.
In order to understand how the StreamCollector interface works, I started with a simple counting plugin, based on the rand example. Here's the demo source I've been working on: streaming collector bug.tar.gz (it should build with make).
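To give a feel for what the demo plugin does without unpacking the tarball: each collector is essentially a counter emitted from StreamMetrics. The sketch below is from memory rather than the exact attached source; the interface signature is the one I understood from the snap-plugin-lib-go rand streaming example, and details such as the ticker interval and the single shared counter are illustrative (the attached demo keeps separate counters per metric):

package main

import (
	"context"
	"time"

	"github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"
)

// Collector emits monotonically increasing counter values for the metrics
// each task requests.
type Collector struct {
	count int64
}

// StreamMetrics reads the requested metric types from metricsIn, then pushes a
// batch of counter samples onto metricsOut every few seconds until the context
// is cancelled.
func (c *Collector) StreamMetrics(ctx context.Context, metricsIn chan []plugin.Metric, metricsOut chan []plugin.Metric, errs chan string) error {
	// Snap sends the requested metric types on metricsIn; the real plugin
	// keeps watching this channel and maintains a counter per metric, which
	// is elided here.
	requested := <-metricsIn

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			now := time.Now()
			batch := make([]plugin.Metric, 0, len(requested))
			for _, m := range requested {
				batch = append(batch, plugin.Metric{
					Namespace: m.Namespace,
					Data:      c.count,
					Timestamp: now,
				})
			}
			c.count++
			metricsOut <- batch
		}
	}
}

// GetMetricTypes advertises the metrics the plugin can produce.
func (c *Collector) GetMetricTypes(cfg plugin.Config) ([]plugin.Metric, error) {
	return []plugin.Metric{
		{Namespace: plugin.NewNamespace("amqp_client", "test1")},
		{Namespace: plugin.NewNamespace("amqp_client", "test2")},
		{Namespace: plugin.NewNamespace("amqp_client", "test3")},
	}, nil
}

// GetConfigPolicy returns an empty policy; the real plugin would take broker
// settings here.
func (c *Collector) GetConfigPolicy() (plugin.ConfigPolicy, error) {
	return *plugin.NewConfigPolicy(), nil
}

func main() {
	plugin.StartStreamCollector(&Collector{}, "amqp_client", 1)
}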
Running my two example tasks, amqp-streaming-to-file-1.yaml and amqp-streaming-to-file-2.yaml (see the examples directory), I expect to see a stream of samples for metrics test1 and test3 appearing in metrics_1.log, along with a stream of samples for metrics test2 and test3 appearing in metrics_2.log. I expect test1 and test2 to count up sequentially in each file, while the values for test3 are split between the files as the counter increases.
/tmp/metrics_1.log:
2018-07-04 09:47:22.556629197 +0100 BST|/amqp_client/test1|0|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:22.556629858 +0100 BST|/amqp_client/test3|0|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:27.558378196 +0100 BST|/amqp_client/test1|1|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:27.558389423 +0100 BST|/amqp_client/test3|2|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:34.671643614 +0100 BST|/amqp_client/test2|2|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:34.671654198 +0100 BST|/amqp_client/test3|5|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:44.673479505 +0100 BST|/amqp_client/test2|4|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:44.673490496 +0100 BST|/amqp_client/test3|9|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:54.676821079 +0100 BST|/amqp_client/test2|6|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:54.67685517 +0100 BST|/amqp_client/test3|13|tags[plugin_running_on:snaprabbit1]
/tmp/metrics_2.log:
2018-07-04 09:47:24.668223295 +0100 BST|/amqp_client/test2|0|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:24.668223846 +0100 BST|/amqp_client/test3|1|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:29.670208801 +0100 BST|/amqp_client/test2|1|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:29.670218793 +0100 BST|/amqp_client/test3|3|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:39.672809295 +0100 BST|/amqp_client/test2|3|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:39.672815495 +0100 BST|/amqp_client/test3|7|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:49.675250013 +0100 BST|/amqp_client/test2|5|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:49.675260768 +0100 BST|/amqp_client/test3|11|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:59.677579672 +0100 BST|/amqp_client/test2|7|tags[plugin_running_on:snaprabbit1]
2018-07-04 09:47:59.677585336 +0100 BST|/amqp_client/test3|15|tags[plugin_running_on:snaprabbit1]
It looks good for the first few samples, before test2 appears in the first file and test1 disappears. Additionally, samples are missing: test3 never reports the value 4, which should have appeared in the first file in this case.
This sequence of results (including the point at which the first file changes, or the second file if the tasks are started the other way around) appears to be reproducible every time.
I'm running the latest release (snapteld version 2.0.0) on Debian 9, within a VirtualBox VM.
I've checked everything I can think of in my plugin to ensure the correct metrics are being generated (see the debug statements visible in standalone mode). I've even gone as far as capturing the gRPC communication between the plugin and snap with Wireshark and checking that the metric names appear on the correct gRPC streams; everything looks fine.
I've been nosing around the snap source code, instrumenting and prodding for a day or two but to no avail.
Adding a log statement to the function "stream" in "scheduler/task.go" shows that the metrics are already corrupted as they're retrieved from the channel "metricsChan".
Additionally, adding a log statement to the function "handleInStream" in "control/plugin/client/grpc.go", the other side of the metrics channel, also shows the corruption.
It appears that it may be related to the gRPC library itself. I noticed the glide.yaml file referencing an old version of the library. I managed to force it up to the latest version, re-generated the protobuf definitions and rebuilt, but I see the same result.
I've given up my quest for a solution for now; my workaround at the moment is to use the plugin from only a single task.
Any pointers would be appreciated; I'm happy to assist in producing a fix.
Thanks,
Phil