reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0

Exception in a Flux created by a subscriber will be ignored by reactor-netty #362

Open dantesun opened 6 years ago

dantesun commented 6 years ago

Expected behavior

The exception thrown inside subscribe triggers the retry.

Actual behavior

The exception is ignored by FluxReceive.

Steps to reproduce

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.LoopResources;
import reactor.retry.Retry;

public class PollingTest {

  private static final Logger logger = LoggerFactory.getLogger(PollingTest.class);

  private Flux<Integer> createFlux() {
    return Flux.just(1, 2, 3, 4, 5, 6, 7);
  }

  @Test
  public void polling() throws InterruptedException {
    HttpClient client =
        HttpClient.builder()
            .options(
                builder -> {
                  LoopResources channelResources = LoopResources.create("my-loop", 2, false);
                  builder.loopResources(channelResources);
                  builder.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
                  builder.disablePool();
                })
            .build();

    Retry<Object> ioError =
        Retry.anyOf(IOException.class)
            .retryMax(Integer.MAX_VALUE)
            .fixedBackoff(Duration.ofSeconds(1));

    client
        .get(
            "http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso",
            request -> {
              request
                  .context()
                  .addHandlerFirst(new IdleStateHandler(1, 0, 0))
                  .addHandlerLast(
                      new ChannelDuplexHandler() {
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                            throws Exception {
                          if (evt instanceof IdleStateEvent) {
                            ctx.close();
                          } else {
                            super.userEventTriggered(ctx, evt);
                          }
                        }
                      });
              return request.send();
            })
        .doOnError(e -> logger.error("Error!", e))
        .flatMapMany(NettyInbound::receive)
        .log(logger.getName(), Level.SEVERE, SignalType.ON_ERROR, SignalType.CANCEL)
        .retryWhen(ioError)
        .subscribe(
            byteBuf -> {
              logger.info("Msg: {}", byteBuf);
              Flux.just(1, 2, 3)
                  .onErrorReturn(8)
                  .subscribe(
                      i -> {
                          throw new RuntimeException(i + " error");
                      });
            });
    // Keep the test thread alive while the download and retries run.
    Thread.sleep(Duration.ofMinutes(5).toMillis());
  }
}

Reactor Netty version

0.7.7.RELEASE

JVM version (e.g. java -version)

java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)

OS version (e.g. uname -a)

Darwin 15.6.0 Darwin Kernel Version 15.6.0: Tue Jan 30 11:45:51 PST 2018; root:xnu-3248.73.8~1/RELEASE_X86_64 x86_64

smaldini commented 6 years ago

Hey @dantesun, throwing inside subscribe might not work here because there is no handler further downstream to pass the error to. Since subscribe was given no error callback, the exception bubbles up internally as an ErrorCallbackNotImplemented. The only thing we can do, at least in 0.8, is pass that error to a generic error consumer assigned to the client, but it won't be passed back into the pipeline. That would look like this:

HttpClient client =
    HttpClient.newConnection()
        .tcpConfiguration(
            tcp ->
                tcp.runOn(LoopResources.create("my-loop", 2, false))
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000));

client
    // Generic error consumer on the client: the error is reported here,
    // not sent back into the pipeline.
    .doOnError(e -> logger.error("Error!", e))
    .doOnRequest(
        (req, c) ->
            c.addHandlerFirst(new IdleStateHandler(1, 0, 0))
                .addHandlerLast(
                    new ChannelDuplexHandler() {
                      @Override
                      public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                          throws Exception {
                        if (evt instanceof IdleStateEvent) {
                          // Close the connection after 1 second without reads.
                          ctx.close();
                        } else {
                          super.userEventTriggered(ctx, evt);
                        }
                      }
                    }))
    .get()
    .uri("http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso")
    .responseContent()
    .log(logger.getName())
    .retry(IOException.class::isInstance)
    .subscribe(
        byteBuf -> {
          logger.info("Msg: {}", byteBuf);
          Flux.just(1, 2, 3)
              .onErrorReturn(8)
              .subscribe(
                  i -> {
                    throw new RuntimeException(i + " error");
                  });
        });
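
To make the point about error direction concrete, here is a toy model in plain JDK Java. It is not Reactor's or reactor-netty's implementation, and every name in it (ErrorFlowSketch, Sink, Source, flaky, and the hand-written retry and subscribe helpers) is hypothetical. It only sketches the idea that errors travel downstream: a retry operator can react to an error signalled by its upstream source, but an exception thrown by the final consumer sits below it and ends up in a catch-all hook instead.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Toy model only: hypothetical names, not Reactor's implementation.
final class ErrorFlowSketch {

    /** Downstream end of a stage: receives values and errors. */
    interface Sink {
        void next(int value);
        void error(Throwable t);
    }

    /** Upstream end of a stage: can be subscribed to more than once. */
    interface Source {
        void subscribe(Sink sink);
    }

    /** Signals an error for the first `failures` subscriptions, then emits 1 and 2. */
    static Source flaky(int failures, AtomicInteger subscriptions) {
        return sink -> {
            int attempt = subscriptions.incrementAndGet();
            if (attempt <= failures) {
                sink.error(new IOException("attempt " + attempt));
            } else {
                sink.next(1);
                sink.next(2);
            }
        };
    }

    /** Resubscribes when the upstream signals an error, at most maxRetries times. */
    static Source retry(Source upstream, int maxRetries) {
        return sink -> {
            int[] left = {maxRetries};
            upstream.subscribe(new Sink() {
                public void next(int value) { sink.next(value); }
                public void error(Throwable t) {
                    if (left[0]-- > 0) upstream.subscribe(this);
                    else sink.error(t);
                }
            });
        };
    }

    /** Terminal subscriber without an error callback: failures go to the catch-all hook. */
    static void subscribe(Source source, IntConsumer onNext, Consumer<Throwable> dropped) {
        source.subscribe(new Sink() {
            boolean done;
            public void next(int value) {
                if (done) return; // models cancellation after a consumer failure
                try {
                    onNext.accept(value);
                } catch (RuntimeException e) {
                    done = true;
                    dropped.accept(e); // nothing downstream, so retry never sees this
                }
            }
            public void error(Throwable t) { done = true; dropped.accept(t); }
        });
    }

    /** Returns {subscriptions, droppedErrors, deliveredValues}. */
    static int[] run(int upstreamFailures, boolean consumerThrows) {
        AtomicInteger subscriptions = new AtomicInteger();
        List<Throwable> dropped = new ArrayList<>();
        int[] delivered = {0};
        subscribe(
            retry(flaky(upstreamFailures, subscriptions), 5),
            v -> {
                if (consumerThrows) throw new RuntimeException(v + " error");
                delivered[0]++;
            },
            dropped::add);
        return new int[] {subscriptions.get(), dropped.size(), delivered[0]};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(2, false))); // [3, 0, 2]: upstream errors are retried
        System.out.println(Arrays.toString(run(0, true)));  // [1, 1, 0]: consumer error is not retried
    }
}
```

In Reactor terms, the usual way to let a retry see such a failure is to do the fallible work in an operator placed before it (for example doOnNext or flatMap), since exceptions thrown there are turned into error signals that flow downstream, subject to the retry's own filter.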