ClickHouse / clickhouse-go

Golang driver for ClickHouse

Memory leak from `INSERT`? #1293

Open · flarco opened this issue 2 months ago

flarco commented 2 months ago

Observed

Hi, I am seeing high memory usage from resultFromStatement after making an INSERT query. I believe the leak is happening here: https://github.com/ClickHouse/clickhouse-go/blob/28fd6a4954a5dbf09b7dcc0fcce597beb2dd0b58/lib/column/decimal.go#L238

See the graph below. This happens when I insert millions of rows with Decimal and Int64 values; the decimal values are created with decimal.NewFromString from github.com/shopspring/decimal. I am not actually using any result since I am making an INSERT, so I don't understand why the driver is appending result data and holding so much memory.
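The workload boils down to the sketch below (the DSN and table name are hypothetical; the real code is linked under "Code example"). Note that Decimal(24,6) has precision above 18 and is therefore stored as Decimal128, which matches the ColDecimal128.Append frames in the profile.

    package main

    import (
        "database/sql"
        "log"

        _ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
        "github.com/shopspring/decimal"
    )

    func main() {
        // Hypothetical DSN and table name, for illustration only.
        db, err := sql.Open("clickhouse", "clickhouse://localhost:9000/default")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // clickhouse-go batches INSERTs inside a transaction: each Exec
        // appends to per-column buffers that are only sent on Commit.
        tx, err := db.Begin()
        if err != nil {
            log.Fatal(err)
        }
        stmt, err := tx.Prepare("INSERT INTO leak_repro (amount, id)")
        if err != nil {
            log.Fatal(err)
        }

        for i := 0; i < 10_000_000; i++ {
            amount, err := decimal.NewFromString("123.456789")
            if err != nil {
                log.Fatal(err)
            }
            // Heap grows with every appended row (ColDecimal128, ColInt64).
            if _, err := stmt.Exec(amount, int64(i)); err != nil {
                log.Fatal(err)
            }
        }
        if err := tx.Commit(); err != nil {
            log.Fatal(err)
        }
    }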

Here is the pprof output:

$ go tool pprof http://localhost:6060/debug/pprof/heap
Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /Users/fritz/pprof/pprof.alloc_objects.alloc_space.inuse_objects.inuse_space.002.pb.gz
Type: inuse_space
Time: May 8, 2024 at 9:45am (-03)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top20
Showing nodes accounting for 472.25MB, 97.42% of 484.75MB total
Dropped 52 nodes (cum <= 2.42MB)
Showing top 20 nodes out of 28
      flat  flat%   sum%        cum   cum%
  296.42MB 61.15% 61.15%   296.42MB 61.15%  github.com/ClickHouse/ch-go/proto.(*ColDecimal128).Append (inline)
  138.53MB 28.58% 89.73%   138.53MB 28.58%  github.com/ClickHouse/ch-go/proto.(*ColInt64).Append (inline)
   33.01MB  6.81% 96.54%    33.01MB  6.81%  github.com/ClickHouse/ch-go/proto.(*ColUInt8).Append (inline)
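(For reference, the endpoint above is the standard net/http/pprof handler; it is enabled with roughly this boilerplate:)

    import (
        "log"
        "net/http"
        _ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
    )

    func init() {
        go func() {
            // Serves http://localhost:6060/debug/pprof/heap, fetched above.
            log.Println(http.ListenAndServe("localhost:6060", nil))
        }()
    }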

[graph: process memory usage climbing during the INSERT]

Expected behaviour

The driver should not retain significant memory after an INSERT call.

Code example

See https://github.com/slingdata-io/sling-cli/blob/main/core/dbio/database/database_clickhouse.go#L157

            insertStatement := conn.GenerateInsertStatement(
                table.FullName(),
                insFields,
                1,
            )

            stmt, err := conn.Prepare(insertStatement)
            if err != nil {
                g.Trace("%s: %#v", table, columns.Names())
                return g.Error(err, "could not prepare statement")
            }

            decimalCols := []int{}
            intCols := []int{}
            int64Cols := []int{}
            floatCols := []int{}
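            // NOTE: these column-index slices are populated from the table's
            // column types in the full source linked above; that step is
            // elided from this excerpt.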
            for row := range batch.Rows {
                var eG g.ErrorGroup

                // set decimals correctly
                for _, colI := range decimalCols {
                    if row[colI] != nil {
                        val, err := decimal.NewFromString(cast.ToString(row[colI]))
                        if err == nil {
                            row[colI] = val
                        }
                        eG.Capture(err)
                    }
                }

                // set Int32 correctly
                for _, colI := range intCols {
                    if row[colI] != nil {
                        row[colI], err = cast.ToIntE(row[colI])
                        eG.Capture(err)
                    }
                }

                // set Int64 correctly
                for _, colI := range int64Cols {
                    if row[colI] != nil {
                        row[colI], err = cast.ToInt64E(row[colI])
                        eG.Capture(err)
                    }
                }

                // set Float64 correctly
                for _, colI := range floatCols {
                    if row[colI] != nil {
                        row[colI], err = cast.ToFloat64E(row[colI])
                        eG.Capture(err)
                    }
                }

                if err = eG.Err(); err != nil {
                    err = g.Error(err, "could not convert value for COPY into table %s", tableFName)
                    ds.Context.CaptureErr(err)
                    return err
                }

                count++
                // Do insert
                ds.Context.Lock()
                _, err := stmt.Exec(row...)
                ds.Context.Unlock()
                if err != nil {
                    ds.Context.CaptureErr(g.Error(err, "could not COPY into table %s", tableFName))
                    g.Trace("error for row: %#v", row)
                    return g.Error(err, "could not execute statement")
                }
            }

Environment

create table `default`.`tpcds_store_sales_tmp` (`ss_sold_date_sk` Nullable(Int64),
`ss_sold_time_sk` Nullable(Int64),
`ss_item_sk` Nullable(Int64),
`ss_customer_sk` Nullable(Int64),
`ss_cdemo_sk` Nullable(Int64),
`ss_hdemo_sk` Nullable(Int64),
`ss_addr_sk` Nullable(Int64),
`ss_store_sk` Nullable(Int64),
`ss_promo_sk` Nullable(Int64),
`ss_ticket_number` Nullable(Int64),
`ss_quantity` Nullable(Int64),
`ss_wholesale_cost` Nullable(Decimal(24,6)),
`ss_list_price` Nullable(Decimal(24,6)),
`ss_sales_price` Nullable(Decimal(24,6)),
`ss_ext_discount_amt` Nullable(Decimal(24,6)),
`ss_ext_sales_price` Nullable(Decimal(24,6)),
`ss_ext_wholesale_cost` Nullable(Decimal(24,6)),
`ss_ext_list_price` Nullable(Decimal(24,6)),
`ss_ext_tax` Nullable(Decimal(24,6)),
`ss_coupon_amt` Nullable(Decimal(24,6)),
`ss_net_paid` Nullable(Decimal(24,6)),
`ss_net_paid_inc_tax` Nullable(Decimal(24,6)),
`ss_net_profit` Nullable(Decimal(24,6))) engine=MergeTree  ORDER BY tuple()

alisman commented 1 month ago

@jkaflik any timeframe on this?

jkaflik commented 1 month ago

@flarco, thanks for reporting this. There is definitely some unexpected behavior here. Could you clarify what you mean by the following?

I am actually not using any of the result since I am making an INSERT.

@alisman I was initially assigned this for triage. We don't currently have the capacity to look into it soon; any contributions are welcome.

flarco commented 1 month ago

Limiting the number of rows inserted per transaction lowers memory usage considerably, so that works as a workaround.

The issue stands, though: if millions of rows are inserted in a single transaction, the memory leak crashes the process.
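A sketch of that workaround, with hypothetical names (commit and re-prepare every batchSize rows so the driver flushes its column buffers instead of accumulating the whole transaction in memory):

    import "database/sql"

    // insertInBatches is a sketch, not the actual sling-cli code: committing
    // every batchSize rows sends the buffered batch to ClickHouse and lets
    // the driver release the column data appended so far.
    func insertInBatches(db *sql.DB, insertStatement string, rows [][]any) error {
        const batchSize = 100_000 // hypothetical; tune to the available memory

        tx, err := db.Begin()
        if err != nil {
            return err
        }
        stmt, err := tx.Prepare(insertStatement)
        if err != nil {
            return err
        }
        for i, row := range rows {
            if _, err := stmt.Exec(row...); err != nil {
                return err
            }
            if (i+1)%batchSize == 0 {
                if err := tx.Commit(); err != nil {
                    return err
                }
                if tx, err = db.Begin(); err != nil {
                    return err
                }
                if stmt, err = tx.Prepare(insertStatement); err != nil {
                    return err
                }
            }
        }
        return tx.Commit() // flush the final partial batch
    }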