ydb-platform / ydb

YDB is an open source Distributed SQL Database that combines high availability and scalability with strong consistency and ACID transactions
https://ydb.tech
Apache License 2.0

Allow sending the Arrow schema and data as one data field #11790

Open rekby opened 1 day ago


Proposal: allow sending Arrow data without a separate schema header, so that the schema and the data can be sent together in the single data field.

For example, given this table:

```sql
CREATE TABLE t (
    id Int64 NOT NULL,
    val Text,
    PRIMARY KEY (id)
)
```

The following bytes should be accepted: https://gist.githubusercontent.com/rekby/620508c1ab889bf463eeb9b9b2f3b2d5/raw/e1288b16735716482038c1a7719f59cab4b04f44/gistfile1.txt

Currently this fails with the error:

```
SCHEME_ERROR (code = 400070, address = localhost:2136, nodeID = 1, issues = [{'Bulk upsert to table '/local/t'Wrong schema in bulk upsert data'}])
```
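Example of the failing code:

```go
package example_test

import (
	"bytes"
	"context"
	"testing"

	// Arrow module path/version is assumed; use the one pinned in go.mod.
	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/ipc"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/stretchr/testify/require"
	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/query"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
)

func TestTableArrowBulkUpsertDataExample(t *testing.T) {
	ctx := context.Background()
	db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
	require.NoError(t, err)
	defer db.Close(ctx)

	_ = db.Query().Exec(ctx, `DROP TABLE IF EXISTS t`)
	// Assign the result so the following require.NoError checks this call.
	err = db.Query().Exec(ctx, `
		CREATE TABLE t (
			id Int64 NOT NULL,
			val Text,
			PRIMARY KEY (id)
		)
	`, query.WithTxControl(query.DefaultTxControl()))
	require.NoError(t, err)

	schemaGo := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: &arrow.Int64Type{}},
		{Name: "val", Type: &arrow.StringType{}},
	}, nil)

	a := memory.NewGoAllocator()
	ids := array.NewInt64Builder(a)
	defer ids.Release()
	ids.AppendValues([]int64{123, 234}, []bool{true, true})
	vals := array.NewStringBuilder(a)
	defer vals.Release()
	vals.AppendValues([]string{"data1", "data2"}, []bool{true, true})

	// The IPC writer emits the schema message first, then the record batch.
	b := &bytes.Buffer{}
	aw := ipc.NewWriter(b, ipc.WithSchema(schemaGo))
	err = aw.Write(array.NewRecord(schemaGo, []array.Interface{
		ids.NewArray(),
		vals.NewArray(),
	}, 2))
	require.NoError(t, err)
	goData := b.Bytes() // serialized schema, then data
	t.Log(goData)

	tablePath := "/local/t"
	err = db.Table().BulkUpsert(ctx, tablePath, table.BulkUpsertDataArrow(
		goData,
	))
	require.NoError(t, err) // fails with SCHEME_ERROR: "Wrong schema in bulk upsert data"
}
```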

The Go Arrow library has no simple way to serialize the schema and the data as separate binaries: it can serialize a schema-only Arrow stream, which ends with an end-of-stream marker, and it can serialize schema + data.
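For reference, in the Arrow IPC stream format each encapsulated message starts with a 0xFFFFFFFF continuation marker and a little-endian int32 metadata length that already includes padding, and a stream opens with the Schema message, which has no body. The sketch below uses only the standard library to split a schema+data stream at the end of that first message. It is an alternative to the four-step procedure, not something the Go Arrow library or YDB provides; `splitFirstMessage` is a hypothetical helper, and it assumes the modern 8-byte message prefix and a schema without dictionary-encoded fields (dictionary batches would otherwise stay in the returned remainder).

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// continuationMarker prefixes every encapsulated message in the modern
// Arrow IPC framing.
const continuationMarker uint32 = 0xFFFFFFFF

// splitFirstMessage (hypothetical helper) returns the first encapsulated
// IPC message, which in a stream is the Schema message, and the bytes after
// it. It assumes that message has no body, which holds for Schema messages.
func splitFirstMessage(stream []byte) (schemaMsg, rest []byte, err error) {
	if len(stream) < 8 {
		return nil, nil, errors.New("stream too short for a message prefix")
	}
	if binary.LittleEndian.Uint32(stream[0:4]) != continuationMarker {
		return nil, nil, errors.New("missing continuation marker (legacy framing?)")
	}
	// metadata_size is a little-endian int32 that already includes padding.
	size := int32(binary.LittleEndian.Uint32(stream[4:8]))
	if size <= 0 {
		return nil, nil, errors.New("first message is end-of-stream or invalid")
	}
	end := 8 + int(size)
	if end > len(stream) {
		return nil, nil, errors.New("truncated schema message")
	}
	return stream[:end], stream[end:], nil
}

func main() {
	// Synthetic bytes: an 8-byte prefix, 8 bytes of "schema" metadata, and
	// 4 bytes standing in for the record batch message that follows.
	stream := []byte{
		0xFF, 0xFF, 0xFF, 0xFF, 8, 0, 0, 0,
		1, 2, 3, 4, 5, 6, 7, 8,
		9, 9, 9, 9,
	}
	schema, data, err := splitFirstMessage(stream)
	fmt.Println(len(schema), len(data), err) // 16 4 <nil>
}
```

With real writer output, `rest` would be the record batch message(s) only, i.e. the data-without-header form that bulk upsert currently expects.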

To extract only the data, you need to:

  1. serialize the schema;
  2. cut the end-of-stream marker bytes from the schema bytes;
  3. serialize schema + data;
  4. cut the schema bytes from the head of the data.
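Example of the cut:

```go
// tableRows, rows.SchemeBytes (the serialized schema from step 1) and
// endStreamMarker (the end-of-stream marker bytes) are defined elsewhere.
func (rows *tableRows) Data() ([]byte, error) {
	b := &bytes.Buffer{}
	aw := ipc.NewWriter(b, ipc.WithSchema(rows.scheme))
	err := aw.Write(array.NewRecord(rows.scheme, []array.Interface{
		rows.ID.NewArray(),
		rows.Val.NewArray(),
	}, rows.rowsCount))
	if err != nil {
		return nil, err
	}
	goData := b.Bytes()
	// Step 2: the end-of-stream marker sits at the end of the schema bytes,
	// so it has to be trimmed as a suffix.
	schemaMsg := bytes.TrimSuffix(rows.SchemeBytes, endStreamMarker[:])
	// Step 4: drop the schema message from the head of schema + data.
	goData = bytes.TrimPrefix(goData, schemaMsg)
	return goData, nil
}
```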