Closed spring-projects-issues closed 6 years ago
Rossen Stoyanchev commented
The exchange()
method blocks only to get the ClientResponse
which contains the status and headers, but the body is yet to be read via one of the body methods on the response. Here is one streaming test. The response content is not yet read when the status and headers are asserted, not even when FluxExchangeResult
is returned. It's only in the StepVerifier that the actual content is consumed.
It's true that a retrieve method next to exchange in WebTestClient could be used as a kind of shortcut, if you don't want to assert the status and go straight to the response content, but for that it should cover all options like WebClient does with the ResponseSpec it returns.
Rossen Stoyanchev commented
Resolving for now.
Rossen Stoyanchev commented
Here is one streaming test.
I see two problems here. The linked test class doesn't test the streaming method 'Flux
And even then, as this issue states, the exchange method blocks forever in some circumstance, one of them being returning an processor in a controller method with no events happened. Everything works well if the stream immediately returns some data. For now I did not found any help and am working with 'warmup' events in my tests.
I do not think, the issue is resolved after all.
The linked test class doesn't test the streaming method
Flux getPersonStream()
.
I just verified with a debug point that it does call Flux<Person> getPersonStream()
, so I'm not sure what you mean.
even then, as this issue states, the exchange method blocks forever in some circumstance
As I mentioned before it only blocks for the status and headers, and not for the body, so I don't follow this either. Please provide a sample to demonstrate the issue.
Example for blocking WebTestClient:
RestController:
package com.example;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
@RestController
public class ExampleController {
private final FluxSink<ServerSentEvent<String>> dataSink;
private final ReplayProcessor<ServerSentEvent<String>> processor;
public ExampleController() {
this.processor = ReplayProcessor.create(100, false);
this.dataSink = this.processor.sink();
}
@PutMapping("/updateExample")
public void update(String content) {
this.dataSink.next(
ServerSentEvent.<String>builder()
.id("id")
.event("message")
.data(content)
.build());
}
@GetMapping("/subscribeExample")
public Flux<ServerSentEvent<String>> subscribe(String subscriptionID) {
return this.processor;
}
}
TestCode:
package com.example;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.test.StepVerifier;
import java.time.Duration;
@WebFluxTest(controllers = ExampleController.class)
public class ExampleTests {
@Autowired
private WebTestClient webClient;
private static final ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {};
@Test
public void subscribeTest() {
// uncomment next line to make the test pass
// webClient.put().uri("/updateExample").body(BodyInserters.fromObject("")).exchange().expectStatus().isOk();
FluxExchangeResult<ServerSentEvent<String>> result = webClient.get().uri("/subscribeExample")
.accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE))
.exchange()
.expectStatus().isOk()
.expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.returnResult(typeRef);
StepVerifier.create(result.getResponseBody())
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(5))
.thenCancel()
.verify();
}
}
This test leads to the blocking timeout exception. If you uncomment the webClient.put() call, everything works. If that behaviour is intended, do you have any advise where to find detailed documentation for how to use WebTestClient with SSE?
Is there any chance for this being worked on? SSE subscribes with returned (processor)flux that doesn't have data yet seems a valid use case (at least it's a pattern we at work use quite often) for me. The problem is that there won't be any response if the flux has no data, so that exchange() runs into timeouts.
@MderM, so the server is not producing any data to start and the test client eventually times out? You can change the responseTimeout
on WebTestClient.Builder
. By default it is set to 5 seconds.
@rstoyanchev I tuned the timeout before, but that didn't change anything. It is not possible to check if there was no data until now (at least not without some not so nice tricks), but yeah, I found workarounds for all my testcases, but they're just that: workarounds. I seperated out business logic from the controller to a service to be able to handle and test the Flux behaviour directly.
But it would be really nice to get the Flux through the webtestclient without the exchange to do st like
StepVerifier.create(responseFlux). ... ... .testThatNoDataFlowedTillNow () .then(() -> inject data) .expectNext () .andSoOn
But yeah, I think it will work without some way or another. You just have to be careful with SSE in combination with ConnectedFlux/Processor.
Please suggest what I'm missing here.
I'm also using SSE and exchange
call is blocking the test so I never reach StepVerifier
.
I'm using Spring MVC so my REST controller returns SseEmitter
object that keeps an infinite stream, messages are streamed constantly every 1 second.
The idea is to verify the first few messages and cancel the subscription. Here's the test:
@Test
public void testSubscribeOnAlertsWithServerError()
{
var exchangeResult = webTestClient.get()
.uri("/someurl")
.exchange()
.returnResult(String.class);
// I never reach that in debug
StepVerifier.create(exchangeResult.getResponseBody())
.expectNextCount(5)
.thenCancel()
.verify();
}
BTW it works fine in similar tests where the stream is completed at some point. Probably this is not expected to work with Spring MVC, in this case, could you suggest a way to test such a scenario?
I found some other workaround. I setup a spy on the controller method. The trick is to do the regular work to trigger the flux as soon as the controller is invoked. In my scenario it is a flux returning user events and I need to invoke some other endpoint (f.e. user creation) right after connecting to the stream (= "within" exchange)
Complete example can be found here: https://stackoverflow.com/questions/61124065/how-to-test-server-sent-events-api-written-in-spring-5-webflux-when-it-is-nec/77803203#77803203
Bernd Kolb opened SPR-17248 and commented
Summary As I user I want to test a continuous, infinite stream of events. I have a controller that provides a
Flux<ServerSentEvent
.Actual Behavior
WebTestClient
does not provide api to test such a scenario.WebTestClient.returnResult
's documentation states that is should be usable in in such a scenario, however in order to call this method, I have to callexchange
first. The implementation ofexchange
internally however blocks until a timeout is reached.Expected Behavior In addition to
exchange
, there should a method similar toResponseSpec.returnResult
which can be used with a StepVerifier.Affects: 5.0.8
Referenced from: pull request https://github.com/spring-projects/spring-framework/pull/1949