GreptimeTeam / greptimedb

An open-source, cloud-native, unified time series database for metrics, logs and events with SQL/PromQL supported. Available on GreptimeCloud.
https://greptime.com/
Apache License 2.0
4.18k stars 298 forks source link

[bug] Possible memory leak when ingesting Nginx access logs generated by Vector at 100 QPS whether the data is stored as metrics or logs #3067

Closed tx991020 closed 7 months ago

tx991020 commented 8 months ago

What type of bug is this?

Performance issue

What subsystems are affected?

Standalone mode

Minimal reproduce step

func BulkInsert(data []Message) error {
    logs.Info(" log lens", len(data))
    tx, err := db.Begin()
    if err != nil {
        log.Println("Error beginning transaction:", err)
        return err
    }
    defer tx.Rollback()

    stmt, err := tx.Prepare("INSERT INTO log_test (host, datetime, method, request, protocol, bytes, referer) VALUES (?, ?, ?, ?, ?, ?, ?)")
    if err != nil {
        log.Println("Error preparing statement:", err)
        return err
    }
    defer stmt.Close()

    // 批量值插入
    values := []interface{}{}
    for _, row := range data {
        values = append(values, row.Host, row.Datetime, row.Method, row.Request, row.Protocol, row.Bytes, row.Referer)
    }

    query := "INSERT INTO log_test (host, datetime, method, request, protocol, bytes, referer) VALUES "
    placeholders := "(?, ?, ?, ?, ?, ?, ?)"
    queryValues := []string{}

    for i := 0; i < len(data); i++ {
        queryValues = append(queryValues, placeholders)
    }

    query += fmt.Sprintf("%s", strings.Join(queryValues, ","))

    _, err = tx.Exec(query, values...)
    if err != nil {
        log.Println("Error executing bulk insert:", err)
        return err
    }

    err = tx.Commit()
    if err != nil {
        log.Println("Error committing transaction:", err)
        return err
    }

    return nil
}

What did you expect to see?

内存应该文档在一个区间,不会一直增长,一直增长可能运行不了几小时,内存就爆了

What did you see instead?

几十分钟,内存一直从开始几十M,涨到3G多,一直增长不会停止。 docker-compose down 把greptime 停了,cpu 会降下,但是内存不会降下来。 插入性能比mysql还差很多,远远低于postgres

What operating system did you use?

Ubuntu 18.04

Relevant log output and stack trace

似乎有内存泄漏,vector 生成日志,qps 100/s   sql 匹配插入, 内存一直增长
MichaelScofield commented 8 months ago

@tx991020 能否提供一下log_test表的建表SQL?还有“db”具体是哪个go library?

tx991020 commented 8 months ago
CREATE TABLE IF NOT EXISTS "log" (
  "host" STRING NULL,
  "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(),
  "func" STRING NULL,
  "log_level" STRING NULL,
  "message" STRING NULL,
  "service" STRING NULL,
  TIME INDEX ("ts"),
  PRIMARY KEY ("host")
)
package main

import (
    "context"
    "database/sql"
    "github.com/astaxie/beego/logs"
    "github.com/didi/gendry/manager"
    "strings"

    "fmt"

    "github.com/gin-gonic/gin"
    _ "github.com/go-sql-driver/mysql"
    "time"
)

var (
    db *sql.DB
    ch = make(chan *Message,1000)
)

func InitDB() {
    var (
        err error
    )
    db, err = manager.New("public", "", "", "192.168.0.220").Set(
        manager.SetCharset("utf8"),
        manager.SetAllowCleartextPasswords(true),
        manager.SetInterpolateParams(true),
        manager.SetTimeout(30*time.Second),
        manager.SetReadTimeout(30*time.Second),
    ).Port(4002).Open(true)
    if err != nil {
        panic(err)
    }

}

type Message struct {
    Host string
    Datetime string
    Method string
    Request string
    Protocol string
    Bytes int
    Referer string
}

type Response struct {
    Msg    string      `json:"msg"`
    Status int         `json:"status"`
    Data   interface{} `json:"data"`
}

func BulkInsert(data []*Message) error {
    logs.Info(" log lens", len(data))
    tx, err := db.Begin()
    if err != nil {
        fmt.Println("Error beginning transaction:", err)
        return err
    }
    defer tx.Rollback()

    stmt, err := tx.Prepare("INSERT INTO log (host, datetime, method, request, protocol, bytes, referer) VALUES (?, ?, ?, ?, ?, ?, ?)")
    if err != nil {
        fmt.Println("Error preparing statement:", err)
        return err
    }
    defer stmt.Close()

    // 批量值插入
    values := []interface{}{}
    for _, row := range data {
        values = append(values, row.Host, row.Datetime, row.Method, row.Request, row.Protocol, row.Bytes, row.Referer)
    }

    query := "INSERT INTO log (host, datetime, method, request, protocol, bytes, referer) VALUES "
    placeholders := "(?, ?, ?, ?, ?, ?, ?)"
    queryValues := []string{}

    for i := 0; i < len(data); i++ {
        queryValues = append(queryValues, placeholders)
    }

    query += fmt.Sprintf("%s", strings.Join(queryValues, ","))

    _, err = tx.Exec(query, values...)
    if err != nil {
        fmt.Println("Error executing bulk insert:", err)
        return err
    }

    err = tx.Commit()
    if err != nil {
        fmt.Println("Error committing transaction:", err)
        return err
    }

    return nil
}

func BatchTicker[T any](ctx context.Context, duration time.Duration, ch chan T, batchSize int, fn func([]T)) {
    ticker := time.NewTicker(duration)
    defer ticker.Stop()
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }
            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        case <-ticker.C:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}
func main() {

    InitDB()
    r := gin.New()
    r.Use(gin.Recovery())
    r.Use(gin.Logger())
    go BatchTicker(context.Background(), 2*time.Second, ch, 200, func(data []*Message) {
        err := BulkInsert(data)
        if err != nil {
            logs.Error(err)
        }
    })
    r.POST("/receive", func(c *gin.Context) {

        list := make([]*Message, 0)
        err := c.BindJSON(&list)
        if err != nil {
            fmt.Println(err.Error())
            c.JSON(400, Response{
                Msg:    "invalid request payload",
                Status: -1,
            })
            return
        }
        for _, request := range list {
            ch <- request
        }

        if err != nil {
            fmt.Println("insert error", err.Error())
            c.JSON(400, Response{
                Msg:    "insert error",
                Status: -1,
            })
            return
        }

        c.JSON(200, gin.H{})
    })
    r.Run(":9100")
}

所有优化手段都用上了性能实在太差,qps 100/s 每2秒插入100条。go原生database/sql 40分钟,就是内存80M上涨到4.5G, 内存每秒上涨几M 4个月了,性能没有任何优化,有些失望,之前就提过一个issues,测评单机插入和查询性能远远低于mysql,比pg差距更远, 内存高于mysql. 这次又在公司推希望用上,就是看中greptime 支持sql 和promeQL 能同时存log 和metrics 方面统一查询. 希望好好优化下内存和查询速度问题,不优化下,根本没法用

tx991020 commented 8 months ago

[api] enabled = true

[sources.nginx_logs] type = "demo_logs" interval = 0.01 format = "json"

[sinks.http] type = "http" inputs = [ "nginx_logs" ] method = "post" encoding.codec = "json" batch.max_events = 100 buffer.max_events =500 uri = "http://127.0.0.1:9200/receive"

MichaelScofield commented 8 months ago

@tx991020 非常感谢您的反馈!我现在测试一下

v0y4g3r commented 8 months ago

Hi tx991020, 关于内存持续增长的问题:vector 的 demo_logs 生成的数据中,host 几乎是随机生成的,在这种场景下如果以 host 作为 primary key 可能会导致 GreptimeDB 的内部 buffer 的内存效率大大降低,从而在未触发 flush 的情况下占用过多的内存。这个问题我们会在接下来几个版本中支持 append 表,规避掉现在这种 buffer 模式带来的问题。目前如果有类似的场景需求,可以规划一下 tag 列的设计,尽量避免出现 tag 列取值为随机值的情况。


Hi tx991020, regarding the memory consumption issue: in the data generated by demo_logs in vector, the "host" is almost randomly generated. In this scenario, using "host" as a primary key may greatly reduce the memory efficiency of GreptimeDB's internal buffer. We are going to support append-only table mode in the next several releases to address this issue. Currently if you have to use GreptimeDB in such scenario, you can carefully design the tag columns to avoid random values.

v0y4g3r commented 8 months ago

关于这个问题的细节:目前 GreptimeDB 使用 TimeSeriesMemtable 作为内部 buffer 来存储新插入的行。该 memtable 使用 btree map 来存储每个“时间序列”的值 - 在您的情况下,“时间序列”表示所有可能的 primary key 组合。Vector 的 demo_log 为每一行生成随机 host,因此会有无限数量的时间序列,并且 memtable 中的 btree map 的内存占用将因创建随机 key 而膨胀。


To explain this issue in detail, currently GreptimeDB uses TimeSeriesMemtable as the internal buffer to store newly inserted rows. This memtable uses a btree map to store values for each "time series" -- in your case, time series means all possible combinations of tag columns. Vector's demo_log generates random hosts for each row, thus there will be inifinite time series and the btree map in memtable will explode as random keys are created.

killme2008 commented 8 months ago

感谢你的反馈和关注,这里其实核心问题是因为目前 GreptimeDB 当前是仅为了 metrics 数据来设计的,host 加入 primary key 就会导致时间线在随机数据下急剧膨胀,当前引擎还不支持 append 模式(前面 @v0y4g3r 已提到,我们短期可以支持下),而是强制在 memtable 里组织 pirmary key 去重,这样就导致内存和插入性能在这个场景下的双低效。我们有计划在 2024 年加入 log 和 trace 引擎的单独支持,只是暂未提上议程。

Thank you for your feedback and attention. The core issue here is that GreptimeDB is currently designed only for metrics data. Adding a host as a primary key would cause the time series to expand dramatically under random data. The current engine does not support append mode and instead enforces deduplication of primary keys in the memtable, resulting in low efficiency in terms of memory and insertion performance in this scenario. We plan to add support for log and trace engines in 2024, but it has not been scheduled yet.

tx991020 commented 8 months ago

单存metics 也有内存泄漏啊,我感觉还得好好检查下。 [sources.in] type = "host_metrics" scrape_interval_secs = 5

[sinks.greptime] inputs = ["in"] type = "greptimedb" endpoint = "greptimedb:4001" dbname = "public" 就这个最简单的metrics收集,几小时内存就涨到5G多,每秒都在涨

MichaelScofield commented 8 months ago

@tx991020 我在本地试了一下,如果是source=host_metrics,sink=greptime,没发现内存泄漏,GreptimeDB的内存稳定在115MB左右。这是我的vector配置:

data_dir = "/tmp/vector/data"

[api]
enabled = true

[sources.in]
type = "host_metrics"
scrape_interval_secs = 5

[sinks.greptime]
inputs = ["in"]
type = "greptimedb"
endpoint = "192.168.50.178:4001"
dbname = "public"

vector启动就简单的vector -qq --config ./vector-host_metrics-greptime.toml,GreptimeDB就是./greptime standalone start。方便的话能否把你的vector配置和GreptimeDB启动方式发一下?感谢

tx991020 commented 8 months ago

vector 全配置就是这个配置 [sources.in] type = "host_metrics" scrape_interval_secs = 5

[sinks.greptime] inputs = ["in"] type = "greptimedb" endpoint = "greptimedb:4001" dbname = "public" 你说的115M是空的greptime, 我这个greptime, 之前存过日志, 现在即使docker-compose down 重启,vector 只配上面这个简单的配置内存都降不下来。 就这个最简的配置昨天下个5个G, 昨晚重启早晨来看800M, 现在又涨到

image

这个简单配置,5秒存一次进本内存开销可以忽略不计。 应该还是有内存泄露 greptime: image: registry.cn-beijing.aliyuncs.com/watrix/greptime:latest container_name: greptimedb ports:

MichaelScofield commented 8 months ago

@tx991020 对于昨天测试的source=demo_logs,sink=http,http再通过prepare stmt写入GreptimeDB的场景,我们确实发现了内存占用大的问题。原因在于GreptimeDB当前的memtable实现是针对主键(primary key,或者说“tag”)较少的场景设计的。在source=demo_logs的测试场景中,“log"表的主键是”host“。这个”host“有非常之多,见下图: image 当前版本的GreptimeDB确实未支持这种场景。

一个简单的walkaround是将建表语句做如下修改:

CREATE TABLE IF NOT EXISTS log (
  host STRING NULL,
  `datetime` Timestamp NOT NULL DEFAULT current_timestamp(),
  `method` STRING NULL,
  request STRING NULL,
  protocol STRING NULL,
  bytes int NULL,
  referer STRING NULL,
  TIME INDEX (datetime),
  ## 将原来的”PRIMARY KEY (host)“ 删掉
);

并且在您的http写入程序中使用_微秒_作为nginx log的”datetime“(我未能在vector的source=demo_logs的文档里发现他生成datetime的时间格式,所以估计vector只能以”秒“级别生成日志。而”秒“级别是不够的)

tx991020 commented 8 months ago

现在是说单存metric 都内存泄露

tx991020 commented 8 months ago

greptime 是一个数据库,不是一个demo啊,不可能每次docker-compose up 起一个新的,跑几十分钟就完事。 得要一直运行,x现在的现象就是单存metrics都内存泄漏,内存占用过大,一个rust 写的数据库,表现的和一个Java,python写的一样,内存一直增长,数qps小,数据少,占用内存超大

MichaelScofield commented 8 months ago

@tx991020 我总结下现在的问题:

  1. 对于使用vector收集nginx日志的场景,内存占用过高
    1. 原因是现阶段GreptimeDB的memtable实现未进行针对性的优化,导致所有的数据都堆积在内存中(未触发flush)
      • 这个问题将在最近几个版本高优解决
    2. walkaround:可以在建表时将”host“从主键中去掉,同时将datetime改为微秒级别写入
  2. 重启GreptimeDB后,内存占用仍然很高
    • 原因和1一样,仍然是memtable的实现导致的。GreptimeDB重启时,会将WAL(write-ahead logging)中未flush的数据回放到memtable里
  3. 对于使用vector收集host metrics的场景:
    • 如果是继续使用上面的GreptimeDB,由于第二点的原因,内存占用仍然过高
    • 如果是空的GreptimeDB,内存占用(???)

最后那个”???“我刚才启动了vector,将scrape_interval_secs设置为1(不能更少了,0.1的话vector会报错),观察几个小时再看。GreptimeDB是要成为一个严肃的数据库产品的,绝对不可能让用户每次都启动新的。非常感谢您的关注和反馈,我们会不断努力解决各种问题~

tx991020 commented 8 months ago

单存指标还是有bug的,几天从80M,到800M, 到3G, 性能不如一个python写的web项目 data_dir = "/tmp/vector/data"

[api] enabled = true

[sources.in] type = "host_metrics" scrape_interval_secs = 5

[sinks.greptime] inputs = ["in"] type = "greptimedb" endpoint = "192.168.50.178:4001" dbname = "public" 这个配置,新建的一个文件夹,本地挂载也是新空的,重新docker-compose up -d ,

image

正常这个配置5秒采集一次,rust写的,内存超过100M都不正常, vectoriaMetrics go 写,同样的配置的比greptime 还多跑2天现在才148M

image
MichaelScofield commented 8 months ago

@tx991020 请问你测试的GreptimeDB版本是多少?在GreptimeDB启动时,日志第一行就是版本。比如

2024-01-09T02:23:40.457721Z  INFO cmd: short_version: 0.5.0, full_version: greptimedb-develop-3bd2f79
killme2008 commented 8 months ago

我猜测是我们内部这些 cache and buffer 引起的,默认都开的比较大

https://github.com/GreptimeTeam/greptimedb/blob/122b47210ef2b6f67dedd8748e43c9fc0c100eb8/config/standalone.example.toml#L193-L203

方便的话,除了执行 greptime --version 告知下版本之外,可以执行下 curl http://ip:4000/metrics 将内部的观测指标结果贴下,我们确认下,感谢。其中 ip 替换成您的服务器 IP 地址。

tx991020 commented 8 months ago

greptime:latest greptime服务已经停了

killme2008 commented 8 months ago

greptime:latest greptime服务已经停了

Sorry for that, we'll simulate this case and find out the root cause, I'll update this issue when we have progressed.

v0y4g3r commented 7 months ago

We did a 4-day test trying to reproduce this problem with following vector config:

[api]
enabled = true
address = "0.0.0.0:18686"

[sources.in]
type = "host_metrics"
scrape_interval_secs = 1

[sinks.greptime]
inputs = ["in"]
type = "greptimedb"
endpoint = "minipc-1:24001"
dbname = "public"

Seems that we cannot reproduce this in our environment.

Rows written per second:

image

Memory consumption:

image

Could you please tell us if any other operation was performed during the benchmark, like queries, restarts, etc. ?

brucel11qwe commented 7 months ago

任何操作都没做,qps 基本是0,没人用,从80M涨到3G. 我要把greptime 部署到一个4G/8G的 边缘设备上, 内存超过1G都无法接受

v0y4g3r commented 7 months ago

任何操作都没做,qps 基本是0,没人用,从80M涨到3G. 我要把greptime 部署到一个4G/8G的 边缘设备上, 内存超过1G都无法接受

Can you provide more information about your GreptimeDB instance, including:


您能提供有关您的GreptimeDB实例的更多信息吗,包括:

killme2008 commented 7 months ago

If there is no further information provided, I will temporarily close this issue. Please feel free to reopen it if you encounter any problems.