kmesh-net / kmesh

High Performance ServiceMesh Data Plane Based on Programmable Kernel
https://kmesh.net
Apache License 2.0

Delta xds reconnect may lead to resource leak #415

Closed hzxuzhonghu closed 3 weeks ago

hzxuzhonghu commented 3 weeks ago

What happened:

Currently we are using delta xDS to subscribe to workloads. But if a reconnect happens, we may not learn which resources were removed while the stream was down.

As the delta xDS documentation states (https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol):

On reconnect the Incremental xDS client may tell the server of its known resources 
to avoid resending them over the network by sending them in initial_resource_versions. 
Because no state is assumed to be preserved from the previous stream, the reconnecting client 
must provide the server with all resource names it is interested in.
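
The reconnect rule quoted above can be sketched in Go as follows. This is a minimal illustration, not the Kmesh implementation: `deltaRequest` is a hypothetical, simplified stand-in for the real `DeltaDiscoveryRequest` message, `newReconnectRequest` is an assumed helper name, and the type URL and resource names in `main` are placeholders.

```go
package main

import (
	"fmt"
	"sort"
)

// deltaRequest is a simplified, hypothetical stand-in for the xDS
// DeltaDiscoveryRequest message. Only the fields that matter on
// reconnect are modeled here.
type deltaRequest struct {
	TypeURL                 string
	ResourceNamesSubscribe  []string
	InitialResourceVersions map[string]string
}

// newReconnectRequest builds the first request on a new stream from the
// resources the client already knows (resource name -> version).
// The server keeps no state from the old stream, so every name the
// client is interested in is listed again.
func newReconnectRequest(typeURL string, known map[string]string) deltaRequest {
	names := make([]string, 0, len(known))
	versions := make(map[string]string, len(known))
	for name, version := range known {
		names = append(names, name)
		versions[name] = version
	}
	sort.Strings(names) // deterministic order, convenient for logs and tests
	return deltaRequest{
		TypeURL:                 typeURL,
		ResourceNamesSubscribe:  names,
		InitialResourceVersions: versions,
	}
}

func main() {
	// Placeholder type URL and resource names, for illustration only.
	known := map[string]string{"ns/pod-b": "2", "ns/pod-a": "1"}
	req := newReconnectRequest("example.type/Address", known)
	fmt.Println(req.ResourceNamesSubscribe, len(req.InitialResourceVersions))
}
```

Sending the known versions lets the server skip resources that have not changed, while the name list tells it what the client still tracks.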

hzxuzhonghu commented 3 weeks ago

@Okabe-Rintarou-0 Would you like to give it a try?

Okabe-Rintarou-0 commented 3 weeks ago

/assign

Okabe-Rintarou-0 commented 3 weeks ago

We can use the names parameter to tell Istio which resources we had already subscribed to before the reconnect. https://github.com/kmesh-net/kmesh/blob/0656e264906ed86a769974cd6da4a3f52d1abe0c/pkg/controller/workload/workload_processor.go#L72-L80

Maybe we can fetch the address names that were subscribed before the reconnect:

func (ws *Controller) WorkloadStreamCreateAndSend(client discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context) error {
    var (
        err error
        addressNames []string
        authorizationNames []string
    )

    ws.Stream, err = client.DeltaAggregatedResources(ctx)
    if err != nil {
        return fmt.Errorf("DeltaAggregatedResources failed, %s", err)
    }
    // fetch the address names subscribed before the reconnect ("xxx" is a placeholder)
    addressNames = xxx

    if err := ws.Stream.Send(newWorkloadRequest(AddressType, addressNames)); err != nil {
        return fmt.Errorf("send request failed, %s", err)
    }

    if err = ws.Stream.Send(newWorkloadRequest(AuthorizationType, authorizationNames)); err != nil {
        return fmt.Errorf("authorization subscribe failed, %s", err)
    }

    return nil
}

Should we fetch them from the BPF map?

hzxuzhonghu commented 3 weeks ago

I think we can make it simpler by storing them in a userspace cache. Actually, we have already stored workloads and services for config_dump.
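
A userspace cache of this kind could look roughly like the sketch below. It is only an illustration under assumptions: `subscriptionCache` and its methods are hypothetical names, not the existing Kmesh cache used for config_dump, and the type URL keys in `main` are placeholders.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// subscriptionCache is a hypothetical userspace record of the resource
// names received so far, kept per xDS type URL.
type subscriptionCache struct {
	mu    sync.RWMutex
	names map[string]map[string]struct{} // type URL -> set of resource names
}

func newSubscriptionCache() *subscriptionCache {
	return &subscriptionCache{names: make(map[string]map[string]struct{})}
}

// Add records a resource delivered by the server.
func (c *subscriptionCache) Add(typeURL, name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.names[typeURL] == nil {
		c.names[typeURL] = make(map[string]struct{})
	}
	c.names[typeURL][name] = struct{}{}
}

// Remove drops a resource that the server reported as removed.
// Removing an unknown name or type URL is a no-op.
func (c *subscriptionCache) Remove(typeURL, name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.names[typeURL], name)
}

// Names returns a sorted snapshot of the cached names for one type URL,
// suitable for the first request sent after a reconnect.
func (c *subscriptionCache) Names(typeURL string) []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]string, 0, len(c.names[typeURL]))
	for name := range c.names[typeURL] {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	cache := newSubscriptionCache()
	cache.Add("address", "ns/pod-a")
	cache.Add("address", "ns/pod-b")
	cache.Add("authorization", "ns/policy-1")
	cache.Remove("address", "ns/pod-b")
	fmt.Println(cache.Names("address"), cache.Names("authorization"))
}
```

Keying the cache by type URL means the same structure can serve both the address and the authorization subscriptions, and reading from userspace avoids walking a BPF map on every reconnect.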

hzxuzhonghu commented 1 week ago

AuthorizationPolicy has the same problem, and it would be better to add some tests.
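
One way such a test could be framed is to compare the names cached before the disconnect with the names still present afterwards and check that the missing ones are detected. The sketch below assumes a hypothetical helper, `staleNames`, and made-up policy names; it is not taken from the Kmesh test suite.

```go
package main

import (
	"fmt"
	"sort"
)

// staleNames returns the names present in cached but absent from current,
// i.e. resources that were removed while the client was disconnected.
func staleNames(cached, current []string) []string {
	live := make(map[string]struct{}, len(current))
	for _, name := range current {
		live[name] = struct{}{}
	}
	var stale []string
	for _, name := range cached {
		if _, ok := live[name]; !ok {
			stale = append(stale, name)
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	// Hypothetical policy names: policy-2 was deleted during the reconnect.
	cached := []string{"ns/policy-1", "ns/policy-2", "ns/policy-3"}
	current := []string{"ns/policy-1", "ns/policy-3"}
	fmt.Println(staleNames(cached, current))
}
```

The same check applies to workload addresses, so one table-driven test could cover both resource types.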