Hi majinkai,
For traces to follow through RabbitMQ (or any remote component, for that matter), there needs to be a way to propagate trace context over RPC. The general way of doing this is to inject the trace context into the payload on the client side, and have the receiver take it back out and continue on with the trace.
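To make the inject/extract pattern concrete, here is a minimal, self-contained sketch in plain Java (no RabbitMQ or Pinpoint dependency): the producer copies the trace context into a message header map, and the consumer restores it. The header key names here are illustrative assumptions, not Pinpoint's actual keys.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of trace-context propagation over a message queue.
// Header key names below are assumptions for illustration only.
public class TraceContextPropagation {
    static final String TX_ID = "Pinpoint-TxId";             // assumed key name
    static final String SPAN_ID = "Pinpoint-SpanId";         // assumed key name
    static final String PARENT_SPAN_ID = "Pinpoint-pSpanId"; // assumed key name

    // Producer side: copy the current trace context into the message headers.
    static Map<String, Object> inject(String txId, long spanId, long parentSpanId) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TX_ID, txId);
        headers.put(SPAN_ID, spanId);
        headers.put(PARENT_SPAN_ID, parentSpanId);
        return headers;
    }

    // Consumer side: read the context back out so the receiver can continue the trace.
    static String[] extract(Map<String, Object> headers) {
        return new String[] {
            String.valueOf(headers.get(TX_ID)),
            String.valueOf(headers.get(SPAN_ID)),
            String.valueOf(headers.get(PARENT_SPAN_ID))
        };
    }

    public static void main(String[] args) {
        Map<String, Object> headers = inject("agent^1234^1", 100L, 99L);
        String[] ctx = extract(headers);
        System.out.println(ctx[0] + " " + ctx[1] + " " + ctx[2]); // agent^1234^1 100 99
    }
}
```

In a real plugin the injection would happen in an interceptor around the publish call, and the extraction in an interceptor around message delivery.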
With that said, we'll be taking a look at message queues in general and come up with a reference pattern on how to propagate trace context in message queues.
Thanks!
This is my custom RabbitMQ plugin. I'm considering whether I need to trace the consumer as well.
Wow, that's impressive!
The dubbo node looks good, and from the look of the RabbitMQ node, it looks like the actual client tracing is done within BDP, kinda like the way jdbc tracing works. (Please let me know if I've got this wrong.)
To continue the tracing beyond the RabbitMQ node, there needs to be a way for the RabbitMQ application itself to trace itself, which, if it were a java application, would be as simple as writing a plugin for it. But RabbitMQ is not a java application, right?
With BDP selected, the server map cannot display BDP -> RabbitMQ -> MQ_CONSUMER.
With MQ_CONSUMER selected, the server map can show the full server map.
Here's my fork https://github.com/majinkai/pinpoint/tree/rabbitmq
RabbitMQ Plugin (publish side): intercepts publish to an exchange and injects TRANSACTION_ID, SPAN_ID, and PARENT_SPAN_ID into the message.
RabbitMQ Plugin (consume side): reads TRANSACTION_ID, SPAN_ID, and PARENT_SPAN_ID back out of the message, and records the exchange and RABBITMQ_SERVICE_TYPE.getCode() for the parent link.
How can BDP's server map be made to display fully?
In the call tree, both sides can show the full chain.
Amazing you've got it working this far :+1:
Message queues present a new kind of pattern when rendering the server map and I'm not 100% sure why PCS/BDP's server map doesn't display the full server map.
My suspicion (and I may be wrong here) is that it's because RabbitMQ's serviceType is defined as TERMINAL, and BFSLinkSelector#selectLink doesn't traverse beyond a terminal node.
I don't think removing the TERMINAL property from RabbitMQ's serviceType would be the right solution here, as that might stop the RabbitMQ node from being rendered at all.
We might need to introduce a new way of handling these types of nodes.
We'll have a further look into this as well, but please let us know if you make any progress in this matter.
Thank you for your reply. If I make any progress, I will give feedback immediately. :smile:
By the way, what is the meaning of each enum value?

```java
public enum ServiceTypeProperty {
    TERMINAL,
    RECORD_STATISTICS,
    INCLUDE_DESTINATION_ID
}
```
@majinkai Awesome~
If there are two consumers, only one will display in the call tree, because the child traceId was created on the client side (the BDP app) using the following code:

```java
TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
```
Could we pass TRANSACTION_ID and PARENT_ID (the current trace's spanId) to RabbitMQ, and have the consumers generate spans by themselves?
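The proposal above can be sketched in plain Java: only the transaction id and the caller's span id travel with the message, and each consumer mints its own span id, so two consumers of the same message no longer collide. All names here are illustrative, not Pinpoint's actual types.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the proposal: the producer sends only (transactionId, parentSpanId);
// each consumer generates its OWN span id instead of reusing one chosen by the producer.
public class ConsumerSpanIds {
    static class Span {
        final String transactionId;
        final long parentSpanId; // the producer's span id, carried in the message
        final long spanId;       // freshly generated per consumer
        Span(String transactionId, long parentSpanId, long spanId) {
            this.transactionId = transactionId;
            this.parentSpanId = parentSpanId;
            this.spanId = spanId;
        }
    }

    // Called once per consumer that receives the message.
    static Span consume(String transactionId, long parentSpanId) {
        return new Span(transactionId, parentSpanId, ThreadLocalRandom.current().nextLong());
    }

    public static void main(String[] args) {
        Span c1 = consume("agent^1^1", 42L);
        Span c2 = consume("agent^1^1", 42L);
        // Both consumers share the same parent, but have distinct span ids.
        System.out.println(c1.parentSpanId + " " + c2.parentSpanId);
    }
}
```

The trade-off, as noted later in this thread, is that the producer-generated scheme was chosen to minimize span-id collisions, so a consumer-generated id needs its own collision handling.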
Regarding ServiceTypeProperty:
ServiceTypeProperty is used as a flag in various situations.
For example, TERMINAL is used when the server map is created to emulate a node even though it doesn't have an agent attached to it.
RECORD_STATISTICS tells the collector to maintain caller/callee statistics and the web to fetch these statistics for the server map to render.
INCLUDE_DESTINATION_ID is used to show a destination id when the call stack of a ServiceType is rendered.
Regarding your question on a single publish having multiple consumers: you're absolutely right, we don't currently have a way to handle this kind of situation. It's a new pattern for us as well. The way the code works now is that the caller generates the SpanId for the receiver to use, and the receiver uses this as its SpanId. This was done to minimize collisions in SpanId. Unfortunately, it has its own problem, as you've described above. The child spans will now use the same SpanId generated by the parent and things will blow up. I'd assume things will blow up in the opposite situation as well (multiple producers, single consumer). We'll definitely have to come up with a model that supports these situations as well.
Thanks for sharing!
I also wrote a plugin to monitor the jms message route on activemq, but I have one question: in the main application map, the route seems to be right;
and in the trace page, the trace tree also seems to be right;
however, in the Server Map tab, the map is wrong.
The key code is here:
ServiceType ACTIVEMQ_SERVICE_TYPE = ServiceTypeFactory.of(8400, "ACTIVEMQ", TERMINAL, RECORD_STATISTICS, INCLUDE_DESTINATION_ID);
producer code (the destination is the queue's full name, such as queue://test001):
```java
try {
    String destination = MessageUtil.readString((Message) args[1], ActivemqConstants.META_SOURCE);
    SpanEventRecorder recorder = trace.currentSpanEventRecorder();
    recorder.recordApi(descriptor);
    if (throwable == null) {
        recorder.recordEndPoint(destination);
        recorder.recordDestinationId(destination);
        recorder.recordAttribute(ActivemqConstants.ACTIVEMQ_DESTINATION_ANNOTATION_KEY, destination);
    } else {
        recorder.recordException(throwable);
    }
} finally {
    trace.traceBlockEnd();
}
```
consumer code:

```java
@Override
protected void doInBeforeTrace(SpanRecorder recorder, Object target, Object[] args) {
    Message message = (Message) args[1];
    // You have to record a service type within Server range.
    recorder.recordServiceType(ActivemqConstants.ACTIVEMQ_SERVICE_TYPE);
    String source = MessageUtil.readString(message, ActivemqConstants.META_SOURCE);
    // Record client address, server address.
    recorder.recordEndPoint(source);
    recorder.recordParentApplication(source, ActivemqConstants.ACTIVEMQ_SERVICE_TYPE.getCode());
    recorder.recordAcceptorHost(source);
}

@Override
protected void doInAfterTrace(SpanRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
    AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) target;
    Message message = (Message) args[1];
    String source = MessageUtil.readString(message, ActivemqConstants.META_SOURCE);
    String remoteAddress = activemqResolver.findRemoteAddress(container.getConnectionFactory());
    recorder.recordApi(methodDescriptor);
    recorder.recordAttribute(ActivemqConstants.ACTIVEMQ_DESTINATION_ANNOTATION_KEY, source);
    recorder.recordRemoteAddress(remoteAddress);
    if (throwable != null) {
        recorder.recordException(throwable);
    }
}
```
So, why is the server map wrong? I expect the server map to look like the main map.
@aximo
I believe the issue is similar to that of the RabbitMQ plugin. I think the reason why the Server Map tab (not the main server map) is drawing 2 separate nodes is because:
1) jms-consumer-app's span has its parentSpanId set to jms-producer-app's, so it draws the edge jms-producer-app -> jms-consumer-app. (FilteredMapServiceImpl#createMap)
2) Then, when working with jms-producer-app's SpanEvents, there is an ActiveMQ client call SpanEvent, so it draws the ActiveMQ node along with an edge pointing to it from jms-producer-app. (FilteredMapServiceImpl#addNodeFromSpanEvent)
To solve this problem, we need to come up with a new way of handling the situation above. Something like leaving step 2 alone, but in step 1, having a way for jms-consumer-app to link with the ActiveMQ node and not with the jms-producer-app node.
@majinkai @aximo Again, we'll try to come up with something in the next few weeks (hopefully), please let us know if you come up with a solution. Also, the single parent span, multiple child span problem should be looked as a separate issue. We'll try to look into this as well. Thanks again.
@Xylus @majinkai @aximo I'm also looking for those plugins (activemq and rabbitmq) :+1: . Any update about this?
@dawidmalina @majinkai @aximo Working on this right now. Been testing with @majinkai's RabbitMQ plugin.
The basic gist of it is to add a QUEUE ServiceTypeProperty and handle these cases in the Collector and in the Web.
The code's frozen currently, so I'll update more after 1.5.2 gets out (should be real soon now).
@Xylus Is there any progress on this? There is another situation that should be considered for supporting message queues. Basically, a message is just data passed from here to there, so a consumer may combine 2 consecutive messages into one and handle them together! I am developing a Pinpoint plugin for kafka. I refactored the consumer code into a method such as handleAMessage(KeyedMessage) which only handles one message per call. Currently the kafka producer and consumer can be linked together, but I think there should be a virtual "kafka:topic" node between producer and consumer in the server map, which describes the topology better. As you have said, this will change the collector behaviour when storing the span into hbase. Does this match your vision?
@jiaqifeng Awesome! Mostly done with the collector/web side with ActiveMQ java client as reference plugin. I'll push them to my fork if you'd like to take a look (@all). Like you said, I also ran into some problem in single producer, multiple consumer calls as our trace data model doesn't handle these - the call stack will only show one consumer span in this case. The situation you're describing seems to be similar but reversed (multiple producer, single consumer), and I'm not sure how it will manifest itself over on the Web (servermap/call stack). We're planning to overhaul our Trace table soon, so hopefully we can make modification to handle these cases then. I also completely agree with you that having a virtual node for topics/queues etc is best. I'll explain it with more detail after I push the code to my fork. Hang tight!
@majinkai @aximo @dawidmalina @jiaqifeng Pushed message queue support, and ActiveMQ java client code to here. This commit is the core support code. This commit is the ActiveMQ client plugin code, which I think would be more of your interest.
Quick overview: added a QUEUE type to ServiceTypeProperty; queue/topic service types should have ServiceTypeProperty.QUEUE and RECORD_STATISTICS as their ServiceTypeProperty. The rest of the code should be pretty straightforward. Please let me know if there's anything weird.
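For reference, a queue-type ServiceType definition would presumably follow the same ServiceTypeFactory.of(...) pattern as the ActiveMQ example earlier in this thread. This is only a sketch: the type code 8300 and the name are placeholders, not the actual registered values.

```java
// Hypothetical queue-type ServiceType; code and name are placeholders.
ServiceType RABBITMQ_SERVICE_TYPE =
        ServiceTypeFactory.of(8300, "RABBITMQ", QUEUE, RECORD_STATISTICS);
```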
@Xylus I just had a quick look at the core support code. It is just what I expected. What kind of server map can you see now? Could you share a screenshot of the server map?
@jiaqifeng The screenshot looks like this (ActiveMQ-Sender1 sent 2 messages):
One thing to note is that although there is only a single broker instance, nodes in the server map are generated for each queue/topic. If there are multiple broker instances serving the same queue/topic, they are merged into one like below: and the broker addresses can be seen by pressing the Servers button:
This was a design decision we've made, but do share if you think there's a better way~
@Xylus What a great job! I am not familiar with the activeMQ model and I am focusing on kafka right now. In kafka, the producer sends messages to a broker list, and the consumer only knows that a zookeeper will help it get the messages. So the exact broker host/ip may not be available; only the topic is meaningful. Currently I think this model is enough. Another question: what will happen when a message is received a long time after it was sent?
@jiaqifeng Not having the exact broker host/ip available shouldn't be a problem (I hope). Messages that are received after a long time shouldn't be a problem either. To give a bit more detail, let's say the producer sends a message to a queue at 10:00, and the consumer consumes it at 11:00. How the main server map is rendered will depend on the query time:
09:30~10:30 producer -> queue
10:30~11:30 queue -> consumer
09:30~11:30 producer -> queue -> consumer
The call stack however, will show everything that's happened up until the current time.
@Xylus Yes, I agree with you; the host/ip and time issues leave this just a little bit short of perfect.
Any update about RocketMQ?
@all queue support is now in the master branch if you'd like to play around with it. #1741
@FishGel Not sure if anyone's working on RocketMQ. Let me know if anyone's working on it please.
@majinkai I've used your code as base for updating the RocketMQ plugin you were working on. Please feel free to take a look and see if some changes could make it into your plugin code. Thanks!
@Xylus @majinkai what is the status of this plugin?
@dawidmalina Not entirely sure, will have to check with @majinkai
@majinkai Feel free to update on us if you can (or cannot) work on the plugin. Thanks!
@Xylus, I tested your rabbitmq plugin, and I found the consumer doesn't work: the instance isn't woven by javassist. Code:

```java
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);
```
@citywander Yup, the rabbitmq plugin isn't complete. It's still a WIP and the code in my fork is only for reference for @majinkai for his rabbitmq plugin development.
My memory is a bit hazy, but I think the handleDelivery method above is not being traced because the DefaultConsumer in your code isn't actually a com.rabbitmq.client.DefaultConsumer, but an anonymous inner class of the enclosing class that extends com.rabbitmq.client.DefaultConsumer.
Adding a call to the no-op super.handleDelivery(consumerTag, envelope, properties, body); in your override would solve the problem for now.
(This is one of the things that will need to be sorted out before the plugin is ready for merging.)
Sorry, because of work, I haven't updated the RabbitMQ plugin in a long time.
@Xylus, thanks for your help. I found that when appA sends a message to rabbitmq and appB receives the message, the server map shows appA -> rabbitmq and appA -> appB. I think appA -> rabbitmq -> appB is preferable.
@majinkai any progress regarding this change?
@dawidmalina sorry, no updates for plugin
@majinkai or @jiaqifeng can you guys tell us what the status is? We would love to see rabbitmq in pinpoint as an official plugin.
@dawidmalina No status update from me, since I'm only concerned with the common message queue support. This plugin seems ready from my point of view. Or have you found an issue with it?
@jiaqifeng No issue, I just want to have this plugin in official pinpoint. Can you create a pull request?
@Xylus I am implementing a kafka-spring plugin, but I got an unexpected topology, as in the image below. The producer sends messages to 2 topics, and I want to see 2 kafka nodes, each named after its topic. But it does not show a group node or 2 nodes, one for each topic. I checked the json data returned from /getServerMapData.pinpoint and it is as I expected. So I want to know the topology logic: under what conditions are message queue nodes separated into a group or into different nodes? Some background info: in kafka it is hard to know the kafka broker's address since it is highly dynamic, so I ignored some fields in the span data structure; you can look at the data below for more detail. If some field such as the physical address or another field is the key field, please correct me!
I attached the getServerMapData below (histogram removed):
{"key":"learn^KAFKA","applicationName":"learn","category":"KAFKA","serviceType":"KAFKA","serviceTypeCode":"8350","isWas":false,"isQueue":true,"isAuthorized":true,"serverList":{"broker.addr.unknown":{"name":"broker.addr.unknown","status":null,"instanceList":{"broker.addr.unknown":{"hasInspector":false,"name":"broker.addr.unknown","serviceType":"KAFKA","status":{"code":-1,"desc":"Unknown"}}}}},"instanceCount":1},
{"key":"SPRING_KAFKA_PRODUCER_STAND_ALONE^USER","applicationName":"USER","category":"USER","serviceType":"USER","serviceTypeCode":"2","isWas":false,"isQueue":false,"isAuthorized":true,"histogram":"serverList":{},"instanceCount":0},
{"key":"SPRING_KAFKA_PRODUCER^STAND_ALONE","applicationName":"SPRING_KAFKA_PRODUCER","category":"STAND_ALONE","serviceType":"STAND_ALONE","serviceTypeCode":"1000","isWas":true,"isQueue":false,"isAuthorized":true,"serverList":{"jack-Inspiron-7460":{"name":"jack-Inspiron-7460","status":null,"instanceList":{"agent1":{"hasInspector":true,"name":"agent1","serviceType":"STAND_ALONE","status":{"code":-1,"desc":"Unknown"}}}}},"instanceCount":1},
{"key":"someday^KAFKA","applicationName":"someday","category":"KAFKA","serviceType":"KAFKA","serviceTypeCode":"8350","isWas":false,"isQueue":true,"isAuthorized":true,"serverList":{"broker.addr.unknown":{"name":"broker.addr.unknown","status":null,"instanceList":{"broker.addr.unknown":{"hasInspector":false,"name":"broker.addr.unknown","serviceType":"KAFKA","status":{"code":-1,"desc":"Unknown"}}}}},"instanceCount":1}
I compared the producer span against activemq, and I exchanged the destinationId and endPoint of the span, but it is still grouped together as one Kafka group. You can see "Unknown group from SPRING_KAFKA_PRODUCER". How does it decide that 2 servers belong to one group? I just want each topic to be a single node, not grouped together.
@Xylus, I changed the span field values and got things working half as I wanted. By default, the kafka brokers are merged into one node icon and the link is broken in the topology below. But when I searched the code, I found I can right-click the server map and select not to group kafka nodes, and then the topology is what I want! I read the js code in server-map.directive.js, which adds KAFKA to the mergeStatus list when it is notQueue, but I really did set KAFKA as a queue type. I'm not familiar with javascript and am just guessing from the literal code. In short, I just want a way to stop grouping the Kafka broker icons. What should I do?
This is the default topology I don't want. This is what I want to be the default behaviour.
@Xylus Have you studied the activemq client code? It seems from your plugin that the broker address has already been chosen when ActiveMQMessageProducer.send() is called, while in kafka the broker address is chosen just before a batch of messages is sent, on another sending thread. So it's really hard to get the real broker address for each message.
@jiaqifeng Sorry about the late reply. I'll take a look at what you've posted and get back to you soon. Thanks for working on Kafka plugin!
@jiaqifeng The merging of the queue node has been fixed via #2760. Queue nodes were excluded from merge targets but I think something was missing from code changes made in 1.6.1-SNAPSHOT. It should now be fixed.
As for the broker addresses, I don't know how kafka works internally so you'll probably have way better ideas than I do, so I'll just throw out some ideas :) If kafka is batching up messages in a separate thread to send them, you could move the point at which spanEvent recorder records the rpc event down to here. Now since this will be tracing an asynchronous invocation, you'll have to create the async trace id from the calling thread and inject this into an object (maybe the message itself?) that will be passed to the sender thread. Then, when the sender thread processes this message, you could read this async trace id and start tracing the event again.
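The handoff described above can be sketched without any Kafka or Pinpoint dependency: the calling thread attaches an async trace id to the message object it enqueues, and the sender thread reads the id back to resume tracing around the actual network send. The String "async trace id" here is just a placeholder for whatever handle the tracer hands out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: carry an async trace id from the calling thread to a batching
// sender thread by stashing it on the pending message itself.
public class AsyncTraceHandoff {
    static class PendingMessage {
        final String payload;
        final String asyncTraceId; // injected by the calling thread
        PendingMessage(String payload, String asyncTraceId) {
            this.payload = payload;
            this.asyncTraceId = asyncTraceId;
        }
    }

    // Enqueue a message with its trace id, then let a "sender" thread
    // pick it up; returns the trace id as observed by the sender thread.
    static String sendAndObserve(String payload, String asyncTraceId) throws InterruptedException {
        BlockingQueue<PendingMessage> sendQueue = new ArrayBlockingQueue<>(1);
        final String[] observed = new String[1];
        Thread sender = new Thread(() -> {
            try {
                // The sender thread reads the id back and would resume
                // tracing here, around the real network send.
                observed[0] = sendQueue.take().asyncTraceId;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        sendQueue.put(new PendingMessage(payload, asyncTraceId));
        sender.join();
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAndObserve("hello", "async-trace-1")); // async-trace-1
    }
}
```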
@Xylus Thanks for your advice. Maybe my code base is a little old; I will check it. The main purpose of tracing kafka is to extend the trace linkage in our applications, so when and on which thread the message is sent are not very important for us. Considering the code complexity and performance impact this would bring, I gave it up ^-^.
@Xylus The Kafka group issue was fixed. When I run the integration test for kafka, I get another problem: the kafka artifact needs jdk7 to run, while the integration test uses jdk6 instead. I read the agent pom.xml, which has 2 executions, one for jvm6 and one for jvm7. I searched a lot and could not find any info on how to specify jvm6 or jvm7 to run the integration test. Could you give me any advice?
@jiaqifeng There's a jdk7 package (com.navercorp.pinpoint.plugin.jdk7). If you make a directory in here for your kafka ITs, it should run on java 7. Please let me know if it doesn't work. Thanks :+1:
@Xylus Got it works. Thanks.
@Xylus I am bothering you again; I hope this is the last time for the kafka plugin ^-^. I am writing the integration test for the producer and consumer. The producer is straightforward since it runs in the main thread of the test. But for the consumer, which runs on another thread initialized by spring-kafka, the verifier I get is the same as the producer side. Also, an assert failure in the consumer thread does not affect the test result. How can I get the right verifier on the consumer side and have the trace verified as part of the test result?
@jiaqifeng That looks pretty complicated. Are you verifying consumer traces in the other thread (the thread that runs the consumer)? If this is the case, assert failures might be ignored if they're not propagated to the main thread.
One thing you could try imo is to have the verifier verify everything in the calling (main) thread after the consumer finishes consuming the message (maybe use a countdown latch?). Note you might still have to add a bit of a wait time between message consumption and verification, as consumer traces have to finish recording (which is itself an async operation).
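The latch pattern suggested above can be sketched like this: run the consumer-side work on a worker thread, capture any assertion failure (which would otherwise be swallowed by the thread), and re-throw it on the main test thread once the latch releases. The consumer logic here is obviously a stand-in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: surface consumer-thread assertion failures in the main (test) thread.
public class ConsumerThreadVerification {
    static void runConsumerTest(Runnable consumerWork) throws Throwable {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                consumerWork.run(); // stand-in for consuming + verifying the trace
            } catch (Throwable t) {
                failure.set(t);     // capture instead of silently losing it
            } finally {
                done.countDown();
            }
        });
        consumer.start();
        done.await(); // main thread blocks until the consumer finishes
        if (failure.get() != null) {
            throw failure.get(); // fail the test with the consumer-side error
        }
    }
}
```

With this in place, the actual trace verification can also be moved into the main thread after done.await(), as suggested above, plus a short wait for the async trace recording to finish.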
@Xylus I want to verify the consumer trace in the consumer thread, but the trace I get in the consumer thread is the same as the producer's. I used a latch to wait for the consumer in the producer's thread and verified the producer trace as expected, but I don't know how to verify the consumer trace. One solution is to set up another case which runs the producer in a new thread and consumes the message in the main thread. If there's no better way, I will try this later.
@jiaqifeng Ah, gotcha. You mean both the producer thread and the consumer thread yielded only the traces that ran on the main thread (both the producer thread and the consumer thread have only producer traces)? And yeah, if nothing else works, the workaround you've come up with sounds like a good idea!
merged via #3538
I want to develop the RabbitMQ plug-in. The client sends a message and the method returns immediately, so the current trace will be committed. When the consumer receives the message, can it continue the transaction?