opensergo / opensergo-control-plane

Universal cloud-native microservice governance control plane
Apache License 2.0

rpc error: code = Internal desc = transport: SendHeader called multiple times #51

Open sczyh30 opened 1 year ago

sczyh30 commented 1 year ago

Issue Description

Type: bug report

Describe what happened

An error occurred when CRD changes were received:

{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:171","logLevel":"INFO","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy OpenSergo CRD received","crd":{"kind":"ConcurrencyLimitStrategy","apiVersion":"fault-tolerance.opensergo.io/v1alpha1","metadata":{"name":"concurrency-limit-foo","namespace":"default","uid":"0b79c7c4-7f6a-4677-a636-2210a9608a13","resourceVersion":"7910317","generation":1,"creationTimestamp":"2023-01-17T11:29:55Z","labels":{"app":"foo-app"},"annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"fault-tolerance.opensergo.io/v1alpha1\",\"kind\":\"ConcurrencyLimitStrategy\",\"metadata\":{\"annotations\":{},\"labels\":{\"app\":\"foo-app\"},\"name\":\"concurrency-limit-foo\",\"namespace\":\"default\"},\"spec\":{\"limitMode\":\"Local\",\"maxConcurrency\":8}}\n"},"managedFields":[{"manager":"kubectl-client-side-apply","operation":"Update","apiVersion":"fault-tolerance.opensergo.io/v1alpha1","time":"2023-01-17T11:29:55Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:annotations":{".":{},"f:kubectl.kubernetes.io/last-applied-configuration":{}},"f:labels":{".":{},"f:app":{}}},"f:spec":{".":{},"f:limitMode":{},"f:maxConcurrency":{}}}}]},"spec":{"maxConcurrency":8,"limitMode":"Local"},"status":{}},"crdNamespace":"default","crdName":"concurrency-limit-foo","kind":"fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy"}
{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:197","logLevel":"ERROR","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy Failed to send rules","kind":"fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy","crdNamespace":"default","crdName":"rate-limit-foo","kind":"fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy"}
rpc error: code = Internal desc = transport: SendHeader called multiple times
{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:197","logLevel":"ERROR","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule Failed to send rules","kind":"fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule","crdNamespace":"default","crdName":"my-opensergo-rule-2","kind":"fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule"}
rpc error: code = Internal desc = transport: SendHeader called multiple times

Tell us your environment

OpenSergo control plane v0.1.0

Anything else we need to know?

NONE

sczyh30 commented 1 year ago

Client-side error (Java SDK):

2023-01-18 09:52:07.674 ERROR 13375 --- [ault-executor-1] openSergoLogger                          : Fatal error occurred on OpenSergo gRPC ClientObserver

io.grpc.StatusRuntimeException: INTERNAL: transport: SendHeader called multiple times
    at io.grpc.Status.asRuntimeException(Status.java:535) ~[grpc-api-1.42.1.jar:1.42.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:479) [grpc-stub-1.42.1.jar:1.42.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.42.1.jar:1.42.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.42.1.jar:1.42.1]
anushkabishnoi commented 1 year ago

Hello, this is my first experience with open source contributions. Would really appreciate it if you could explain exactly what needs to be done.

I would love to work on this issue.

Thanks in advance! :)

mayooot commented 5 months ago

@sczyh30 This seems to be caused by concurrent writes on the same gRPC stream: a stream's SendMsg is not safe to call from multiple goroutines at once. I think it can be fixed by adding a send interval or a sync.Mutex.

control_plane.go:96

const sendInterval = 100 * time.Millisecond

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
    if stream == nil {
        return nil
    }
    err := stream.SendMsg(&trpb.SubscribeResponse{
        Status:          status,
        Ack:             "",
        Namespace:       namespace,
        App:             app,
        Kind:            kind,
        DataWithVersion: dataWithVersion,
        ControlPlane:    c.protoDesc,
        ResponseId:      respId,
    })
    if err != nil {
        return err
    }
    // Throttle successive sends. This only reduces the chance of overlapping
    // writes; it does not strictly serialize them.
    time.Sleep(sendInterval)
    return nil
}

or

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
    if stream == nil {
        return nil
    }

    // c.mux would be a sync.Mutex field added to ControlPlane. Holding it
    // across SendMsg serializes all writes to the stream, which is required
    // because SendMsg is not safe for concurrent use.
    c.mux.Lock()
    defer c.mux.Unlock()

    return stream.SendMsg(&trpb.SubscribeResponse{
        Status:          status,
        Ack:             "",
        Namespace:       namespace,
        App:             app,
        Kind:            kind,
        DataWithVersion: dataWithVersion,
        ControlPlane:    c.protoDesc,
        ResponseId:      respId,
    })
}

reference: