spring-cloud / spring-cloud-consul

Spring Cloud Consul
http://cloud.spring.io/spring-cloud-consul/
Apache License 2.0
805 stars 543 forks source link

Service Discovery not working when using WebClient with Consul #821

Closed LucasDesenv closed 1 year ago

LucasDesenv commented 1 year ago

Describe the bug Hello everyone!

We're migrating the Spring Boot from 2 to 3 and also getting rid off Netflix Ribbon. We're experiencing an issue while trying to discover a service using Consul. If we rollback to Spring Boot 2 + Netflix Ribbon, it works with no problem, so we discarded any connectivity issue.

Logging errors:

RoundRobinLoadBalancer|No servers available for service: cachedavailability-integrations-service
ReactorLoadBalancerExchangeFilterFunction|LoadBalancer does not contain an instance for the service cachedavailability-integrations-service
Communication error with uri: http://cachedavailability-integrations-service/testing org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN 
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:336)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 503 SERVICE_UNAVAILABLE from GET http://cachedavailability-integrations-service/testing [DefaultWebClient]

Consul interface image

  1. The service "si-manager" is the one trying to discover and access the "cachedavailability" one.
  2. It is registering well, but not discovering the other services registered when using WebClient bean.

We have already tried many ways, such as:

  1. https://docs.spring.io/spring-cloud-consul/docs/current/reference/html/#using-the-discoveryclient
  2. https://www.appsloveworld.com/springboot/100/7/service-discovery-with-spring-webflux-webclient
  3. https://stackoverflow.com/questions/66534708/configuring-spring-cloud-loadbalancer-without-autoconfiguration

Sample

Main Class

@org.springframework.cloud.client.discovery.EnableDiscoveryClient
public class MainApplication {...}

WebClient config

  @Bean(name = "webClientConsulAvailability")
  public WebClient webClientConsulAvailability(
    WebClient.Builder webClientBuilder,
    ReactorLoadBalancerExchangeFilterFunction lbFunction,
    ExchangeFilterFunction logFilter
  ) {
    return webClientBuilder
      .filter(lbFunction)
      .filter(logFilter)
      .build();
  }

bootstrap.yml

spring:
  application:
    name: si-manager-service
  profiles:
    active: ${SPRING_PROFILES_ACTIVE:local}
  cloud:
    consul:
      host: localhost
      port: 8500
      enabled: true
      discovery:
        serviceName: ${spring.application.name}
        instanceId: ${spring.application.name}8500
        enabled: true
        # Register as a service in consul.
        register: true
        registerHealthCheck: true

Dependencies image

Usage example:

webClientConsulAvailability.get()
      .uri("http://cachedavailability-integrations-service/testing")
      .retrieve()
      .bodyToFlux(MyDTO.class)
      .doOnError(e -> {
        if (isErrorLogLevel(e)) {
          log.error(COMMUNICATION_ERROR_WITH_URI + uri, e);
        } else {
          log.warn(COMMUNICATION_ERROR_WITH_URI + uri, e);
        }
      })
      .onErrorResume(e -> Flux.empty());
LucasDesenv commented 1 year ago

Fixed with code below.

package xpto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.DeferringLoadBalancerExchangeFilterFunction;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancedExchangeFilterFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.netty.http.client.HttpClient;

import java.util.concurrent.TimeUnit;

@Configuration
@EnableDiscoveryClient
@Slf4j
public class WebclientConfiguration {

  private final ObjectMapper objectMapper;
  @Value("${web.client.read-timeout:25000}")
  private final int webClientReadTimeout;
  @Value("${web.client.connection-timeout:3000}")
  private final int webClientConnectionTimeout;

  public WebclientConfiguration(ObjectMapper objectMapper,
    @Value("${web.client.read-timeout:25000}") int webClientReadTimeout,
    @Value("${web.client.connection-timeout:3000}") int webClientConnectionTimeout) {
    this.objectMapper = objectMapper;
    this.webClientReadTimeout = webClientReadTimeout;
    this.webClientConnectionTimeout = webClientConnectionTimeout;
  }

  private ClientHttpConnector getClientHttpConnector() {
    return new ReactorClientHttpConnector(
      HttpClient.create().compress(true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConnectionTimeout)
        .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(webClientReadTimeout, TimeUnit.MILLISECONDS))));
  }

  @Bean
  public DefaultUriBuilderFactory builderFactory() {
    DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
    factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.VALUES_ONLY);
    return factory;
  }

  @Bean(name = "webClientConsul")
  public WebClient webClientConsul(
    WebClient.Builder webClientBuilder,
    DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> exchangeFilterFunction
  ) {
    webClientBuilder.filter(exchangeFilterFunction);
    return buildWebClient(webClientBuilder);
  }

  @Bean(name = "webClientDefault")
  public WebClient webClientDefault(WebClient.Builder webClientBuilder) {
    return buildWebClient(webClientBuilder);
  }

  private WebClient buildWebClient(WebClient.Builder webClientBuilder) {
    ClientHttpConnector connector = getClientHttpConnector();
    return webClientBuilder
      .clientConnector(connector)
      .exchangeStrategies(getExchangeStrategies())
      .build();
  }

  private ExchangeStrategies getExchangeStrategies() {
    return  ExchangeStrategies.builder()
      .codecs(clientDefaultCodecsConfigurer -> {
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonEncoder(
            new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
        clientDefaultCodecsConfigurer
          .defaultCodecs()
          .jackson2JsonDecoder(
            new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
      }).build();
  }

}