taosdata / driver-go

taos go driver
MIT License
90 stars 32 forks source link

tmq consume alter table tag msg would crash #129

Closed chenquan closed 1 year ago

chenquan commented 2 years ago

tdengine-server: 3.0.1.5-3.0.1.8(docker) driver-go: v3.0.3 client os: win10 使用原生方式连接

https://docs.taosdata.com/taos-sql/table/#%E4%BF%AE%E6%94%B9%E5%AD%90%E8%A1%A8%E6%A0%87%E7%AD%BE%E5%80%BC

崩溃截图: image

sangshuduo commented 2 years ago

@chenquan 能否贴一下代码和重现步骤?

chenquan commented 1 year ago

@sangshuduo 代码: https://docs.taosdata.com/develop/tmq/#%E7%A4%BA%E4%BE%8B%E4%BB%A3%E7%A0%81 插入数据后, 再修改tag, taos客户端会崩溃

huskar-t commented 1 year ago

@chenquan 请使用如下代码测试

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    "time"

    "github.com/taosdata/driver-go/v3/af"
    "github.com/taosdata/driver-go/v3/af/tmq"
    "github.com/taosdata/driver-go/v3/common"
    "github.com/taosdata/driver-go/v3/errors"
    "github.com/taosdata/driver-go/v3/wrapper"
)

// main is a reproduction script for consuming an "alter table ... set tag"
// message through TMQ. It creates the example_tmq database and a topic with
// meta on it, subscribes a consumer, and then runs four DDL/DML statements
// (create stable, create child table, insert, alter tag), consuming and
// printing exactly one message after each statement. Any failure panics.
func main() {
    db, err := af.Open("", "root", "taosdata", "", 0)
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // mustExec runs a single SQL statement and panics if it fails.
    mustExec := func(sql string) {
        if _, err := db.Exec(sql); err != nil {
            panic(err)
        }
    }

    mustExec("create database if not exists example_tmq VGROUPS 1")
    mustExec("create topic if not exists example_tmq_topic with meta as DATABASE example_tmq")

    config := tmq.NewConfig()
    defer config.Destroy()
    if err := config.SetGroupID("test"); err != nil {
        panic(err)
    }
    if err := config.SetAutoOffsetReset("earliest"); err != nil {
        panic(err)
    }
    if err := config.SetConnectIP("127.0.0.1"); err != nil {
        panic(err)
    }
    if err := config.SetConnectUser("root"); err != nil {
        panic(err)
    }
    if err := config.SetConnectPass("taosdata"); err != nil {
        panic(err)
    }
    if err := config.SetConnectPort("6030"); err != nil {
        panic(err)
    }
    if err := config.SetMsgWithTableName(true); err != nil {
        panic(err)
    }
    if err := config.EnableHeartBeat(); err != nil {
        panic(err)
    }
    // The auto-commit callback turns a non-zero commit error code into a panic.
    if err := config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
        if result.ErrCode != 0 {
            panic(errors.NewError(int(result.ErrCode), wrapper.TMQErr2Str(result.ErrCode)))
        }
    }); err != nil {
        panic(err)
    }

    consumer, err := tmq.NewConsumer(config)
    if err != nil {
        panic(err)
    }
    if err := consumer.Subscribe([]string{"example_tmq_topic"}); err != nil {
        panic(err)
    }

    // consumeOne polls a single message, checks that it has the expected
    // type, prints its payload as JSON, then commits and frees the message.
    // With wantData the message must be TMQ_RES_DATA and result.Data is
    // printed; otherwise it must be TMQ_RES_TABLE_META and result.Meta is
    // printed.
    consumeOne := func(wantData bool) {
        result, err := consumer.Poll(time.Second)
        if err != nil {
            panic(err)
        }
        // NOTE(review): if Poll can return a nil result when nothing arrives
        // within the timeout, the field accesses below would fault - confirm
        // against the driver and add a retry if so.
        var payload interface{}
        if wantData {
            if result.Type != common.TMQ_RES_DATA {
                panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
            }
            payload = result.Data
        } else {
            if result.Type != common.TMQ_RES_TABLE_META {
                panic("want message type 2 got " + strconv.Itoa(int(result.Type)))
            }
            payload = result.Meta
        }
        data, marshalErr := json.Marshal(payload)
        if marshalErr != nil {
            // Fail loudly instead of printing an empty line.
            panic(marshalErr)
        }
        fmt.Println(string(data))
        // NOTE(review): any value returned by Commit is discarded here, as
        // before - confirm whether it reports an error worth checking.
        consumer.Commit(context.Background(), result.Message)
        consumer.FreeMessage(result.Message)
    }

    mustExec("create table example_tmq.st (ts timestamp,v int) tags(n int)")
    consumeOne(false)

    mustExec("create table example_tmq.t1 using example_tmq.st tags(1)")
    consumeOne(false)

    mustExec("insert into example_tmq.t1 values(now,1)")
    consumeOne(true)

    // This is the step reported to crash the client: altering a child-table tag.
    mustExec("alter table example_tmq.t1 set tag n=123")
    consumeOne(false)

    consumer.Close()
}
chenquan commented 1 year ago

I'm sorry, I've been a little busy recently. I will give feedback this weekend.

sangshuduo commented 1 year ago

> I'm sorry, I've been a little busy recently. I will give feedback this weekend.

any news?

chenquan commented 1 year ago

> > I'm sorry, I've been a little busy recently. I will give feedback this weekend.
>
> any news?

My old computer is broken. I'm using a MacBook now, and it works fine there.

chenquan commented 1 year ago

I can no longer test this issue on a Windows system.