EmerisHQ / tracelistener

UNIX named pipes-based real-time state listener for Cosmos SDK blockchains
GNU Affero General Public License v3.0
33 stars 8 forks source link

fix(tracelistener): chunk writeback data units to avoid database placeholder issues #45

Closed gsora closed 2 years ago

gsora commented 2 years ago

CockroachDB and PostgreSQL have a maximum amount of placeholders a query can hold. A placeholder is the $1 thingy sqlx will substitute to each struct replacement statement we add into a query.

So for example a sqlx query like this:

select * from tableName where field = :fieldname;

will be rendered as a PostgreSQL query like this:

select * from tableName where field = '$1';

A single query can have a maximum of 65535 placeholders, this function keeps this amount open to aid testability.

When a query has more than the amount of placeholders the DBMS likes, it will fail.

We use lots of placeholders in INSERTs, and we might've lost some data because of this error.

This function calculates the amount of placeholders for a WritebackOp, assuming that the number of struct fields of a single unit of wo.Data is what we encounter in wo.Data - which indeed it is.

So for e.g. an AuthRow, which is built like this:

type AuthRow struct {
    TracelistenerDatabaseRow

    Address        string `db:"address" json:"address"`
    SequenceNumber uint64 `db:"sequence_number" json:"sequence_number"`
    AccountNumber  uint64 `db:"account_number" json:"account_number"`
}

the amount of placeholders its associated query is 4: 3 top-level fields + TracelistenerDatabaseRow. TracelistenerDatabaseRow is a special case because it holds the database autogenerated ID - which doesn't count against the placeholder count, since it's autogenerated - and the chain name, which counts against the placeholder count.

We use reflect to count fields, which already counts them correctly, one less problem!

Math here is very simple.

For a WritebackOp wo, the total sum of placeholders can be derived by multiplying len(wo.Data) * (amount of fields in wo.Data[0]). We then calculate the maximum amount of tracelistener.DatabaseEntrier (wo.Data holds that kind of element, it's an interface), called splitAmount, for each new chunk of wo.Data, rounding by excess - we prefer having one more WritebackOp rather than potentially getting too close to limit.

Once the chunking function returns, we create a new WritebackOp object with at most splitAmount data.

DeshErBojhaa commented 2 years ago

IDK, seems smelly. I would suggest to incorporate bulk import.

gsora commented 2 years ago

Could you elaborate?

Thanks!

DeshErBojhaa commented 2 years ago

Could you elaborate?

I have something like this in mind. Using CopyFrom function.

type Data struct {
    ch     chan []interface{}
    values []interface{}
    err    error
}

func (d *Data) Next() bool {
    d.values, ok := <- d.ch
    if !ok {
        return false
    }

    return true
}

func (d *Data) Err() error {
    return d.err
}

func (d *Data) Values() ([]interface{}, error) {
    return d.values, d.err
}

func bulk(conn *pgx.Conn) error {
        d := Data{
            ch := make(chan interface{})
        }
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100000; i++ {
                d.ch <- []interface{i}
            }
        }()

    count, err := conn.CopyFrom(context.Background(), pgx.Identifier{"table"}, []string{"val"}, &d)
    if err != nil {
        return fmt.Errorf("conn.CopyFrom %w", err)
    }

    fmt.Println("rows", count)
        wg.Wait()
    return nil
}

But it's possible I totally misunderstood the problem here.

Untested. Just wrote inline here.

gsora commented 2 years ago

I'm not sure I'm getting the issue here.

This PR tries to address the amount of postgres placeholders we can use in a single statement, which is not tied to the amounts of elements to insert, rather the amount of placeholders bound to the list of elements to insert.

How does your method addresses this?

Also, could you elaborate on the smelliness?

Thanks!

DeshErBojhaa commented 2 years ago

Also, could you elaborate on the smelliness?

Hi, this PR is well documented and easy to understand. So, no smell. Apologies for my misleading words. What I thought maybe we don't need all these? Again I am sure I am wrong here, but for the sake of completion, I was thinking if we follow the chunking approach, maybe something like this is enough?

take := 65535 / reflect.TypeOf(p.Data[0]).NumField() // Assuming len(p.Data) > 0
for i := 0; i < len(p.Data); i += take {
    end := i + take
    if end > len(p.Data) {
    end = len(p.Data)
    }
    is := (tracelistener.WritebackOp{p.DatabaseExec, p.Data[i:end]}).InterfaceSlice()
    if err := di.Add(p.DatabaseExec, is); err != nil {
        logger.Error("database error ", err)
    }
}

Again, sorry if I missed something here (which is very much possible)

gsora commented 2 years ago

@DeshErBojhaa your proposed solution might work as well indeed! I just thought that splitting everything in single funcs might be better for testing.

Pitasi commented 2 years ago

@gsora needs a rebase