streamnative / function-mesh

The serverless framework purpose-built for event streaming applications.
https://functionmesh.io/
Apache License 2.0

Unable to change subscription type for sink connectors in current Sink CRD. #535

Closed: irhawks closed this issue 1 year ago

irhawks commented 1 year ago

For example, when I submit a pulsar-io-lakehouse Sink to Function Mesh, an error occurs:

java.lang.IllegalArgumentException: Lakehouse sink connector only support accumulative acknowledge, so only support Failover or Exclusive subscription type.
    at org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector.open(SinkConnector.java:59) ~[meYWrWCshDf3EnrjDgNa4A/:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:835) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:230) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:260) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-11-23T11:59:19,941+0000 [public/websites/warc-avro-delta-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/websites/warc-avro-delta-sink:0] Uncaught exception in Java Instance
java.lang.IllegalArgumentException: Lakehouse sink connector only support accumulative acknowledge, so only support Failover or Exclusive subscription type.
    at org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector.open(SinkConnector.java:59) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:835) ~[io.streamnative-pulsar-functions-instance-2.9.3.14.jar:2.9.3.14]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:230) ~[io.streamnative-pulsar-functions-instance-2.9.3.14.jar:2.9.3.14]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:260) ~[io.streamnative-pulsar-functions-instance-2.9.3.14.jar:2.9.3.14]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-11-23T11:59:20,039+0000 [public/websites/warc-avro-delta-sink-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
2022-11-23T11:59:20,040+0000 [public/websites/warc-avro-delta-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Failed to close sink 
java.lang.NullPointerException: null
    at org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector.close(SinkConnector.java:99) ~[meYWrWCshDf3EnrjDgNa4A/:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.close(JavaInstanceRunnable.java:451) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:319) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-11-23T11:59:20,041+0000 [main] INFO  org.apache.pulsar.functions.runtime.JavaInstanceStarter - RuntimeSpawner quit, shutting down JavaInstance
2022-11-23T11:59:20,238+0000 [main] INFO  org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Unloading JAR files for function InstanceConfig(instanceId=0, functionId=0-60d5f557-ff2e-42a2-bfe8-05fd6d4d1c76, functionVersion=0, functionDetails=tenant: "public"

My Lakehouse connector image is built from this Dockerfile:

FROM streamnative/sn-platform:2.9.3.14
RUN apt update && apt install -yq wget vim
RUN cd /root && wget -c https://github.com/streamnative/pulsar-io-lakehouse/releases/download/v2.9.3.14/pulsar-io-lakehouse-2.9.3.14-cloud.nar
RUN cd /root && cp pulsar-io-lakehouse-2.9.3.14-cloud.nar /pulsar/connectors/

RUN mkdir -p /pulsar/conf/java-log && chmod -R 777 /pulsar/conf
# docker build -t irhawks/pulsar-io-lakehouse:2.9.3.14 .

and built with

docker build -t irhawks/pulsar-io-lakehouse:2.9.3.14 .

and the Sink component is defined as:

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: test-delta-sink
spec:
  image: irhawks/pulsar-io-lakehouse:2.9.3.14 # using connector image here
  className: org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector
  tenant: public
  namespace: websites
  replicas: 1
  maxReplicas: 1
  deadLetterTopic: "persistent://public/default/test-deadletter"
  processingGuarantee: 'atleast_once'
  subscriptionName: "test-sink"
  subscriptionType: "FAILOVER"
  input:
    topics:
    - persistent://public/default/test
    typeClassName: "[B"
  sinkConfig:
    provider: "aws-s3"
    accessKeyId: "admin"
    secretAccessKey: "admin123"
    role: "none"
    roleSessionName: "none"
    bucket: "websites"
    region: "us-east-1"
    endpoint: "http://minio.default.svc.cluster.local:9000"
    pathPrefix: 'datalake/pulsar/test/'
    formatType: "parquet"
    partitionerType: "time"
    timePartitionPattern: "yyyy-MM-dd"
    timePartitionDuration: "1d"
    batchSize: 100
    batchTimeMs: 10000
  pulsar:
    pulsarConfig: "fm-pulsar0"
  resources:
    requests:
      cpu: "0.1"
      memory: 1G
    limits:
      cpu: "0.2"
      memory: 1.1G

  java:
    extraDependenciesDir: random-dir/
    jar: 'connectors/pulsar-io-lakehouse-2.9.3.14-cloud.nar' # the NAR location in image
    jarLocation: ""
  clusterName: fm-pulsar0
  autoAck: true
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fm-pulsar0
data:
    webServiceURL: http://pulsar0-broker.default.svc.cluster.local:8080
    brokerServiceURL: pulsar://pulsar0-broker.default.svc.cluster.local:6650

Everything goes well except that, once submitted to the Kubernetes cluster, the sink fails because the Lakehouse sink connector only supports cumulative acknowledgement and therefore requires a Failover or Exclusive subscription. However, I could not find any way in the current version to set the subscriptionType attribute on a Sink CRD.

Pulsar version: 2.9.3.14, Kubernetes version: 1.25
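The restriction in the error message follows from how Pulsar acknowledgement works: cumulative acknowledgement is only allowed on Exclusive and Failover subscriptions, where at most one consumer is active per topic partition, and not on Shared or Key_Shared ones. The following is a minimal sketch of that kind of guard. It uses a hypothetical local enum instead of the Pulsar client API, and it is not the Lakehouse connector's actual source.

```java
import java.util.EnumSet;
import java.util.Set;

public class CumulativeAckGuard {

    // Hypothetical mirror of Pulsar's four subscription types (not the real Pulsar enum).
    enum SubscriptionType { EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED }

    // Subscriptions with at most one active consumer per partition can ack cumulatively.
    static final Set<SubscriptionType> CUMULATIVE_ACK_TYPES =
            EnumSet.of(SubscriptionType.EXCLUSIVE, SubscriptionType.FAILOVER);

    static boolean supportsCumulativeAck(SubscriptionType type) {
        return CUMULATIVE_ACK_TYPES.contains(type);
    }

    // Sketch of the check a sink's open() could run before it starts writing data.
    static void checkSubscriptionType(SubscriptionType type) {
        if (!supportsCumulativeAck(type)) {
            throw new IllegalArgumentException(
                    "Cumulative acknowledgement requires a Failover or Exclusive subscription, got " + type);
        }
    }

    public static void main(String[] args) {
        for (SubscriptionType type : SubscriptionType.values()) {
            System.out.println(type + " -> cumulative ack allowed: " + supportsCumulativeAck(type));
        }
        // A Shared subscription is rejected, similar to the error in the log.
        try {
            checkSubscriptionType(SubscriptionType.SHARED);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```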

tpiperatgod commented 1 year ago

According to this code: https://github.com/streamnative/pulsar/blob/master/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java#L177-L184, the input subscription is switched to Failover when the processing guarantee is effectively_once, so what you need to do is set spec.processingGuarantee to effectively_once.
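The suggestion works because the sink's subscription type is derived from its processing guarantee rather than read from a separate field. Below is a simplified model of that rule, assuming Shared is the default and effectively_once switches it to Failover. The enum and method names are hypothetical, and the real conversion in SinkConfigUtils may take other settings into account as well (for example, ordering options).

```java
public class SubscriptionTypeResolver {

    // Hypothetical mirror of the values accepted in spec.processingGuarantee.
    enum ProcessingGuarantee { ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE }

    // Hypothetical mirror of Pulsar's subscription types.
    enum SubscriptionType { EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED }

    // Simplified rule (assumption): sinks consume with a Shared subscription
    // unless effectively-once processing is requested, which selects Failover.
    static SubscriptionType resolve(ProcessingGuarantee guarantee) {
        return guarantee == ProcessingGuarantee.EFFECTIVELY_ONCE
                ? SubscriptionType.FAILOVER
                : SubscriptionType.SHARED;
    }

    public static void main(String[] args) {
        // The Sink in this issue uses 'atleast_once', which maps to SHARED
        // and therefore fails the Lakehouse connector's subscription check.
        System.out.println("atleast_once     -> " + resolve(ProcessingGuarantee.ATLEAST_ONCE));
        // 'effectively_once' maps to FAILOVER, which the connector accepts.
        System.out.println("effectively_once -> " + resolve(ProcessingGuarantee.EFFECTIVELY_ONCE));
    }
}
```

In the Sink manifest from the issue, this corresponds to replacing processingGuarantee: 'atleast_once' with processingGuarantee: 'effectively_once'. Keep in mind that this also changes the delivery semantics requested for the sink, not only its subscription type.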