streamnative / function-mesh

The serverless framework purpose-built for event streaming applications.
https://functionmesh.io/
Apache License 2.0
210 stars, 27 forks

[issue] In effectively_once mode, when a single pod fails, the entire function will fail due to failure to create the producer. #711

Open graysonzeng opened 8 months ago

graysonzeng commented 8 months ago
  1. When I enable effectively_once and deploy the function in k8s with, for example, 5 pods, and one of the pods crashes, its subscription is transferred to the other pods because of failover mode. The other pods then fail to create the producer, because the crashed pod's producer has not been closed on the server side.
Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://pulsar/default/input_test-partition-5-0' is already connected to topic","reqId":1766584259806202457, "remote":"21.21.47.12/21.21.47.12:6650", "local":"/9.165.174.197:46786"}

After this, the function restarts because of the exception. Due to failover, the subscription is transferred again, and the next pod also fails to create the producer. This causes all function pods to restart constantly. Therefore, when I need to enable effectively_once, I have to deploy multiple functions to consume partitioned topics separately. But this is not an easy way to maintain.

  2. In addition, the function can easily run into the following error and stay stuck on it until the broker is restarted.
WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x24fe09d6, L:/9.165.182.50:36944 ! R:21.21.134.241/21.21.134.241:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time

If you have any suggestions for improvement, please share them. Thank you very much.

graysonzeng commented 8 months ago

@freeznet @nlu90 @jiangpengcheng PTAL

jiangpengcheng commented 8 months ago

It looks like the server should close the producer when the related client crashes, but it doesn't.

Which version of Pulsar are you using? @graysonzeng

graysonzeng commented 8 months ago

Thanks for your reply. The Pulsar version is 3.1.1. I agree with you: it seems some mechanism is needed to properly close the stale producer before a new one is created.

jiangpengcheng commented 8 months ago

I will check it

jiangpengcheng commented 7 months ago

I can't reproduce the error. When I manually delete one function pod, its producer for the output topic is closed by the server too, so the next active consumer can create the producer.

Could you share the YAML you used? @graysonzeng

graysonzeng commented 7 months ago

Of course @jiangpengcheng

apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
  name: functionmesh-001
spec:
  functions:
    - name: functions-dedup-v1
      className: com.tencent.functionNoTag
      image: mirrors.tencent.com/g_k_cdp/pulsar-functions-test:v1.0.1
      replicas: 10
      processingGuarantee: "effectively_once" #effectively_once manual
      pod:
        terminationGracePeriodSeconds: 30
      input:
        topics:
          - persistent://pulsar/default2/input_test
        typeClassName: "[B"
      output:
        topic: persistent://pulsar/default2/alltables3
        typeClassName: "[B"
      pulsar:
        pulsarConfig: "pulsar-dedup-gtmz-167-sink-test02-config-v1"
        authSecret: "sink-dedup-test02-config-auth"
      java:
        extraDependenciesDir: ""
        jar: /pulsar/connectors//DynamicTopic-1.0.nar # the NAR location in image.
        jarLocation: "" # leave empty since we will not download package from Pulsar Packages
      clusterName: pulsar-gtmz-167
      forwardSourceMessageProperty: true
      resources:
        requests:
          cpu: "2"
          memory: 2G
        limits:
          cpu: "2"
          memory: 2G
      clusterName: test-pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: pulsar-dedup-gtmz-167-sink-test02-config-v1
data:
  webServiceURL: http://xx.xx.47.12:8080
  brokerServiceURL: pulsar://xx.xx.47.12:6650
  authPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  authParams: "eyJhxxxx"
---
apiVersion: v1
kind: Secret
metadata:
  name: sink-dedup-test02-config-auth
stringData:
  clientAuthenticationPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  clientAuthenticationParameters: "token:eyxxxx"

jiangpengcheng commented 7 months ago

I see. The error is caused by the following:

When pod-1 crashes, another pod (say pod-2) takes over pod-1's subscription to the partition pod-1 was consuming (say partition-5). If a message arrives on partition-5, pod-2 creates a new producer named persistent://pulsar/default/input_test-partition-5-0. Then, when pod-1 restarts and receives a new message, it cannot create a producer with the same name, persistent://pulsar/default/input_test-partition-5-0.

This issue is caused by JavaInstanceRunnable. I will fix it in the apache/pulsar repo.

@graysonzeng
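
The sequence described above can be replayed with a toy model. This is a sketch, not Pulsar code: `ToyBroker`, `ProducerNameInUse`, and `replay_failover` are hypothetical names, and the only behavior modeled is that a broker rejects a second producer asking for a name that is already connected to the topic. The topic and producer names are copied from the YAML and the error log in this thread.

```python
class ProducerNameInUse(Exception):
    """Hypothetical stand-in for the broker's NamingException."""


class ToyBroker:
    """Toy model: at most one connected producer per (topic, producer name)."""

    def __init__(self):
        self._owners = {}  # (topic, producer_name) -> pod that owns the producer

    def create_producer(self, topic, producer_name, pod):
        key = (topic, producer_name)
        if key in self._owners:
            raise ProducerNameInUse(
                f"Producer with name '{producer_name}' is already connected to topic"
            )
        self._owners[key] = pod

    def close_producers_of(self, pod):
        """Drop every producer owned by `pod`, as happens when its connection closes."""
        self._owners = {k: p for k, p in self._owners.items() if p != pod}


def replay_failover():
    """Replay the crash, takeover, and restart sequence; return a list of events."""
    broker = ToyBroker()
    out_topic = "persistent://pulsar/default2/alltables3"
    name = "persistent://pulsar/default/input_test-partition-5-0"
    events = []

    # pod-1 consumes partition-5 and owns the producer named after it.
    broker.create_producer(out_topic, name, pod="pod-1")
    events.append("pod-1: producer created")

    # pod-1 crashes; the broker closes its producer.
    broker.close_producers_of("pod-1")

    # Failover hands partition-5 to pod-2, which creates the same-named producer.
    broker.create_producer(out_topic, name, pod="pod-2")
    events.append("pod-2: producer created after failover")

    # pod-1 restarts, receives a message, and asks for the same name again.
    try:
        broker.create_producer(out_topic, name, pod="pod-1")
        events.append("pod-1: producer created")
    except ProducerNameInUse as exc:
        events.append(f"pod-1: rejected ({exc})")
    return events
```

In this model the name stays taken for as long as pod-2 keeps its producer open, so pod-1 keeps failing with the same error on every attempt.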

graysonzeng commented 7 months ago

Thanks!

jiangpengcheng commented 7 months ago

I created a PR here: https://github.com/apache/pulsar/pull/21912, and built a jar based on it. Could you add it to your runner image to check whether the issue is resolved? @graysonzeng

jiangpengcheng commented 7 months ago

This issue is hard to fix, since we cannot use a different producer name and still ensure de-duplication. Below is a workaround for when the error happens:

  1. set the function's replicas to 0
  2. wait until all function pods stop so that no producer exists for the output topic
  3. set the function's replicas back to the original value

cc @graysonzeng
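
The three steps could be scripted roughly as follows. This is a minimal sketch under stated assumptions: `drain_and_restore` is a hypothetical helper, and `set_replicas` and `count_pods` are caller-supplied callables that wrap however you edit the function's `replicas` field and list its pods. The sketch itself does not talk to Kubernetes.

```python
import time


def drain_and_restore(set_replicas, count_pods, original_replicas,
                      poll_seconds=1.0, timeout_seconds=300.0, sleep=time.sleep):
    """Scale the function to zero, wait until no pod remains, then scale back.

    set_replicas(n) and count_pods() are hypothetical hooks supplied by the caller.
    """
    # Step 1: set the function's replicas to 0.
    set_replicas(0)

    # Step 2: wait until all function pods have stopped, so that no producer
    # for the output topic is left connected.
    waited = 0.0
    while count_pods() > 0:
        if waited >= timeout_seconds:
            raise TimeoutError("function pods did not stop in time")
        sleep(poll_seconds)
        waited += poll_seconds

    # Step 3: set the replicas back to the original value.
    set_replicas(original_replicas)
```

The timeout matters: if step 3 ran while an old pod was still alive, its producer would still be connected and the same naming error could come back.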

graysonzeng commented 7 months ago

Thanks for the suggestion. I have an idea: how about using an independent subscription name for each pod? For example, say I have 10 partitions on the consumer side and two pods consuming them. The two pods would use subscription names A and B respectively. Then, even if a pod fails, its subscription will not be switched to the surviving pods by failover, and it can continue consuming once Kubernetes brings the pod back up. @jiangpengcheng

jiangpengcheng commented 7 months ago

Using different subscription names will make podA and podB both consume all ten partitions, which leads to duplication.

graysonzeng commented 7 months ago

A and B consume part of the partitions respectively. For example, A consumes partitions 0-4 and B consumes partitions 5-9. This will not lead to duplication.
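
A minimal sketch of such a split, assuming each pod is told its own index and the total pod count. Both inputs, and the helper names `partitions_for_pod` and `partition_topics`, are hypothetical and not something Function Mesh provides. The `-partition-N` suffix is how Pulsar names the internal topics of a partitioned topic.

```python
def partitions_for_pod(pod_index, pod_count, partition_count):
    """Return the contiguous block of partition indexes owned by one pod."""
    if not 0 <= pod_index < pod_count:
        raise ValueError("pod_index out of range")
    # Spread any remainder over the first `extra` pods, one partition each.
    base, extra = divmod(partition_count, pod_count)
    start = pod_index * base + min(pod_index, extra)
    size = base + (1 if pod_index < extra else 0)
    return list(range(start, start + size))


def partition_topics(topic, partitions):
    """Build the per-partition topic names for the given partition indexes."""
    return [f"{topic}-partition-{p}" for p in partitions]


topic = "persistent://pulsar/default2/input_test"
pod_a = partitions_for_pod(0, 2, 10)  # subscription A: partitions 0-4
pod_b = partitions_for_pod(1, 2, 10)  # subscription B: partitions 5-9
topics_a = partition_topics(topic, pod_a)
topics_b = partition_topics(topic, pod_b)
```

Because the two slices are disjoint and together cover every partition, each message would be read by exactly one pod, provided each pod subscribes only to its own per-partition topics.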

jiangpengcheng commented 7 months ago

"A and B consume part of the partitions respectively."

This needs to be specified manually, which is just what you are doing now:

"Therefore, when I need to enable effectively_once, I have to deploy multiple functions to consume partitioned topics separately. But this is not an easy way to maintain."

graysonzeng commented 7 months ago

Thanks. I originally thought the subscriptions would not need to be specified manually, so that a user could create a sink with multiple replicas without repeating the configuration several times in the YAML. But it seems this is the only way for now. @jiangpengcheng