spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
56.32k stars 38.01k forks source link

WebTestClient gets stuck on exchange when asserting infinite flux #30516

Closed mvol closed 1 year ago

mvol commented 1 year ago

Affects: 6.0.8

Problem:

When trying to assert a controller-method returning an infinite flux of server-sent-events, WebTestClient gets stuck on exchange.

Expected behaviour: StepVerifier asserts first three items, then cancels the subscription, test finishes successfully. Actual behaviour: WebTestClient gets stuck on exchange - expectStatus is never executed, test never finishes.

Example:

@RestController
class TestController {

    @GetMapping(path = "/test", produces = TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Long>> controllerMethod() {
            return Flux.interval(Duration.ofMillis(100))
                    .map(i -> ServerSentEvent.builder(i).build());
    }
}
@SpringBootTest
@AutoConfigureMockMvc
@AutoConfigureWebTestClient
class DeviceControllerApiTest {

    @Autowired private WebTestClient webClient;

    @Test
    void testStream() {
        Flux<Long> result = webClient.get()
                .uri("/test")
                .exchange()
                .expectStatus().isOk()
                .returnResult(Long.class)
                .getResponseBody();

        StepVerifier.create(result)
                .expectNextSequence(List.of(0L, 1L, 2L))
                .thenCancel()
                .verify(Duration.ofMillis(400));
    }
}
mvol commented 1 year ago

The problem apparently only occurs when org.springframework.boot:spring-boot-starter-web is loaded as well - only org.springframework.boot:spring-boot-starter-webflux works fine.

mvol commented 1 year ago

I also noticed that expectHeader() does not work in case of an infinite flux, failing with a java.lang.IllegalStateException: Timeout on blocking read for 5000000000 NANOSECONDS - is that intended?

rstoyanchev commented 1 year ago

Can you provide a sample please?

spring-projects-issues commented 1 year ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

spring-projects-issues commented 1 year ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

rodrigorodrigues commented 8 months ago

Hi @mvol have you managed to fix this issue? I'm facing the same seems getting stuck on WebTestClient when using Flux.interval using latest Spring Boot 3.2.0

image

image

eiswind commented 7 months ago

I see the same issue here, when testing an mvc controller with the WebTestClient.

@GetMapping(path = "/updates", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter createEmitter() {
    SseEmitter emitter = new SseEmitter(-1L);
    this.emitters.add(emitter);
    emitter.onCompletion(() -> this.emitters.remove(emitter));
    emitter.onTimeout(() -> {
        emitter.complete();
        this.emitters.remove(emitter);
    });
    return emitter;
}

Setup is SpringBootTest with WebEnvironment

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test") 
public class MessageControllerIntegrationTest {
    @Autowired
    WebTestClient client;

Test gets stuck with timeout in exchange:

var updateStream = client
      .get()
      .uri("/api/messages/updates")
      .headers(httpHeaders -> httpHeaders.setBearerAuth(jwtAuth))
      .accept(MediaType.TEXT_EVENT_STREAM)
      .exchange()
      .expectStatus().isOk()
      .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
      .returnResult(ServerSentEvent.class)
      .getResponseBody();

Let me know if I can provide further help. I hope to see this isse re-opened.

eiswind commented 7 months ago

I just created a MRE at https://github.com/eiswind/webtestclient-sse

eiswind commented 7 months ago

@rstoyanchev could you consider re-opening this?

fabiofranco85 commented 5 months ago

Facing the exact same problem. Did anybody find a suitable solution? Spring Boot 3.2.4

eiswind commented 5 months ago

I think this is quite un-fixable as WebTestClient calls block() internally while there is no event yet.

I created myself a workaround using the standard WebFlux WebClient, works like a charm.

rathore0201 commented 3 months ago

@eiswind Can you please share the workaround? I am also facing the same issue after springboot uprade.

rstoyanchev commented 3 months ago

There is a limitation in using WebTestClient with MockMvc as a server to test infinite streams that has now been documented, see https://github.com/spring-projects/spring-framework/issues/32687#issuecomment-2082344903 and the documentation page.

WebTestClient should work for infinite streams as expected with a WebFlux server, allowing you to cancel the stream when you cancel the result Flux from the test.

For infinite streams with Spring MVC, you'll need a running server, i.e. WebTestClient.bindToServer() or as described here for Boot applications. You can still use WebTestClient with MockMvc for all other endpoints where the controller completes the response on its own, which allows using a consistent test API across all endpoints, but for infinite streams the test will have to run the server.

eiswind commented 3 months ago

The issue at my side was that my SSE Endpoint does send the first event only after a certain trigger happens. When I use WebTestClient, it blocks internally waiting for a response, at least as far as I understand it. This prevents triggering the SSE Event before the WebTestClient goes into a timeout. I solved it using the regular WebClient for the SSE Endpoint.

ParameterizedTypeReference<ServerSentEvent<MessagePushNotification>> type = new ParameterizedTypeReference<>() {
};

var subscription = webClientBuilder.baseUrl("http://localhost:" + port).build()
        .get()
        .uri("/api/messages/updates")
        .headers(httpHeaders -> httpHeaders.setBearerAuth(jwtAuth))
        .retrieve().bodyToFlux(type).subscribe(event -> {
            if (event.data().type() == MessagePushNotificationType.ADD) {
                eventCount.addAndGet(1);
            }
        });
rstoyanchev commented 3 months ago

That's essentially the same as using WebTestClient against a live server, but keeping the test API.

eiswind commented 3 months ago

I just tried once again like:


ParameterizedTypeReference<ServerSentEvent<MessagePushNotification>> type = new ParameterizedTypeReference<>() {
};

var result = webTestClient
        .get()
        .uri("/api/messages/updates")
        .headers(httpHeaders -> httpHeaders.setBearerAuth(jwtAuth))
        .exchange().returnResult(type);

var subscription = result.getResponseBody().subscribe(event -> {
    if (event.data().type() == MessagePushNotificationType.ADD) {
        eventCount.addAndGet(1);
    }
});

That still gives me a timeout on the exchange. Maybe I don't get what is meant by a live-server? My test goes like

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("test") 
public class MessageControllerIntegrationTest {

    @Autowired
    WebTestClient client;
rstoyanchev commented 3 months ago

Yes that should be using a live server. I would use StepVerifier instead of subscribing, but aside from that maybe there is something specific about the scenario. Not sure what that trigger is that you mentioned for the server to start sending events?

eiswind commented 3 months ago

The SSE endpoint in the example does NOT sent an event until we make another (different endpoint) request like:

@PostMapping(produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Message> saveMessage(
        @RequestBody @Valid CreateOrEditMessage coe
) {

    var clonedMessage = new Message(coe.likes(), coe.text(), LocalDateTime.now().truncatedTo(ChronoUnit.SECONDS), null);
    var savedMessage = messageRepository.save(clonedMessage);
    sendUpdateNotification(new MessagePushNotification(MessagePushNotificationType.ADD, savedMessage));
    var uri = URI.create("/api/messages/" + savedMessage.id());
    return ResponseEntity.created(uri).body(savedMessage);
}

I updated the reproducer from above. Here you can see that the WebTestClient blocks into a timeout.

https://github.com/eiswind/webtestclient-sse

rstoyanchev commented 3 months ago

There are two things.

In the referenced example, you don't actually emit any events at all, but generally for an infinite stream, the server does not complete the response on its own. You need to do so from the client side after consuming events that you are interested in with StepVerifier. See for example this test, which reads 5 events and then cancels. That in turn closes the connection to the server.

For the specific scenario you mention where you need to start the stream, then make a separate HTTP post, WebTestClient itself is synchronous for convenience as a testing API. Therefore it blocks the main thread, and you cannot make a separate HTTP POST. You could do that asynchronously without much extra effort. For example, rewriting the above referenced test:

@Test
void entityStream() {
    Flux<Person> personFlux = Flux.defer(() ->
                    this.client.get()
                            .accept(TEXT_EVENT_STREAM)
                            .exchange()
                            .expectStatus().isOk()
                            .expectHeader().contentTypeCompatibleWith(TEXT_EVENT_STREAM)
                            .returnResult(Person.class)
                            .getResponseBody()
            )
            .subscribeOn(Schedulers.boundedElastic());

    StepVerifier.create(personFlux)
            .then(() -> {
                // Make HTTP Post
            })
            .expectNext(new Person("N0"), new Person("N1"), new Person("N2"))
            .expectNextCount(4)
            .consumeNextWith(person -> assertThat(person.getName()).endsWith("7"))
            .thenCancel()
            .verify();
}

That said, as a scenario with multiple HTTP calls, perhaps WebClient is a good fit.

eiswind commented 3 months ago

@rstoyanchev Thanks for taking your time and clearing things up a little bit. The unexcpected thing here to me was that it's synchronous and blocking even if I return a Flux. Using the WebClient allows me to do what I need and this ticket should give enough hints to others how might follow up.

rstoyanchev commented 3 months ago

Fair point, and you generally do get the Flux before the content has materialized, but the server doesn't actually commit the response immediately, so effectively you end up waiting for the response status and headers. Alternatively, if the API was not synchronous, you'd have to block on a Mono to assert the response status and headers.