envoyproxy / java-control-plane

Java implementation of an Envoy gRPC control plane
Apache License 2.0

clarify how re-subscribing to resources should work #339

Open fboranek opened 8 months ago

fboranek commented 8 months ago

The latest gRPC library unsubscribes from a resource when the channel using it is no longer in use, and subscribes to it again later. This does not work properly when using V3DiscoveryServer with SimpleCache<>.

The question is what the protocol flow should look like in the SotW (state-of-the-world) protocol variant.

Here is what's going on:
1) subscribe to resources "a" and "b"
2) unsubscribe from resource "b"
3) subscribe to resources "a" and "b" again: here the library doesn't respond, although the xds-client has already removed "b" and is expecting a response.
If "c" is subscribed instead of re-subscribing "b", the control plane does respond.


According to https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#unsubscribing-from-resources:

Unsubscribing From Resources

In the SotW protocol variants, each request must contain the full list of resource names being subscribed to in the resource_names field, so unsubscribing to a set of resources is done by sending a new request containing all resource names that are still being subscribed to but not containing the resource names being unsubscribed to. For example, if the client had previously been subscribed to resources A and B but wishes to unsubscribe from B, it must send a new request containing only resource A.
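In SotW there is no explicit "unsubscribe" message, so the server has to infer it by comparing consecutive resource_names lists for the same type URL. A minimal, stdlib-only sketch of that comparison, assuming names are compared as plain strings (the class and its methods are hypothetical, not part of java-control-plane):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: infers SotW subscription changes from two consecutive
// resource_names lists of the same type URL. Not a java-control-plane API.
final class SotwSubscriptionDiff {
    private SotwSubscriptionDiff() {}

    /** Names in the previous request that are missing from the current one (unsubscribed). */
    static Set<String> unsubscribed(List<String> previous, List<String> current) {
        Set<String> result = new LinkedHashSet<>(previous);
        result.removeAll(current);
        return result;
    }

    /** Names in the current request that were missing from the previous one (newly subscribed). */
    static Set<String> subscribed(List<String> previous, List<String> current) {
        Set<String> result = new LinkedHashSet<>(current);
        result.removeAll(previous);
        return result;
    }

    public static void main(String[] args) {
        // The client was subscribed to A and B and now sends a request containing only A.
        List<String> before = List.of("A", "B");
        List<String> after = List.of("A");
        System.out.println("unsubscribed: " + unsubscribed(before, after)); // [B]
        System.out.println("subscribed:   " + subscribed(before, after));   // []
    }
}
```

One caveat the sketch ignores: for LDS and CDS the spec treats an empty resource_names list as a wildcard subscription, so an empty list must not be read as "unsubscribe from everything".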

It looks like gRPC conforms to the specification when unsubscribing.

According to the specification (https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return):

When the client sends a new request that changes the set of resources being requested, the server must resend any newly requested resources, even if it previously sent those resources without having been asked for them and the resources have not changed since that time.

I cannot tell whether this applies here. The resource "b" is not exactly new: it is an already known resource that was unsubscribed.
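The question above can be made concrete with one possible reading of the rule: "newly requested" is measured against the immediately preceding request on the stream, not against everything the server has ever sent. A hypothetical sketch of that reading (the class name and the reading itself are assumptions, not confirmed by the spec text or by java-control-plane), replayed on the flow from this issue:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of one reading of "the server must resend any newly
// requested resources": names are "new" if they were absent from the previous
// request of the same type URL. Not java-control-plane code.
final class ResendRule {
    private ResendRule() {}

    /** Names the server must (re)send for `current`, given the previous request `previous`. */
    static Set<String> mustResend(List<String> previous, List<String> current) {
        Set<String> newlyRequested = new LinkedHashSet<>(current);
        newlyRequested.removeAll(previous);
        return newlyRequested;
    }

    public static void main(String[] args) {
        List<List<String>> requests = List.of(
                List.of("a", "b"),   // 1) subscribe a and b
                List.of("a"),        // 2) unsubscribe b
                List.of("a", "b"));  // 3) re-subscribe b
        List<String> previous = List.of();
        for (List<String> current : requests) {
            System.out.println(current + " -> must resend " + mustResend(previous, current));
            previous = current;
        }
    }
}
```

Under this reading, step 3 yields "b" as newly requested, so the server would have to send it again even though the snapshot version did not change.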

I used the following unit test to see how it is implemented.

package cz.seznam.profile.xds;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;

import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.v3.SimpleCache;
import io.envoyproxy.controlplane.cache.v3.Snapshot;
import io.envoyproxy.controlplane.server.V3DiscoveryServer;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.controlplane.cache.Resources;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class ReSubscribeTest {
    static {
        var rootLogger = org.apache.log4j.Logger.getRootLogger();
        rootLogger.setLevel(org.apache.log4j.Level.DEBUG);
        rootLogger.addAppender(new org.apache.log4j.ConsoleAppender(
                new org.apache.log4j.PatternLayout("%-6r [%p] %c - %m%n")));
    }

    public static class StaticNodeGroup implements NodeGroup<String> {
        @Override
        public String hash(Node node) {
            return node.getCluster();
        }
    }

    @BeforeAll
    public static void setup() throws Exception {
    }

    private static final String CLUSTER_NAME = "default-cluster";

    static final Node node = Node.newBuilder()
            .setCluster(CLUSTER_NAME).setUserAgentName("gRPC C-core linux")
            .setUserAgentVersion("C-core 37.0.0").addClientFeatures("envoy.lb.does_not_support_overprovisioning")
            .addClientFeatures("xds.config.resource-in-sotw").build();

    static final DiscoveryRequest drListener1 = DiscoveryRequest.newBuilder()
            .setTypeUrl(Resources.V3.LISTENER_TYPE_URL).setNode(node)
            .addResourceNames("my-echo-server")
            .addResourceNames("my-echo-server2")
            .build();

    static final DiscoveryRequest drRoute1 = DiscoveryRequest.newBuilder()
            .setTypeUrl(Resources.V3.ROUTE_TYPE_URL).setNode(node)
            .addResourceNames("my-echo-server-route")
            .addResourceNames("my-echo-server2-route")
            .build();

    static final DiscoveryRequest drCluster1 = DiscoveryRequest.newBuilder()
            .setTypeUrl(Resources.V3.CLUSTER_TYPE_URL).setNode(node)
            .addResourceNames("my-echo-server-cluster")
            .addResourceNames("my-echo-server2-cluster")
            .build();

    static final DiscoveryRequest drEndpoint1 = DiscoveryRequest.newBuilder()
            .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL).setNode(node)
            .addResourceNames("my-echo-server-cluster")
            .addResourceNames("my-echo-server2-cluster")
            .build();

    private static final String VERSION1 = "88eb5ecd-1e5f-484f-9522-d5be08dd166a";

    @SuppressWarnings("null")
    private static final Snapshot SNAPSHOT1 = Snapshot.create(
            ImmutableList.of(Cluster.newBuilder().setName("my-echo-server-cluster").build(),
                    Cluster.newBuilder().setName("my-echo-server2-cluster").build(),
                    Cluster.newBuilder().setName("my-echo-server3-cluster").build()),
            ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server-cluster").build(),
                    ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server2-cluster").build(),
                    ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server3-cluster").build()),
            ImmutableList.of(Listener.newBuilder().setName("my-echo-server").build(),
                    Listener.newBuilder().setName("my-echo-server2").build(),
                    Listener.newBuilder().setName("my-echo-server3").build()),
            ImmutableList.of(RouteConfiguration.newBuilder().setName("my-echo-server-route").build(),
                    RouteConfiguration.newBuilder().setName("my-echo-server2-route").build(),
                    RouteConfiguration.newBuilder().setName("my-echo-server3-route").build()),
            ImmutableList.<Secret>of(),
            VERSION1);

    @Test
    void test() throws IOException {
        var cache = new SimpleCache<>(new StaticNodeGroup());
        cache.setSnapshot(CLUSTER_NAME, SNAPSHOT1);

        var discoveryServer = new V3DiscoveryServer(cache);

        final var serverName = InProcessServerBuilder.generateName();
        final var server = InProcessServerBuilder
                .forName(serverName).directExecutor().addService(discoveryServer.getAggregatedDiscoveryServiceImpl())
                .build();
        server.start();
        final var stub = AggregatedDiscoveryServiceGrpc
                .newStub(InProcessChannelBuilder.forName(serverName).directExecutor().build());

        var xdsConfig = new XdsConfig();

        final var requestObserver = stub.streamAggregatedResources(new ResponseStreamObserver(xdsConfig));

        // LDS->RDS->CDS->EDS
        // LDS
        requestObserver.onNext(drListener1);
        Assertions.assertEquals("0", xdsConfig.ldsNonce);
        Assertions.assertEquals(List.of("my-echo-server", "my-echo-server2"), xdsConfig.ldsResourcesNames);

        // RDS
        requestObserver.onNext(drRoute1);
        Assertions.assertEquals("1", xdsConfig.rdsNonce);
        Assertions.assertEquals(List.of("my-echo-server-route", "my-echo-server2-route"), xdsConfig.rdsResourcesNames);

        // CDS
        requestObserver.onNext(drCluster1);
        Assertions.assertEquals("2", xdsConfig.cdsNonce);
        Assertions.assertEquals(List.of("my-echo-server-cluster", "my-echo-server2-cluster"),
                xdsConfig.cdsResourcesNames);

        // EDS
        requestObserver.onNext(drEndpoint1);
        Assertions.assertEquals("3", xdsConfig.edsNonce);
        Assertions.assertEquals(List.of("my-echo-server-cluster", "my-echo-server2-cluster"),
                xdsConfig.edsResourcesNames);

        // ACK configuration
        final var drListener2 = DiscoveryRequest.newBuilder(drListener1).setResponseNonce(xdsConfig.ldsNonce)
                .setVersionInfo(xdsConfig.ldsVersionInfo).build();
        final var drRoute2 = DiscoveryRequest.newBuilder(drRoute1).setResponseNonce(xdsConfig.rdsNonce)
                .setVersionInfo(xdsConfig.rdsVersionInfo).build();
        final var drCluster2 = DiscoveryRequest.newBuilder(drCluster1).setResponseNonce(xdsConfig.cdsNonce)
                .setVersionInfo(xdsConfig.cdsVersionInfo).build();
        final var drEndpoint2 = DiscoveryRequest.newBuilder(drEndpoint1).setResponseNonce(xdsConfig.edsNonce)
                .setVersionInfo(xdsConfig.edsVersionInfo).build();
        requestObserver.onNext(drListener2);
        requestObserver.onNext(drRoute2);
        requestObserver.onNext(drCluster2);
        requestObserver.onNext(drEndpoint2);

        // verify that no other change was received
        Assertions.assertEquals("0", xdsConfig.ldsNonce);
        Assertions.assertEquals("1", xdsConfig.rdsNonce);
        Assertions.assertEquals("2", xdsConfig.cdsNonce);
        Assertions.assertEquals("3", xdsConfig.edsNonce);

        // unsubscribe one resource
        System.out.println("----------------------------------------------------------------------");
        System.out.println(" unsubscribe one resource ");
        System.out.println("----------------------------------------------------------------------");
        final var drListener3 = DiscoveryRequest.newBuilder(drListener2).clearResourceNames()
                .addResourceNames("my-echo-server").build();
        final var drRoute3 = DiscoveryRequest.newBuilder(drRoute2).clearResourceNames()
                .addResourceNames("my-echo-server-route").build();
        final var drCluster3 = DiscoveryRequest.newBuilder(drCluster2).clearResourceNames()
                .addResourceNames("my-echo-server-cluster").build();
        final var drEndpoint3 = DiscoveryRequest.newBuilder(drEndpoint2).clearResourceNames()
                .addResourceNames("my-echo-server-cluster").build();
        requestObserver.onNext(drListener3);
        requestObserver.onNext(drRoute3);
        requestObserver.onNext(drCluster3);
        requestObserver.onNext(drEndpoint3);

        // verify reply
        Assertions.assertEquals("0", xdsConfig.ldsNonce);
        Assertions.assertEquals("1", xdsConfig.rdsNonce);
        Assertions.assertEquals("2", xdsConfig.cdsNonce);
        Assertions.assertEquals("3", xdsConfig.edsNonce);

        // no response was sent

        System.out.println("----------------------------------------------------------------------");
        System.out.println(" re-subscribe the resource my-echo-server2");
        System.out.println("----------------------------------------------------------------------");
        final var drListener4 = DiscoveryRequest.newBuilder(drListener3)
                .addResourceNames("my-echo-server2").build();
        requestObserver.onNext(drListener4);

        // verify reply
        Assertions.assertEquals("0", xdsConfig.ldsNonce);

        // no response was sent for the re-subscribed resource!

        System.out.println("----------------------------------------------------------------------");
        System.out.println(" subscribe new resource my-echo-server3");
        System.out.println("----------------------------------------------------------------------");

        final var drListener5 = DiscoveryRequest.newBuilder(drListener3)
                .addResourceNames("my-echo-server3").build();
        requestObserver.onNext(drListener5);

        // verify reply
        Assertions.assertEquals("4", xdsConfig.ldsNonce);
        Assertions.assertEquals(List.of("my-echo-server", "my-echo-server3"),
                xdsConfig.ldsResourcesNames);

        // a response was sent only after subscribing to the new resource "my-echo-server3"

        requestObserver.onCompleted();
        server.shutdown();
        try {
            server.awaitTermination();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println("-- end --");
    }

    static class XdsConfig {
        // The flow used in gRPC is `LDS->RDS->CDS->EDS`

        String ldsVersionInfo;
        String ldsNonce;
        List<String> ldsResourcesNames = List.of();
        String rdsVersionInfo;
        String rdsNonce;
        List<String> rdsResourcesNames = List.of();
        String cdsVersionInfo;
        String cdsNonce;
        List<String> cdsResourcesNames = List.of();
        String edsVersionInfo;
        String edsNonce;
        List<String> edsResourcesNames = List.of();
    }

    static class ResponseStreamObserver implements StreamObserver<DiscoveryResponse> {
        XdsConfig xdsConfig;

        public ResponseStreamObserver(XdsConfig xdsConfig) {
            this.xdsConfig = xdsConfig;
        }

        @Override
        public void onNext(DiscoveryResponse value) {
            System.out.println(value.toString());
            if (value.getTypeUrl().equals(Resources.V3.LISTENER_TYPE_URL)) {
                xdsConfig.ldsVersionInfo = value.getVersionInfo();
                xdsConfig.ldsNonce = value.getNonce();
                var resourceNames = new ArrayList<String>();
                for (var any : value.getResourcesList()) {
                    try {
                        var listener = any.unpack(Listener.class);
                        resourceNames.add(listener.getName());
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                xdsConfig.ldsResourcesNames = resourceNames;
            } else if (value.getTypeUrl().equals(Resources.V3.ROUTE_TYPE_URL)) {
                xdsConfig.rdsVersionInfo = value.getVersionInfo();
                xdsConfig.rdsNonce = value.getNonce();
                var resourceNames = new ArrayList<String>();
                for (var any : value.getResourcesList()) {
                    try {
                        var route = any.unpack(RouteConfiguration.class);
                        resourceNames.add(route.getName());
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                xdsConfig.rdsResourcesNames = resourceNames;
            } else if (value.getTypeUrl().equals(Resources.V3.CLUSTER_TYPE_URL)) {
                xdsConfig.cdsVersionInfo = value.getVersionInfo();
                xdsConfig.cdsNonce = value.getNonce();
                var resourceNames = new ArrayList<String>();
                for (var any : value.getResourcesList()) {
                    try {
                        var cluster = any.unpack(Cluster.class);
                        resourceNames.add(cluster.getName());
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                xdsConfig.cdsResourcesNames = resourceNames;
            } else if (value.getTypeUrl().equals(Resources.V3.ENDPOINT_TYPE_URL)) {
                xdsConfig.edsVersionInfo = value.getVersionInfo();
                xdsConfig.edsNonce = value.getNonce();
                var resourceNames = new ArrayList<String>();
                for (var any : value.getResourcesList()) {
                    try {
                        var cla = any.unpack(ClusterLoadAssignment.class);
                        resourceNames.add(cla.getClusterName());
                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                xdsConfig.edsResourcesNames = resourceNames;
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace(); // surface stream errors instead of silently dropping them
        }

        @Override
        public void onCompleted() {
        }
    }
}

Attachment: debug.log.txt
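The test's observations (no response to re-subscribing my-echo-server2, a response to subscribing my-echo-server3) are consistent with a server that remembers every name it has already sent on the stream and, while the client's version is current, answers only when a never-sent name is requested. The sketch below is a hypothetical model of that behavior, not the SimpleCache implementation; the class name and the "ever sent" set are assumptions:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model that reproduces the behavior observed in the test:
// the set of names ever sent never shrinks on unsubscribe, so a re-subscribed
// name does not trigger a response while the version is unchanged.
// This is NOT the SimpleCache implementation.
final class KnownNamesServerModel {
    private final String snapshotVersion;
    private final Set<String> everSent = new HashSet<>();

    KnownNamesServerModel(String snapshotVersion) {
        this.snapshotVersion = snapshotVersion;
    }

    /** Returns true if the model would send a response to this request. */
    boolean onRequest(String clientVersion, List<String> resourceNames) {
        boolean respond = !snapshotVersion.equals(clientVersion)
                || !everSent.containsAll(resourceNames);
        if (respond) {
            everSent.addAll(resourceNames);
        }
        return respond;
    }

    public static void main(String[] args) {
        KnownNamesServerModel server = new KnownNamesServerModel("v1");
        System.out.println(server.onRequest("", List.of("a", "b")));   // true: initial request
        System.out.println(server.onRequest("v1", List.of("a")));      // false: unsubscribe b
        System.out.println(server.onRequest("v1", List.of("a", "b"))); // false: re-subscribe b
        System.out.println(server.onRequest("v1", List.of("a", "c"))); // true: new resource c
    }
}
```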

fboranek commented 8 months ago

I solved my original problem with a custom ResourceVersionResolver in the Snapshot. Now, whenever the list of requested resources changes, the version changes too, which forces the discovery server to send a response.

private static ResourceVersionResolver createVersionResolver(String version) {
    // Per-request version: the snapshot version plus a hash of the requested
    // resource names (Hashing is Guava's com.google.common.hash.Hashing).
    return new ResourceVersionResolver() {
        @SuppressWarnings("null")
        @Override
        public String version(List<String> resourceNames) {
            // An empty list keeps the plain snapshot version.
            if (resourceNames.isEmpty()) {
                return version;
            }
            var hasher = Hashing.murmur3_32_fixed().newHasher();
            resourceNames.forEach(name -> hasher.putBytes(name.getBytes(StandardCharsets.UTF_8)));
            return version + "/" + hasher.hash().toString();
        }
    };
}
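The same idea can be sketched with the JDK alone. This hypothetical variant (the class name, CRC32, sorting, and the NUL separator are choices made for the sketch, not part of java-control-plane or the resolver above) also makes the version independent of name order and keeps ["ab", "c"] and ["a", "bc"] from hashing to the same bytes:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical stdlib-only sketch: version = snapshot version + checksum of the
// requested names, so any change in the subscription changes the version.
final class NameScopedVersion {
    private NameScopedVersion() {}

    static String version(String snapshotVersion, List<String> resourceNames) {
        if (resourceNames.isEmpty()) {
            return snapshotVersion; // empty request keeps the plain snapshot version
        }
        List<String> sorted = new ArrayList<>(resourceNames);
        sorted.sort(null); // natural order: [a, b] and [b, a] give the same version
        CRC32 crc = new CRC32();
        for (String name : sorted) {
            crc.update(name.getBytes(StandardCharsets.UTF_8));
            crc.update(0); // separator byte between names
        }
        return snapshotVersion + "/" + Long.toHexString(crc.getValue());
    }

    public static void main(String[] args) {
        System.out.println(version("v1", List.of("a")));
        System.out.println(version("v1", List.of("a", "b")));
        System.out.println(version("v1", List.of("b", "a")));
    }
}
```

Sorting is a design choice: with it, merely reordering the same names does not force a new response; the Guava-based resolver above is order-sensitive, which is harmless but may cause extra responses.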
atollena commented 8 months ago

Probably related: https://github.com/grpc/grpc-go/issues/7013#issuecomment-1972805644

fboranek commented 8 months ago

Probably related: grpc/grpc-go#7013 (comment)

Likely not. In this case there was still an active connection, so the resource list was not completely empty.

Here is what happened in the app.

In the log I then see these errors:

2024/02/13 10:56:26 E4: Problem calling: empty address list: server-2: xDS listener resource does not exist
2024/02/13 10:56:26 E4: Problem calling: empty address list: server-1: xDS listener resource does not exist

It doesn't seem to be an error in gRPC. It might be an error in the control-plane library, or just in how I used it.
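The "xDS listener resource does not exist" errors fit a client that starts a does-not-exist timer when it (re-)subscribes to a resource it has no data for, and gives up if no response containing the resource arrives in time. A hypothetical sketch of such a timer with an injected clock (the class name and the 15-second timeout are assumptions for illustration; check gRPC's xDS client documentation for the real behavior):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side "does-not-exist" timer: each subscription without
// cached data gets a deadline; if no response delivers the resource before
// the deadline, it is reported as non-existent. TIMEOUT is an assumed value.
final class DoesNotExistTimers {
    static final Duration TIMEOUT = Duration.ofSeconds(15);
    private final Map<String, Instant> deadlines = new HashMap<>();

    void subscribe(String name, Instant now) {
        deadlines.put(name, now.plus(TIMEOUT));
    }

    void onResponseContaining(String name) {
        deadlines.remove(name); // resource delivered: cancel the timer
    }

    /** True if the resource is still pending and its deadline has passed. */
    boolean doesNotExist(String name, Instant now) {
        Instant deadline = deadlines.get(name);
        return deadline != null && !now.isBefore(deadline);
    }

    public static void main(String[] args) {
        DoesNotExistTimers timers = new DoesNotExistTimers();
        Instant t0 = Instant.parse("2024-02-13T10:56:00Z");
        timers.subscribe("server-1", t0); // re-subscribed, but the server stays silent
        System.out.println(timers.doesNotExist("server-1", t0.plusSeconds(5)));  // false
        System.out.println(timers.doesNotExist("server-1", t0.plusSeconds(16))); // true
    }
}
```

This is why a silent control plane shows up on the client as a missing resource rather than as a stream error.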

I have already changed it the way gRPC expects, but I still don't know whether it is the right fix. Now the protocol flow looks like this:

[Diagram: versioned-snapshot protocol flow (draw.io)]
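Why the versioned snapshot helps can be replayed in a few lines: if the version the server reports depends on the requested names, any change in the subscription makes the client's last ACKed version stale, so the server answers. This is a hypothetical, simplified model (names are joined instead of hashed for readability, and the "respond when versions differ" rule is an assumption, not java-control-plane's code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the flow after the fix: the reported version is
// derived from the requested names, so unsubscribe and re-subscribe both
// change it and the server responds each time.
final class VersionedResubscribeReplay {
    private VersionedResubscribeReplay() {}

    static String version(String snapshotVersion, List<String> names) {
        if (names.isEmpty()) {
            return snapshotVersion;
        }
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(null);
        return snapshotVersion + "/" + String.join(",", sorted);
    }

    /** Simplified rule: answer whenever the client's version differs from the one for its names. */
    static boolean responds(String snapshotVersion, String clientVersion, List<String> names) {
        return !version(snapshotVersion, names).equals(clientVersion);
    }

    public static void main(String[] args) {
        String snapshot = "v1";
        String acked = ""; // nothing ACKed yet
        for (List<String> names : List.of(List.of("a", "b"), List.of("a"), List.of("a", "b"))) {
            boolean respond = responds(snapshot, acked, names);
            if (respond) {
                acked = version(snapshot, names); // client ACKs the version it received
            }
            System.out.println(names + " -> respond: " + respond + ", client version now " + acked);
        }
    }
}
```

A side effect visible in the replay: the server now also responds to the unsubscribe step, since the version for ["a"] differs from the one for ["a", "b"].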

fboranek commented 8 months ago

Probably related: grpc/grpc-go#7013 (comment)

Likely not. In this case there was still an active connection, so the resource list was not completely empty.

@atollena On second thought: if you use this library the way it is used in the unit test here, it is exactly the same problem.