apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Go, Functions] Panic while goInstance.ackInputMessage #15100

Open 0gap opened 2 years ago

0gap commented 2 years ago

Describe the bug

I am testing a simple Go function against Pulsar standalone running in Docker. When I set --auto-ack true, I get a panic from ackInputMessage in pulsar-function-go/pf/instance.go. I build the function on my machine, copy the binary into the Pulsar container with docker cp, and run it using:

bin/pulsar-admin functions localrun --auto-ack true --go /pulsar/inputFunc --inputs persistent://zerogap/test-nam/in-msgs --output non-persistent://zerogap/test-nam/filtered-msgs --tenant zerogap --namespace test-nam --name test-input-go-function --retain-ordering --retain-key-ordering --user-config '{"custom-data":"12345"}'

The simple function's code is:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/apache/pulsar/pulsar-function-go/pf"
)

func contextFunc(ctx context.Context) string {
    fc, ok := pf.FromContext(ctx)
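    if !ok {
        // Without a function context fc is nil, so stop here instead of dereferencing it below.
        fmt.Println("Function context is not defined")
        return ""
    }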

    wotd := fc.GetUserConfValue("custom-data")

    if wotd == nil {
        fmt.Printf("custom-data is empty. set it with --user-config")
        return ""
    } else {
        fmt.Printf("custom-data is %s", wotd.(string))
        return wotd.(string)
    }
}

func FilterData(ctx context.Context, input []byte) ([]byte, error) {
    wotd := contextFunc(ctx)
    fmt.Printf("Input message in topic is: %s\n", input)
    fmt.Printf("Filtering radar for custom-data: %s\n", wotd)
    var dat map[string]interface{}
    if err := json.Unmarshal(input, &dat); err != nil {
        panic(err)
    }
    fmt.Println(dat)

    return input, nil
}

func main() {
    pf.Start(FilterData)
}
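For comparison, here is a minimal sketch of the same filtering logic written to return errors instead of panicking, and to use the comma-ok form of the type assertion on the user-config value. It deliberately leaves out the pf wiring so it compiles and runs on its own; userConfString and filterData are hypothetical names, not part of pulsar-function-go. This does not address the ackInputMessage panic itself, but it keeps a malformed input message or an unexpected config value from crashing the instance for an unrelated reason.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// userConfString converts a user-config value (for example one returned by
// GetUserConfValue) into a string. A nil or non-string value yields ok == false
// instead of the panic that a plain v.(string) assertion would cause.
func userConfString(v interface{}) (string, bool) {
	s, ok := v.(string)
	return s, ok
}

// filterData decodes the input as a JSON object and passes it through unchanged.
// Malformed JSON is reported as an error rather than a panic.
func filterData(input []byte) ([]byte, error) {
	var dat map[string]interface{}
	if err := json.Unmarshal(input, &dat); err != nil {
		return nil, fmt.Errorf("decoding input %q: %w", input, err)
	}
	return input, nil
}

func main() {
	if s, ok := userConfString("12345"); ok {
		fmt.Println("custom-data is", s)
	}

	out, err := filterData([]byte(`{"hello":"world"}`))
	fmt.Println(string(out), err)

	// Invalid input now produces an error value the caller can log or return.
	_, err = filterData([]byte("not json"))
	fmt.Println(err != nil)
}
```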

To Reproduce

Steps to reproduce the behavior:

  1. Start Pulsar in standalone mode
  2. Create the topics the function will be using as input/output
  3. Build the simple Go function above and copy the binary into the Pulsar container
  4. Start the function with the command arguments shown above (in particular with --auto-ack set to true)
  5. See error below

Expected behavior

Input messages should be acknowledged without the function panicking and exiting.

Panic log:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0xaece73]

goroutine 74 [running]:
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).ackInputMessage(0xc000214a00, {0xe75e60, 0xc0001fe2d0})
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar/pulsar-function-go@v0.0.0-20220410070944-960580764bbc/pf/instance.go:396 +0xb3
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).processResult.func1({0xd11780?, 0x3f52aa8fe9a01fce?}, 0x1?, {0x0?, 0x0?})
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar/pulsar-function-go@v0.0.0-20220410070944-960580764bbc/pf/instance.go:378 +0x112
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc0000ac000, 0xc000230540)
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/producer_partition.go:830 +0x7f0
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc00017dce0, 0xc000230540)
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:673 +0xe8
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc00017dce0, 0xc0006285a0, {0x0?, 0x0})
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:558 +0x14a
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00017dce0)
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:415 +0x3a5
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:227 +0x65
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
    /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:223 +0x70
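The trace shows that the panic is raised inside the producer's send-receipt callback (processResult.func1 calling ackInputMessage), so in this path the input message is acknowledged only after the output message has been confirmed by the broker. The faulting address addr=0x18 is consistent with, though it does not prove, a method call through a nil interface value, for example a consumer fetched from a map under a key that is not present. The sketch below reproduces that failure mode in isolation and shows a guarded lookup that returns an error instead. The acker interface, fakeConsumer, ackUnguarded and ackInput are hypothetical stand-ins, not the actual code in pf/instance.go.

```go
package main

import "fmt"

// acker is a hypothetical stand-in for a consumer that can acknowledge messages.
type acker interface {
	Ack(msgID string) error
}

// fakeConsumer records the IDs it was asked to acknowledge.
type fakeConsumer struct{ acked []string }

func (c *fakeConsumer) Ack(msgID string) error {
	c.acked = append(c.acked, msgID)
	return nil
}

// ackUnguarded mirrors the suspected failure mode: a missing map key yields a
// nil interface, and calling Ack on it panics with a nil pointer dereference.
func ackUnguarded(consumers map[string]acker, topic, msgID string) error {
	return consumers[topic].Ack(msgID)
}

// ackInput uses the comma-ok lookup and reports a missing consumer as an error
// instead of crashing the whole function instance.
func ackInput(consumers map[string]acker, topic, msgID string) error {
	c, ok := consumers[topic]
	if !ok || c == nil {
		return fmt.Errorf("no consumer registered for topic %q", topic)
	}
	return c.Ack(msgID)
}

// recoverErr runs f and converts a panic into an error so both paths can be compared.
func recoverErr(f func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return f()
}

func main() {
	c := &fakeConsumer{}
	const known = "persistent://zerogap/test-nam/in-msgs"
	consumers := map[string]acker{known: c}

	// Unguarded lookup of an unknown topic: the nil interface call panics.
	fmt.Println(recoverErr(func() error { return ackUnguarded(consumers, "unknown-topic", "msg-1") }))
	// Guarded lookup: the same situation becomes an ordinary error.
	fmt.Println(ackInput(consumers, "unknown-topic", "msg-1"))
	// Known topic: the fake consumer records the ack.
	fmt.Println(ackInput(consumers, known, "msg-1"), c.acked)
}
```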

liangyuanpeng commented 2 years ago

I have tested it and it works for me.
Pulsar version: 2.9.2

Producing a message:

bin/pulsar-client produce -m {\"hello\":\"world\"} persistent://zerogap/test-nam/in-msgs

Function log:

2022/04/19 14:42:55.009 instance.go:431: [error] the logAppender is nil, if you want to use it, please specify `--log-topic` at startup.
custom-data is 12345Input message in topic is: {"hello":"world"}
Filtering radar for custom-data: 12345
map[hello:world]

0gap commented 2 years ago

I was previously running 2.9.1 (pulled as latest).

I just tried again with 2.9.2 and the same thing happens.

One note: I am running Pulsar through docker-compose:


services:
  pulsar:
    image: apachepulsar/pulsar:2.9.2
    volumes:
      - pulsar_data:/pulsar/data
      - pulsar_config:/pulsar/conf
    ports:
      - "6650:6650"
      - "8080:8080"
    container_name: pulsar-broker
    hostname: pulsar-broker
    command: bin/pulsar standalone -nfw -nss
    networks:
      - pulsar

volumes:
  pulsar_data:
  pulsar_config:

networks:
  pulsar:

I am starting to wonder whether my dependencies are the problem.

0gap commented 2 years ago

To clarify: when I start the function, some messages are processed, but after a few of them I hit the SIGSEGV:

Filtering radar for custom-data: 12345
map[count:1 msg:test msg: 18:57:28 test_id:3 producerID:Producer 1] 
custom-data is 12345Input message in topic is: {"test_id":2,"msg":"test msg: 16:10:13", "count":1, "producerID":"Producer 2"}

Filtering radar for custom-data: 12345
map[count:1 msg:test msg: 16:10:13 test_id:2 producerID:Producer 2] 
custom-data is 12345Input message in topic is: {"test_id":2,"msg":"test msg: 16:10:13", "count":2, "producerID":"Producer 2"}

Filtering radar for custom-data: 12345
map[count:2 msg:test msg: 16:10:13 test_id:2 producerID:Producer 2] 
custom-data is 12345Input message in topic is: {"test_id":3,"msg":"test msg: 16:30:39", "count":1, "producerID":"Producer 1"}

Filtering radar for custom-data: 12345
map[count:1 msg:test msg: 16:30:39 test_id:3 producerID:Producer 1] 
2022/04/19 16:15:02.729 log.go:46: [info] Starting metrics server on port 38229
2022/04/19 16:15:02.737 once.go:68: [warning] [Connection was closed] producer_name=standalone-3-89 producerID=2 cnx=127.0.0.1:58704 -> 127.0.0.1:6650 topic=non-persistent://zerogap/test-nam/filtered-msgs-partition-1
2022/04/19 16:15:02.737 once.go:68: [warning] [Connection was closed] topic=non-persistent://zerogap/test-nam/filtered-msgs-partition-0 producer_name=standalone-3-88 producerID=1 cnx=127.0.0.1:58704 -> 127.0.0.1:6650
2022/04/19 16:15:02.737 once.go:68: [warning] [Connection was closed] topic=persistent://public/default/func-logs producerID=3 producer_name=standalone-3-90 cnx=127.0.0.1:58704 -> 127.0.0.1:6650
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0xaece73]

goroutine 91 [running]:
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).ackInputMessage(0xc0001fca00, {0xe75e80, 0xc0001ec1e0})
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar/pulsar-function-go@v0.0.0-20220410070944-960580764bbc/pf/instance.go:396 +0xb3
github.com/apache/pulsar/pulsar-function-go/pf.(*goInstance).processResult.func1({0xd11780?, 0x3f814d471f6813ff?}, 0x1?, {0x0?, 0x0?})
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar/pulsar-function-go@v0.0.0-20220410070944-960580764bbc/pf/instance.go:378 +0x112
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc000160000, 0xc000225740)
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/producer_partition.go:830 +0x7f0
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc0000f9ce0, 0xc000225740)
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:673 +0xe8
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc0000f9ce0, 0xc000311e00, {0x0?, 0x0})
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:558 +0x14a
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc0000f9ce0)
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:415 +0x3a5
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:227 +0x65
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
        /home/zerogap/go/pkg/mod/github.com/apache/pulsar-client-go@v0.8.1/pulsar/internal/connection.go:223 +0x70
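The producer warnings just before this second panic refer to topics such as non-persistent://zerogap/test-nam/filtered-msgs-partition-1, which is how Pulsar names the individual partitions of a partitioned topic. One hypothesis worth checking (it is not confirmed anywhere in this thread) is that an incoming message reports its partition-level topic name while a lookup inside the function instance is keyed by the base topic name passed to --inputs. The sketch below shows a hypothetical helper, baseTopic, that normalizes a partition name back to its base topic; it is illustrative only and is not taken from pulsar-function-go.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionMarker is the infix Pulsar uses in the names of individual partitions.
const partitionMarker = "-partition-"

// baseTopic strips a trailing "-partition-<N>" suffix, where N is a non-negative
// integer, and returns any other name unchanged.
func baseTopic(name string) string {
	i := strings.LastIndex(name, partitionMarker)
	if i < 0 {
		return name
	}
	// Only treat it as a partition suffix if the remainder is purely numeric.
	if _, err := strconv.ParseUint(name[i+len(partitionMarker):], 10, 32); err != nil {
		return name
	}
	return name[:i]
}

func main() {
	for _, t := range []string{
		"non-persistent://zerogap/test-nam/filtered-msgs-partition-1",
		"persistent://zerogap/test-nam/in-msgs",
		"persistent://zerogap/test-nam/my-partition-key",
	} {
		fmt.Println(t, "->", baseTopic(t))
	}
}
```

Checking the names against something like this before a map lookup would at least show whether the keys and the message topics disagree.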
github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

liangyuanpeng commented 2 years ago

This problem has now happened in my case too, so I am looking into it again. Pulsar version: 2.10.0

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.