fabric8io / mockwebserver

An extension of okhttp's mockwebserver, that provides a DSL and is easier to use
Apache License 2.0
113 stars 38 forks source link

WebSocketClientHandshakeException on expect().andUpgradeToWebSocket() #77

Closed Donnerbart closed 9 months ago

Donnerbart commented 1 year ago

We're writing a custom operator using Quarkus and the fabric8 kubernetes-client. When writing a unit test with mocking for a pods/exec request I ran into this WebSocketClientHandshakeException:

java.util.concurrent.CompletionException: io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException: Invalid subprotocol. Actual: null. Expected one of: v4.channel.k8s.io
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1141)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at io.fabric8.kubernetes.client.vertx.VertxHttpClient.lambda$buildWebSocketDirect$3(VertxHttpClient.java:128)
    at io.vertx.core.impl.future.FutureImpl$2.onFailure(FutureImpl.java:117)
    at io.vertx.core.impl.future.FutureImpl$ListenerArray.onFailure(FutureImpl.java:268)
    at io.vertx.core.impl.future.FutureBase.lambda$emitFailure$1(FutureBase.java:69)
    at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:86)
    at io.vertx.core.impl.ContextBase.execute(ContextBase.java:225)
    at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:66)
    at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
    at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
    at io.vertx.core.Promise.fail(Promise.java:89)
    at io.vertx.core.http.impl.Http1xClientConnection.lambda$toWebSocket$10(Http1xClientConnection.java:1018)
    at io.vertx.core.http.impl.WebSocketHandshakeInboundHandler.lambda$handshakeComplete$0(WebSocketHandshakeInboundHandler.java:117)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
    at io.vertx.core.http.impl.WebSocketHandshakeInboundHandler.handshakeComplete(WebSocketHandshakeInboundHandler.java:105)
    at io.vertx.core.http.impl.WebSocketHandshakeInboundHandler.channelRead(WebSocketHandshakeInboundHandler.java:84)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException: Invalid subprotocol. Actual: null. Expected one of: v4.channel.k8s.io
    at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker.finishHandshake(WebSocketClientHandshaker.java:389)
    at io.vertx.core.http.impl.WebSocketHandshakeInboundHandler.lambda$handshakeComplete$0(WebSocketHandshakeInboundHandler.java:109)
    ... 31 common frames omitted
    Suppressed: java.lang.Throwable: waiting here
        at io.fabric8.kubernetes.client.utils.Utils.waitUntilReady(Utils.java:174)
        at io.fabric8.kubernetes.client.utils.Utils.waitUntilReadyOrFail(Utils.java:185)
        at io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.setupConnectionToPod(PodOperationsImpl.java:389)
        at io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.exec(PodOperationsImpl.java:296)

This is our mock code:

mockServer.expect()
        .get()
        .withPath(String.format(
                "/api/v1/namespaces/%s/pods/%s/exec?command=%s&container=%s&stdout=true&stderr=true",
                namespace,
                podName,
                command,
                containerName))
        .andUpgradeToWebSocket()
        .open(new OutputStreamMessage(result))
        .done()
        .once();                

This is our production code:

client.pods()
    .inNamespace(namespace)
    .withName(podName)
    .inContainer(containerName)
    .writingOutput(output)
    .writingError(error)
    .usingListener(listener)
    .exec(command)

The subprotocol mentioned in the exception is set by the kubernetes-client in PodOperationsImpl:

  private ExecWebSocketListener setupConnectionToPod(URI uri) {
    ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(getContext(), this.context.getExecutor(),
        this.getKubernetesSerialization());
    CompletableFuture<WebSocket> startedFuture = httpClient.newWebSocketBuilder()
        .subprotocol("v4.channel.k8s.io")
        .uri(uri)
        .connectTimeout(getRequestConfig().getRequestTimeout(), TimeUnit.MILLISECONDS)
        .buildAsync(execWebSocketListener);
    startedFuture.whenComplete((w, t) -> {
      if (t != null) {
        execWebSocketListener.onError(w, t);
      }
    });
    Utils.waitUntilReadyOrFail(startedFuture, getRequestConfig().getRequestTimeout(), TimeUnit.MILLISECONDS);
    return execWebSocketListener;
  }

And the exception is triggered by io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker.finishHandshake():

        // Verify the subprotocol that we received from the server.
        // This must be one of our expected subprotocols - or null/empty if we didn't want to speak a subprotocol
        String receivedProtocol = response.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
        receivedProtocol = receivedProtocol != null ? receivedProtocol.trim() : null;
        String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";
        boolean protocolValid = false;

        if (expectedProtocol.isEmpty() && receivedProtocol == null) {
            // No subprotocol required and none received
            protocolValid = true;
            setActualSubprotocol(expectedSubprotocol); // null or "" - we echo what the user requested
        } else if (!expectedProtocol.isEmpty() && receivedProtocol != null && !receivedProtocol.isEmpty()) {
            // We require a subprotocol and received one -> verify it
            for (String protocol : expectedProtocol.split(",")) {
                if (protocol.trim().equals(receivedProtocol)) {
                    protocolValid = true;
                    setActualSubprotocol(receivedProtocol);
                    break;
                }
            }
        } // else mixed cases - which are all errors

My workaround for this is to set the missing header myself:

mockServer.expect()
        .get()
        .withPath(String.format(
                "/api/v1/namespaces/%s/pods/%s/exec?command=%s&container=%s&stdout=true&stderr=true",
                namespace,
                podName,
                command,
                containerName))
        .andUpgradeToWebSocket()
        .open(new OutputStreamMessage(result))
        .done()
        .withHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL.toString(), "v4.channel.k8s.io")
        .once();                

I think the mock server should handle this automatically, since it's defined in the protocol upgrade mechanism (and not specific to the kubernetes-client usage). See https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism