childe / gohangout

使用 golang 模仿的 Logstash。用于消费 Kafka 数据,处理后写入 ES、Clickhouse 等。
MIT License
1.01k stars 234 forks source link

编写插件加载失败 ould not open /zcola/pulsar_input.so: plugin: not implemented #235

Closed zcola closed 8 months ago

zcola commented 9 months ago

依葫芦画瓢,编译过了,但是加载不了,不知道错在哪里

package main

import (

        "github.com/childe/gohangout/codec"
        "github.com/golang/glog"
        "github.com/apache/pulsar-client-go/pulsar"
    "context"
)

type PulsarInput struct{
        config         map[interface{}]interface{}
        decoder codec.Decoder
    messages chan *pulsar.Message
    consumer pulsar.Consumer
    client   pulsar.Client

}

func New(config map[interface{}]interface{}) interface{} {
    var (
                codertype      string = "plain"
    )
    if codecV, ok := config["codec"]; ok {
                codertype = codecV.(string)
        }
    PulsarInput := &PulsarInput{
                config:         config,
                messages:       make(chan *pulsar.Message, 10),
                decoder:        codec.NewDecoder(codertype),
        }

    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: config["serviceUrl"].(string)})
    // defer client.Close()
    c, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            config["Topic"].(string),
        SubscriptionName: config["subscriptionName"].(string),
        Type:             pulsar.Shared,
    })
    if err != nil {
        glog.Fatalf("could not init Consumer: %s", err)
    }
    // defer c.Close()
    go func() {
        for {
            msg, err := c.Receive(context.Background())
            if err == nil {
                PulsarInput.messages <- &msg
            } else {
                // The client will automatically try to recover from all errors.
                glog.Errorf("Consumer error: %v (%v)\n", err, msg)
            }
        }
    }()
        return PulsarInput
}

func (p *PulsarInput) ReadOneEvent() map[string]interface{} {
    message := <-p.messages
    event := p.decoder.Decode((*message).Payload())
    return event
}

func (p *PulsarInput) Shutdown() {
    p.consumer.Close()
    p.client.Close()
}
zcola commented 9 months ago

配置

inputs:
    - '/zcola/pulsar_input.so':
        serviceUrl: "pulsar://changan.bdsds.com:6650"
        topic: "persistent://loghub/elk_zone_1/cdsdsnlog"
        subscriptionName: "zcola"

outputs:
    - stdin: {}
childe commented 9 months ago

看着是因为主程序编译的时候没有打开cgo enable选项。

发自我的iPhone

------------------ 原始邮件 ------------------ 发件人: ZhangRui @.> 发送时间: 2023年11月15日 17:56 收件人: childe/gohangout @.> 抄送: Subscribed @.***> 主题: Re: [childe/gohangout] 编写插件加载失败 ould not open /zcola/pulsar_input.so: plugin: not implemented (Issue #235)

配置 inputs: - '/zcola/pulsar_input.so': serviceUrl: "pulsar://changan.bdsds.com:6650" topic: "persistent://loghub/elk_zone_1/cdsdsnlog" subscriptionName: "zcola" outputs: - stdin: {}
— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you are subscribed to this thread.Message ID: @.***>

zcola commented 9 months ago

打开后加载报错,看不懂,我这个也不依赖ck fatal error: runtime: no plugin module data

I1116 07:38:34.999698   26885 gohangout.go:155] gohangout version: b5ba135
I1116 07:38:35.018396   26885 gohangout.go:99] input[1] map[/home/zcola/gohangout_docker/pulsar_input.so:map[serviceUrl:pulsar://changan.bdsds.com:6650 subscriptionName:zcola topic:persistent://loghub/elk_zone_1/cdsdsnlog]]
I1116 07:38:35.018461   26885 input.go:29] could not load /home/zcola/gohangout_docker/pulsar_input.so input plugin, try third party plugin
fatal error: runtime: no plugin module data

goroutine 1 [running]:
runtime.throw({0xfbd064?, 0xc0000021a0?})
        /usr/local/go/src/runtime/panic.go:992 +0x71 fp=0xc000199860 sp=0xc000199830 pc=0x74d1d1
plugin.lastmoduleinit()
        /usr/local/go/src/runtime/plugin.go:20 +0x8d0 fp=0xc000199948 sp=0xc000199860 pc=0x77c6b0
plugin.open({0xc00003ecf0, 0x2c})
        /usr/local/go/src/plugin/plugin_dlopen.go:77 +0x3ea fp=0xc000199bb8 sp=0xc000199948 pc=0xa9e26a
plugin.Open(...)
        /usr/local/go/src/plugin/plugin.go:32
github.com/childe/gohangout/input.getInputFromPlugin({0xc00003ecf0, 0x2c}, 0xfd1127?)
        /gohangout/input/input.go:41 +0x45 fp=0xc000199c80 sp=0xc000199bb8 pc=0xe03365
github.com/childe/gohangout/input.GetInput({0xc00003ecf0, 0x2c}, 0xfabc6d?)
        /gohangout/input/input.go:32 +0xd8 fp=0xc000199d10 sp=0xc000199c80 pc=0xe03218
main.buildPluginLink(0x7ffef22a9cf6?)
        /gohangout/gohangout.go:106 +0x2ab fp=0xc000199e58 sp=0xc000199d10 pc=0xe2038b
main.main()
        /gohangout/gohangout.go:199 +0x305 fp=0xc000199f80 sp=0xc000199e58 pc=0xe20ec5
runtime.main()
        /usr/local/go/src/runtime/proc.go:250 +0x212 fp=0xc000199fe0 sp=0xc000199f80 pc=0x74fc12
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc000199fe8 sp=0xc000199fe0 pc=0x781541

goroutine 6 [chan receive]:
github.com/golang/glog.(*loggingT).flushDaemon(0x0?)
        /go/pkg/mod/github.com/golang/glog@v1.0.0/glog.go:882 +0x6a
created by github.com/golang/glog.init.0
        /go/pkg/mod/github.com/golang/glog@v1.0.0/glog.go:410 +0x1bf

goroutine 9 [chan receive]:
github.com/ClickHouse/clickhouse-go.init.0.func1()
        /go/pkg/mod/github.com/!click!house/clickhouse-go@v1.5.4/bootstrap.go:48 +0x2d
created by github.com/ClickHouse/clickhouse-go.init.0
        /go/pkg/mod/github.com/!click!house/clickhouse-go@v1.5.4/bootstrap.go:45 +0x3f
zcola commented 9 months ago

忽略上一条,插件和主程序 go 版本不一致,重新编译后另外一个报错

I1116 07:44:59.904896    3886 input.go:29] could not load /home/zcola/gohangout_docker/pulsar_input.so input plugin, try third party plugin
E1116 07:45:00.503104    3886 input.go:34] could not load /home/zcola/gohangout_docker/pulsar_input.so: could not open /home/zcola/gohangout_docker/pulsar_input.so: plugin.Open("/home/zcola/gohangout_docker/pulsar_input"): plugin was built with a different version of package github.com/childe/gohangout/simplejson
F1116 07:45:00.503134    3886 gohangout.go:201] build plugin link error: invalid input plugin
childe commented 9 months ago

E1116 07:45:00.503104 3886 input.go:34] could not load /home/zcola/gohangout_docker/pulsar_input.so: could not open /home/zcola/gohangout_docker/pulsar_input.so: plugin.Open("/home/zcola/gohangout_docker/pulsar_input"): plugin was built with a different version of package github.com/childe/gohangout/simplejson

built with a different version of package github.com/childe/gohangout/simplejson 看着是这个原因? 不过 Go 的 Plugin 对版本的限制真让人头疼。

zcola commented 9 months ago

E1116 07:45:00.503104 3886 input.go:34] could not load /home/zcola/gohangout_docker/pulsar_input.so: could not open /home/zcola/gohangout_docker/pulsar_input.so: plugin.Open("/home/zcola/gohangout_docker/pulsar_input"): plugin was built with a different version of package github.com/childe/gohangout/simplejson

built with a different version of package github.com/childe/gohangout/simplejson 看着是这个原因? 不过 Go 的 Plugin 对版本的限制真让人头疼。

放到gohangout 目录下编译了,能帮我看看还有什么坑吗 https://github.com/zcola/gohangout-input-pulsar ,用是能用了,目前发现问题就是进程ctrl+c 没有优雅退出,pulsar 客户端没正常关闭?

I1127 16:07:23.075001    2350 bulk_http.go:163] try to bulk with host (http://elk-sdasdom:9200)
^CI1127 16:07:26.215648    2350 signalhandle_unix.go:19] capture signal: interrupt
I1127 16:07:26.215740    2350 input_box.go:144] try to shutdown input *main.PulsarInput
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x48 pc=0x7f8389f1d681]

goroutine 1 [running]:
plugin/unnamed-db27bc4e58e857f0c0d5706252354fa3a9d4aecb.(*PulsarInput).Shutdown(0xc000346c00)
        /mnt/c/work/gohangout/pulsar_input.go:90 +0x21
github.com/childe/gohangout/input.(*InputBox).shutdown.func1()
        /mnt/c/work/gohangout/input/input_box.go:145 +0x8f
sync.(*Once).doSlow(0xc00008a650, 0xc0000002e8)
        /usr/local/go/src/sync/once.go:68 +0xd2
sync.(*Once).Do(...)
        /usr/local/go/src/sync/once.go:59
github.com/childe/gohangout/input.(*InputBox).shutdown(0xc000726fc0)
        /mnt/c/work/gohangout/input/input_box.go:142 +0x4a
github.com/childe/gohangout/input.(*InputBox).Shutdown(...)
        /mnt/c/work/gohangout/input/input_box.go:160
main.gohangoutInputs.stop({0xc0005c0150, 0x1, 0x11c02e2})
        /mnt/c/work/gohangout/gohangout.go:70 +0x39
main.main()
        /mnt/c/work/gohangout/gohangout.go:215 +0x4b1
childe commented 9 months ago

第90行,p.consumer 从来没有赋值过,所以调用到它的时候 Panic

zcola commented 9 months ago
E1128 15:49:35.508027    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.508073    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.508092    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513387    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513418    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513423    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513441    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513452    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
I1128 15:49:35.513215    1124 bulk_http.go:170] bulk done with execution_id 5 1.332 12526 9403.904
I1128 15:49:35.513503    1124 bulk_http.go:265] all bulk job done. return
E1128 15:49:35.513456    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513578    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513584    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)
E1128 15:49:35.513588    1124 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed (<nil>)

这样是正常了不?哥不忙时候在帮我看一眼我改的 https://github.com/zcola/gohangout-input-pulsar

childe commented 9 months ago

看着一直在报错说 consumer 已经关闭了,但我没用过这个东西,看不出来是为啥,可能需要 Plugin 里面处理一下这种错误?

childe commented 9 months ago

是已经 shutdown 了?那要不加一个参数,shutdown 之后就不要再继续消费了?

zcola commented 9 months ago

是已经 shutdown 了?那要不加一个参数,shutdown 之后就不要再继续消费了?

if err == nil {
            PulsarInput.messages <- &msg
        } else {
            // The client will automatically try to recover from all errors.
            glog.Errorf("Consumer error: %v (%v)\n", err, msg)
        }
    }

https://github.com/arterhuo/gohangout-input-confluent-kafka-go/blob/main/ckafka_input.go 我模仿这个。。。他这个处理时有点粗暴,我是不是专门处理这种closed 报错?但是我分不清时shutdown导致还是服务端导致得?问的比较小白

childe commented 9 months ago

报错的时候,判断一下是不是 shutdown 了,如果是的话,就 break/return 吧。我觉得大概思路可以是这样。

ckafka_input 这块的确是没有处理。

zcola commented 9 months ago

发现一个新问题,消费两分钟就不消费了,如何定位

I1129 18:55:02.130297       1 bulk_http.go:191] request size:13706866
I1129 18:55:04.985685       1 bulk_http.go:251] get response[6771028]
I1129 18:55:04.985714       1 bulk_http.go:170] bulk done with execution_id 29 2.855 32090 11239.931
zcola commented 9 months ago

func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://ddasd10-pulsdasdaset:6650", }) if err != nil { log.Fatal(err) } defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "persistent://loghub/elk_zone_1/chansadsg",
            SubscriptionName: "zcola",
            Type:             pulsar.Shared,
    })
    if err != nil {
    log.Fatal(err)
    }
    defer consumer.Close()

    for {
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                    log.Fatal(err)
            }
            if err := processMsg(msg); err != nil {
                    consumer.Nack(msg)
            } else {
                    consumer.Ack(msg)
            }

    }

}

写了一个最简单得没啥问题,hangout 插件 defer consumer.Close() defer client.Close() 我给注释掉了,不然无法运行刷

E1129 18:16:16.575321 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed () E1129 18:16:16.575352 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed () E1129 18:16:16.575357 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed ()

zcola commented 9 months ago

发现一个新问题,消费两分钟就不消费了,如何定位

I1129 18:55:02.130297       1 bulk_http.go:191] request size:13706866
I1129 18:55:04.985685       1 bulk_http.go:251] get response[6771028]
I1129 18:55:04.985714       1 bulk_http.go:170] bulk done with execution_id 29 2.855 32090 11239.931

解决了,但是我不知道shutdown判断应该怎么写哈

childe commented 9 months ago

func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://ddasd10-pulsdasdaset:6650", }) if err != nil { log.Fatal(err) } defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "persistent://loghub/elk_zone_1/chansadsg",
            SubscriptionName: "zcola",
            Type:             pulsar.Shared,
    })
    if err != nil {
    log.Fatal(err)
    }
    defer consumer.Close()

    for {
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                    log.Fatal(err)
            }
            if err := processMsg(msg); err != nil {
                    consumer.Nack(msg)
            } else {
                    consumer.Ack(msg)
            }

    }

}

写了一个最简单得没啥问题,hangout 插件 defer consumer.Close() defer client.Close() 我给注释掉了,不然无法运行刷

E1129 18:16:16.575321 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed () E1129 18:16:16.575352 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed () E1129 18:16:16.575357 12208 pulsar_input.go:70] Consumer error: consumer closed: ConsumerClosed ()

在你这个函数里面,defer 里面肯定是不能用的,你需要再了解下 defer 机制哈~ :)

childe commented 9 months ago

发现一个新问题,消费两分钟就不消费了,如何定位

I1129 18:55:02.130297       1 bulk_http.go:191] request size:13706866
I1129 18:55:04.985685       1 bulk_http.go:251] get response[6771028]
I1129 18:55:04.985714       1 bulk_http.go:170] bulk done with execution_id 29 2.855 32090 11239.931

解决了,但是我不知道shutdown判断应该怎么写哈

给 PulsarInput 加一个 is_shutdown 属性,然后在 https://github.com/zcola/gohangout-input-pulsar/blob/main/pulsar_input.go#L70 这里判断一下是不是因为 shutdown 出错的; 如果是,就直接 return 了。

zcola commented 8 months ago

发现一个新问题,消费两分钟就不消费了,如何定位

I1129 18:55:02.130297       1 bulk_http.go:191] request size:13706866
I1129 18:55:04.985685       1 bulk_http.go:251] get response[6771028]
I1129 18:55:04.985714       1 bulk_http.go:170] bulk done with execution_id 29 2.855 32090 11239.931

解决了,但是我不知道shutdown判断应该怎么写哈

给 PulsarInput 加一个 is_shutdown 属性,然后在 https://github.com/zcola/gohangout-input-pulsar/blob/main/pulsar_input.go#L70 这里判断一下是不是因为 shutdown 出错的; 如果是,就直接 return 了。

加了好像还是一样的报错 https://github.com/zcola/gohangout-input-pulsar/blob/main/pulsar_input.go#L92

childe commented 8 months ago

你这个错误太明显。你再了解一下 go func() 是什么意思?结合你上面提到的 defer,我觉得你可能还不了解他们是干啥的。

zcola commented 8 months ago

https://github.com/zcola/gohangout-input-pulsar/commit/ccd301d9d9a04a9c8dff62b4a65247fc79ba194e

改好测试投生产了,谢谢解答哈,这个插件可以放到你得readme 里吗哈哈

childe commented 8 months ago

zcola/gohangout-input-pulsar@ccd301d

改好测试投生产了,谢谢解答哈,这个插件可以放到你得readme 里吗哈哈

加了