Closed hugree closed 2 years ago
Hi! This dependency is no longer needed: com.github.gavlyukovskiy:p6spy-spring-boot-starter. Here you have an example of a mix of Brave & OTel apps: https://github.com/spring-cloud-samples/sleuth-documentation-apps/ . Also, here you have samples with Brave & OTel: https://github.com/spring-cloud-samples/spring-cloud-sleuth-samples/ . Also, please ensure that either of these exporters is actually registered: https://github.com/spring-projects-experimental/spring-cloud-sleuth-otel/blob/main/spring-cloud-sleuth-otel-autoconfigure/src/main/java/org/springframework/cloud/sleuth/autoconfig/otel/OtelExporterConfiguration.java#L53
Without com.github.gavlyukovskiy:p6spy-spring-boot-starter I don't see the JDBC tracing. Why do you say it's not needed?
As for my problem, I've found the weirdest thing. I'm using Spring Boot's regular JDBC capabilities, but for services that send JMS messages I had to create a custom configuration in order to handle multiple queues, and that configuration seems to be the problem. It looks like this:
@Slf4j
@Configuration
@EnableJms
public class JmsConfiguration {
private static final String QUEUE_CONFIGURATIONS_PREFIX = "queue.configurations";
private static final Integer MAX_CLIENT_SIDE_REDELIVERY_COUNT = 20; // bigger than any azure service bus redelivery count
@Bean
public ListenerManager listenerManager() {
return new ListenerManager();
}
@Bean
public JavaTimeModule javaTimeModule() {
return new JavaTimeModule();
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(javaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return objectMapper;
}
@Bean
public MappingJackson2MessageConverter messageConverter(ObjectMapper objectMapper) {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
converter.setObjectMapper(objectMapper);
return converter;
}
@Bean
public JmsDefaultPrefetchPolicy prefetchPolicy() {
JmsDefaultPrefetchPolicy policy = new JmsDefaultPrefetchPolicy();
policy.setAll(1);
return policy;
}
@Bean
public BeanFactoryPostProcessor beanDefinitionRegistryPostProcessor(
Optional<ErrorHandler> errorHandler, Environment environment,
MessageConverter messageConverter) {
Bindable<Map<String, QueueConfiguration>> mapBindable = Bindable.mapOf(String.class, QueueConfiguration.class);
Map<String, QueueConfiguration> queueConfigurationMap = Binder.get(environment).bind(QUEUE_CONFIGURATIONS_PREFIX, mapBindable).get();
int outcome = environment.getProperty("queue.max-redelivery-outcome", Integer.class, REJECTED);
return beanFactory -> queueConfigurationMap.forEach((queue, config) -> {
ConnectionFactory connectionFactory = jmsConnectionFactory(prefetchPolicy(), config, outcome, null, null);
JmsTemplate jmsTemplate = jmsTemplate(connectionFactory, messageConverter);
beanFactory.registerSingleton(config.getName() + "ConnectionFactory", connectionFactory);
beanFactory.registerSingleton(config.getContainerFactoryName(), queueContainerFactory(connectionFactory, errorHandler, config, messageConverter));
beanFactory.registerSingleton(config.getJmsTemplateName(), jmsTemplate);
if (config.getMessageHandlerName() != null) {
registerMessageHandler(outcome, beanFactory, config, jmsTemplate);
}
});
}
private void registerMessageHandler(int outcome, ConfigurableListableBeanFactory beanFactory, QueueConfiguration config, JmsTemplate jmsTemplate) {
RedeliverySender redeliverySender = config.getRedeliveryDelays() != null ? redeliverySender(jmsTemplate, config) : null;
beanFactory.registerSingleton(config.getMessageHandlerName(), messageHandler(outcome, redeliverySender));
}
private JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(messageConverter);
return jmsTemplate;
}
private ConnectionFactory jmsConnectionFactory(
JmsPrefetchPolicy prefetchPolicy,
QueueConfiguration properties,
int maxRedeliveriesOutcome,
Tracer tracer,
OpenTelemetry openTelemetry) {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrl());
connectionFactory.setClientID(properties.getClientId());
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setReceiveLocalOnly(true);
JmsDefaultRedeliveryPolicy jmsRedeliveryPolicy = new JmsDefaultRedeliveryPolicy();
jmsRedeliveryPolicy.setMaxRedeliveries(properties.getRedeliveryDelays() != null ? 0 : MAX_CLIENT_SIDE_REDELIVERY_COUNT);
jmsRedeliveryPolicy.setOutcome(maxRedeliveriesOutcome);
connectionFactory.setRedeliveryPolicy(jmsRedeliveryPolicy);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(connectionFactory);
singleConnectionFactory.setReconnectOnException(true);
// JmsTracer jmsTracer = new OpenTelemetryTracer(tracer, openTelemetry);
// connectionFactory.setTracer(jmsTracer);
return singleConnectionFactory;
}
private JmsListenerContainerFactory<DefaultMessageListenerContainer> queueContainerFactory(
ConnectionFactory connectionFactory,
Optional<ErrorHandler> errorHandler,
QueueConfiguration queueConfiguration,
MessageConverter messageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSubscriptionDurable(false); // won't work with concurrent listeners
factory.setPubSubDomain(false); // false for queue
factory.setMessageConverter(messageConverter);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency(queueConfiguration.getConcurrency());
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setSessionTransacted(false);
errorHandler.ifPresent(factory::setErrorHandler);
return factory;
}
private RedeliverySender redeliverySender(JmsTemplate jmsTemplate, QueueConfiguration config) {
return RedeliverySender.builder()
.jmsTemplate(jmsTemplate)
.queueName(config.getName())
.redeliveryDelays(config.getRedeliveryDelays())
.build();
}
private MessageHandler messageHandler(int maxRedeliveryOutcome, MessageHandler.RetryConsumer retryConsumer) {
return new MessageHandler(maxRedeliveryOutcome, objectMapper(), retryConsumer);
}
}
I'm also trying to create my own OpenTelemetry JmsTracer for Apache Qpid, and it actually works (the spans are being propagated). Qpid only supports OpenTracing, but I would like to have only one standard, since Spring is ready for OpenTelemetry now. It's still a work in progress, but it looks like this:
@Slf4j
public class OpenTelemetryTracer implements JmsTracer {
static final String REDELIVERIES_EXCEEDED = "redeliveries-exceeded";
static final String MESSAGE_EXPIRED = "message-expired";
static final String SEND_SPAN_NAME = "amqp-delivery-send";
static final String RECEIVE_SPAN_NAME = "receive";
static final String ONMESSAGE_SPAN_NAME = "onMessage";
static final String DELIVERY_SETTLED = "delivery settled";
static final String STATE = "state";
static final String EVENT = "event";
static final String COMPONENT = "qpid-jms";
static final Object ERROR_EVENT = "error";
static final String SEND_SPAN_CONTEXT_KEY = "sendSpan";
static final String ARRIVING_SPAN_CTX_CONTEXT_KEY = "arrivingContext";
static final String DELIVERY_SPAN_CONTEXT_KEY = "deliverySpan";
static final String ONMESSAGE_SCOPE_CONTEXT_KEY = "onMessageScope";
static final String ANNOTATION_KEY = "x-opt-qpid-tracestate";
private Tracer tracer;
private OpenTelemetry openTelemetry;
OpenTelemetryTracer(Tracer tracer, OpenTelemetry openTelemetry) {
this.tracer = tracer;
this.openTelemetry = openTelemetry;
}
@Override
public void initSend(TraceableMessage message, String address) {
Span span = tracer.spanBuilder(SEND_SPAN_NAME).setSpanKind(SpanKind.CLIENT)
.setAttribute(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER)
.setAttribute(Tags.MESSAGE_BUS_DESTINATION, address)
.setAttribute(Tags.COMPONENT, COMPONENT)
.startSpan();
String tidFromMdc = MDC.get("traceId");
String spanFromMdc = MDC.get("spanId");
log.info("initSend START span.isRecording()={} tidFromMdc={} spanFromMdc={}", span.isRecording(), tidFromMdc, spanFromMdc);
TextMapSetter<TraceableMessage> setter =
new TextMapSetter<>() {
@Override
public void set(TraceableMessage carrier, String key, String value) {
// Insert the context as Header
log.info("initSend SET carrier={} key={} value={}", carrier, key, value);
carrier.setTracingContext(key, value);
carrier.setTracingAnnotation(key, value);
}
};
try (Scope scope = span.makeCurrent()) {
// log.info("initSend makeCurrent");
openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), message, setter);
} finally {
log.info("initSend END");
span.end();
}
}
@Override
public void completeSend(TraceableMessage message, String outcome) {
// log.info("completeSend");
Object cachedSpan = message.getTracingContext(SEND_SPAN_CONTEXT_KEY);
// log.info("completeSend {}", SEND_SPAN_CONTEXT_KEY);
if (cachedSpan != null) {
log.info("completeSend cachedSpan={}", cachedSpan);
Span span = (Span) cachedSpan;
span.addEvent(EVENT, Attributes.of(AttributeKey.stringKey(EVENT), DELIVERY_SETTLED));
span.addEvent(STATE, Attributes.of(AttributeKey.stringKey(STATE), outcome == null ? "null" : outcome));
span.end();
}
}
@Override
public void syncReceive(TraceableMessage message, String address, DeliveryOutcome outcome) {
// log.info("syncReceive");
TextMapGetter<TraceableMessage> getter =
new TextMapGetter<>() {
@Override
public String get(TraceableMessage carrier, String key) {
log.info("syncReceive key={} carrier={}", key, carrier.toString());
Object stringProperty = carrier.getTracingAnnotation(key);
log.info("syncReceive stringProperty value={}", stringProperty);
// Return null for a missing annotation so the propagator treats the key as absent.
return stringProperty == null ? null : stringProperty.toString();
}
@Override
public Iterable<String> keys(TraceableMessage carrier) {
log.info("syncReceive carrier {}", carrier.toString());
log.info("syncReceive stringProperty lista");
return List.of();
}
};
Context extractedContext = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), message, getter);
// try (Scope scope = extractedContext.makeCurrent()) { // Automatically use the extracted SpanContext as parent.
Scope scope = extractedContext.makeCurrent(); // Automatically use the extracted SpanContext as parent.
Span serverSpan = tracer.spanBuilder(RECEIVE_SPAN_NAME)
.setParent(extractedContext)
.setSpanKind(SpanKind.SERVER)
.startSpan();
// try {
serverSpan.setAttribute(SemanticAttributes.HTTP_USER_AGENT, "valid! user agent");
// } finally {
serverSpan.end();
String tidFromMdc = MDC.get("traceId");
String spanFromMdc = MDC.get("spanId");
String tidFromMdcB3 = MDC.get("X-B3-TraceId");
String spanFromMdcB3 = MDC.get("X-B3-SpanId");
log.info("syncReceive END tidFromMdc={} spanFromMdc={} tidFromMdcB3={} spanFromMdcB3={}", tidFromMdc, spanFromMdc, tidFromMdcB3, spanFromMdcB3);
// }
// }
}
private void addDeliveryLogIfNeeded(DeliveryOutcome outcome, Span span) {
log.info("addDeliveryLogIfNeeded");
if (outcome == DeliveryOutcome.EXPIRED) {
span.addEvent(EVENT, Attributes.of(AttributeKey.stringKey(EVENT), MESSAGE_EXPIRED));
log.info("addDeliveryLogIfNeeded: {}", DeliveryOutcome.EXPIRED);
} else if (outcome == DeliveryOutcome.REDELIVERIES_EXCEEDED) {
span.addEvent(EVENT, Attributes.of(AttributeKey.stringKey(EVENT), REDELIVERIES_EXCEEDED));
log.info("addDeliveryLogIfNeeded: {}", DeliveryOutcome.REDELIVERIES_EXCEEDED);
}
}
@Override
public void asyncDeliveryInit(TraceableMessage message, String address) {
log.info("asyncDeliveryInit");
Span span = tracer.spanBuilder(SEND_SPAN_NAME).setSpanKind(SpanKind.CLIENT)
.setAttribute(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER)
.setAttribute(Tags.MESSAGE_BUS_DESTINATION, address)
.setAttribute(Tags.COMPONENT, COMPONENT)
.startSpan();
log.info("asyncDeliveryInit {}", span.isRecording());
TextMapSetter<TraceableMessage> setter =
new TextMapSetter<>() {
@Override
public void set(TraceableMessage carrier, String key, String value) {
// Insert the context as Header
log.info("asyncDeliveryInit carrier {}", carrier);
log.info("asyncDeliveryInit key {}", key);
log.info("asyncDeliveryInit value {}", value);
carrier.setTracingContext(key, value);
carrier.setTracingAnnotation(key, value);
}
};
try (Scope scope = span.makeCurrent()) {
log.info("asyncDeliveryInit makeCurrent");
openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), message, setter);
} finally {
log.info("asyncDeliveryInit end");
span.end();
}
}
@Override
public void asyncDeliveryComplete(TraceableMessage message, DeliveryOutcome outcome, Throwable throwable) {
Scope scope = (Scope) message.removeTracingContext(ONMESSAGE_SCOPE_CONTEXT_KEY);
log.info("asyncDeliveryComplete scope {}", scope);
try {
if (scope != null) {
log.info("asyncDeliveryComplete scope!=null");
scope.close();
}
} finally {
Span span = (Span) message.getTracingContext(DELIVERY_SPAN_CONTEXT_KEY);
log.info("asyncDeliveryComplete span != null {}", span != null);
if (span != null) {
try {
if (throwable != null) {
span.setStatus(StatusCode.ERROR, "Application error, exception thrown from onMessage.");
span.recordException(throwable);
log.info("asyncDeliveryComplete throwable != null");
} else {
addDeliveryLogIfNeeded(outcome, span);
}
} finally {
span.end();
}
}
}
}
@Override
public void close() {
log.info("opentelemetry close()");
// NOOP
}
}
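The propagation step in the tracer above boils down to writing key/value pairs into the message on send and reading them back on receive. The following is a minimal sketch with a plain map standing in for the message annotations. `MapCarrier` is a hypothetical class, not part of Qpid or OpenTelemetry, and the `traceparent` value is the example from the W3C Trace Context specification, not output from this service.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a message's tracing annotations. */
final class MapCarrier {
    private final Map<String, String> annotations = new LinkedHashMap<>();

    /** Mirrors the setter side: store the propagated header on the message. */
    void set(String key, String value) {
        annotations.put(key, value);
    }

    /** Mirrors the getter side: null (not a placeholder string) when the key is absent. */
    String get(String key) {
        return annotations.get(key);
    }

    /** All keys present on the carrier, so a propagator can iterate over them. */
    Iterable<String> keys() {
        return annotations.keySet();
    }

    public static void main(String[] args) {
        MapCarrier message = new MapCarrier();
        message.set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        System.out.println(message.get("traceparent"));
        System.out.println(message.get("tracestate")); // key was never set
    }
}
```

Returning `null` for an absent key lets the extracting side fall back to a fresh context instead of trying to parse a placeholder value.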
The super WEIRD thing is when I try to autowire the Tracer object in the beanDefinitionRegistryPostProcessor method the whole Spring Boot tracing goes dead :/. I don't need to use the object even... just Autowire it and boom. No REST tracing. No JDBC tracing. and no JMSTracing... obviously. Still I get zero errors so I don't know what I'm doing wrong.
Also I don't see any JMS examples in the samples https://github.com/spring-cloud-samples/spring-cloud-sleuth-samples/
Without com.github.gavlyukovskiy:p6spy-spring-boot-starter I don't see the JDBC tracing. Why do you say it's not needed?
You can check this section of the docs https://docs.spring.io/spring-cloud-sleuth/docs/current/reference/html/integrations.html#sleuth-jdbc-integration
The super WEIRD thing is when I try to autowire the Tracer object in the beanDefinitionRegistryPostProcessor method the whole Spring Boot tracing goes dead :/. I don't need to use the object even... just Autowire it and boom. No REST tracing. No JDBC tracing. and no JMSTracing... obviously. Still I get zero errors so I don't know what I'm doing wrong.
That's because you're trying to inject it too early and there's a deadlock. You should not be injecting such a bean there. You should try to resolve it at runtime.
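A framework-free sketch of what "resolve it at runtime" can look like: instead of receiving the `Tracer` as a parameter when the post-processor is built, keep a supplier and perform the lookup on first use. `LazyRef` and the string that stands in for the tracer are hypothetical. In a Spring application the same role is typically played by `ObjectProvider<Tracer>` or by a `getBean` call made inside the code that actually needs the tracer.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: defers a lookup until the value is first needed. */
final class LazyRef<T> implements Supplier<T> {
    private final Supplier<T> lookup;
    private T value; // cached after the first successful lookup

    LazyRef(Supplier<T> lookup) {
        this.lookup = lookup;
    }

    @Override
    public synchronized T get() {
        if (value == null) {
            value = lookup.get();
        }
        return value;
    }
}

final class LazyRefDemo {
    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        // Stand-in for beanFactory.getBean(Tracer.class); nothing runs at construction time.
        LazyRef<String> tracer = new LazyRef<>(() -> {
            lookups.incrementAndGet();
            return "tracer";
        });
        System.out.println("lookups before use: " + lookups.get());
        tracer.get();
        tracer.get();
        System.out.println("lookups after use: " + lookups.get());
    }
}
```

The point of the design is that constructing the holder has no side effects, so it can be created during early container phases without forcing the tracing beans to initialize.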
We’re decorating DataSources in a trace representation. We delegate actual proxying to either p6spy or datasource-proxy. In order to use this feature you need to have them on the classpath.
it says you need one of those on the classpath. You mean it's a different artifact right?
That's because you're trying to inject it too early and there's a deadlock. You should not be injecting such a bean there. You should try to resolve it at runtime.
That's what we did yesterday and it works now. Thanks. I will look for more explanation later on ;)
@marcingrzejszczak what about JMS tracing in general? What is supported by Spring right now with JMS? Brave? OpenTracing? OpenTelemetry? where can I see the codes for that instrumentation?
it says you need one of those on the classpath. You mean it's a different artifact right?
You don't need com.github.gavlyukovskiy:p6spy-spring-boot-starter; you need either p6spy or datasource-proxy.
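To verify which of the two libraries is actually visible at runtime, a small check like the one below can help. It is a sketch only: the class names are assumed to be the main entry points of p6spy and datasource-proxy, and `JdbcProxyCheck` is a hypothetical helper, not something Sleuth provides.

```java
/** Sketch: reports whether a JDBC proxy library is visible on the classpath. */
final class JdbcProxyCheck {
    // Assumed entry-point classes of the two libraries.
    static final String P6SPY = "com.p6spy.engine.spy.P6DataSource";
    static final String DATASOURCE_PROXY = "net.ttddyy.dsproxy.support.ProxyDataSource";

    static boolean isPresent(String className) {
        try {
            // initialize=false: only check visibility, do not run static initializers.
            Class.forName(className, false, JdbcProxyCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("p6spy present: " + isPresent(P6SPY));
        System.out.println("datasource-proxy present: " + isPresent(DATASOURCE_PROXY));
    }
}
```

Running it with the application's classpath shows whether either dependency made it into the final artifact.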
That's what we did yesterday and it works now. Thanks. I will look for more explanation later on ;)
Great to hear that!
@marcingrzejszczak what about JMS tracing in general? What is supported by Spring right now with JMS? Brave? OpenTracing? OpenTelemetry? where can I see the codes for that instrumentation?
With Sleuth we support spring-jms over Brave or via Spring Integration.
Is there a plan to adopt spring-jms to OpenTelemetry?
Not for spring-cloud-sleuth-otel. We will consider adding it natively with Spring Framework 6.0
Describe the bug
I have two Spring Boot microservices with nearly identical tracing configuration, but only one of them actually works (sends traces).
Here's a part of the dependency tree for Spring
Here's the same for OpenTelemetry
Here's the corresponding pom section
Here's the Sleuth section of bootstrap.yml
Here's the working service (actually two of them: merchant-service and payment-method-service)
The other one (payment-service) just doesn't send spans.
I see my Istio proxy spans for both but only one of them is working with Spring.
In general I did the same configuration for 4 services ... 2 of them work ok ... 2 of them don't.
How to debug this situation?
I tried setting
but most of the beans within Sleuth don't log anything when exporting spans, or I can't find where they do.
BTW, when I use the agent approach like here https://signoz.io/opentelemetry/java-agent/ it works OK: I see the spans from HTTP calls and Repository operations, but the trace IDs between Istio and Spring get mixed up, so it's not ideal either. I would prefer to do this 100% in Spring anyway.
Please advise how to successfully debug this situation. Thanks!