apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Go][FlightSQL] Large data transfer #37976

Closed polestar1988 closed 1 year ago

polestar1988 commented 1 year ago

Describe the usage question you have. Please include as many useful details as possible.

I have implemented my application using Apache Arrow Flight in Go. On the server side I create an Arrow record and send it to the client. This is my DoGet function:

func (s *Server) DoGet(ticket *flight.Ticket, server flight.FlightService_DoGetServer) error {
    queryResult, err := queryDatasource(dataSource, query)
    if err != nil {
        return err
    }
    defer queryResult.Close()

    pool := memory.NewGoAllocator()
    // ArrowSchema parses the schema to get column types and names and returns *arrow.Schema
    schema, err := queryResult.ArrowSchema()
    if err != nil {
        return err
    }

    b := array.NewRecordBuilder(pool, schema)
    defer b.Release()

    // WriteValues writes the values into the *array.RecordBuilder
    if err := queryResult.WriteValues(b); err != nil {
        return err
    }

    rec := b.NewRecord()
    defer rec.Release()

    rw := flight.NewRecordWriter(server, ipc.WithSchema(rec.Schema()))
    defer rw.Close()

    if err := rw.Write(rec); err != nil {
        return err
    }
    return nil
}

and on the client side:

// Retrieve data
doGetClient, err := client.DoGet(ctx, doGetTicket)
if err != nil {
    return fmt.Errorf("failed to get data: %v", err)
}
reader, err := flight.NewRecordReader(doGetClient, nil)
if err != nil {
    fmt.Printf("Creating record reader err: %v", err)
    return err
}
defer reader.Release()
for {
    rec, err := reader.Read()
    if err != nil {
        if err == io.EOF {
            break
        }
        return err
    }
    // process rec here ...
}

As you know, data transfer in gRPC is limited to 4 MB by default, and I hit the error "received message larger than max (5000000 vs 4194304)". How can I create chunks of data and send them to the client, and on the client side how can I put the received chunks back together and use them?

Component(s)

Go

zeroshade commented 1 year ago

There are a few possible ways of handling this on the server side:

  1. Clients could use the call option MaxCallRecvMsgSize(s) to increase that 4MB limit if desired, or use the dial option WithDefaultCallOptions(MaxCallRecvMsgSize(s)) when connecting to raise the 4MB limit by default (see the sketch after this list).
  2. Rather than creating a single record with all of your results, you could create multiple records. The RecordBuilder resets with the same schema after you call NewRecord, so you can easily write to it again and generate a new record. Then call rw.Write(rec) on each record you create (i.e. each chunk).
  3. You can slice a record via rec.NewSlice(i, j), which will create a slice of the record (slices need to have Release called on them separately!) without performing a copy, and then just call rw.Write(slice) on each slice, letting you chunk it out by sending slices of the record one at a time.
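
For option 1, a minimal client-side sketch could look like the following (the address, the 32 MiB limit, and the insecure connection are illustrative; it assumes google.golang.org/grpc and the arrow flight package):

// Dial with a larger default receive limit so DoGet can return bigger messages.
client, err := flight.NewClientWithMiddleware(
    "localhost:8080", nil, nil,
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32*1024*1024)),
)
if err != nil {
    return err
}
defer client.Close()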

On the client side, you already have the loop, so you can create an Arrow table from the records:

recs := make([]arrow.Record, 0)
for reader.Next() { 
        r := reader.Record()
        r.Retain()
        defer r.Release()
        recs = append(recs, r)
}
if err := reader.Err(); err != nil {
        return err
}
// if you want to treat all of the records as a single table without copying
tbl := array.NewTableFromRecords(reader.Schema(), recs)
defer tbl.Release()
// do stuff with tbl....
polestar1988 commented 1 year ago

@zeroshade Thanks for your response. Since I want to have no limit on my data size, I already split the data using NewSlice:

// Split records into smaller slices

chunkSize := 4 * 1024 * 1024 // Bytes
recordChunks := sliceRecordByBytes(transformedRecord, chunkSize)
chunkSchema := recordChunks[0].Schema()
currentChunk := make([]arrow.Array, 0)
for _, rec := range recordChunks {
    for i := 0; i < int(rec.NumCols()); i++ {
        column := rec.Column(i)
        currentChunk = append(currentChunk, column)
    }
    // Create a Flight writer
    writeChunkToStream(server, chunkSchema, currentChunk)
    currentChunk = nil
}
// Function to calculate size based on data type
func calculateSize(dataType arrow.DataType) int {
    switch dataType.(type) {
    case *arrow.Int64Type:
        return 8 // 8 bytes for Int64
    case *arrow.FixedSizeBinaryType, *arrow.BinaryType:
        // Adjust this size calculation based on your requirements
        return 16 // Example: 16 bytes for FixedSizeBinary
    case *arrow.Float64Type:
        return 8 // 8 bytes for Float64
    case *arrow.Date64Type:
        return 8 // 8 bytes for Date64
    case *arrow.Time32Type, *arrow.TimestampType:
        // Adjust this size calculation based on your requirements
        return 16 // Example: 16 bytes for Timestamp
    case *arrow.Int16Type:
        return 2 // 2 bytes for Int16
    case *arrow.StringType:
        // Adjust this size calculation based on your requirements
        return 32 // Example: 32 bytes for String
    default:
        // Default to 0 if the data type is unknown
        return 0
    }
}
func sliceRecordByBytes(record arrow.Record, size int) []arrow.Record {

    // Get the number of rows in the record
    numRows := record.NumRows()
    // Create a slice of records to store the slices
    slices := make([]arrow.Record, 0)
    // Initialize variables for slicing
    currentSize := 0
    currentRows := make([]int, 0)
    // Loop over the rows in the record
    for i := 0; i < int(numRows); i++ {
        // Calculate the size of the row in bytes based on data type
        rowSize := 0
        for j := 0; j < int(record.NumCols()); j++ {
            col := record.Column(j)
            rowSize += calculateSize(col.DataType())
        }
        // Check if adding this row would exceed the size limit
        if currentSize+rowSize > size {
            // If yes, create a slice from the current rows
            slice := record.NewSlice(int64(currentRows[0]), int64(currentRows[len(currentRows)-1]))
            // Append it to the slices slice
            slices = append(slices, slice)
            // Reset the current size and rows
            currentSize = 0
            currentRows = []int{i}
        } else {
            // If no, add this row to the current rows
            currentRows = append(currentRows, i)
        }
        // Update the current size
        currentSize += rowSize
    }
    // Create a slice from the remaining rows
    if len(currentRows) > 0 {
        slice := record.NewSlice(int64(currentRows[0]), int64(currentRows[len(currentRows)-1]))
        // Append it to the slices slice
        slices = append(slices, slice)
    }
    // Return the slices
    return slices
}

on the client side:

recs := make([]arrow.Record, 0)
    for reader.Next() {
        r := reader.Record()
        r.Retain()
        defer r.Release()
        recs = append(recs, r)
    }
    if err := reader.Err(); err != nil {
        log.Print("ERROR", err)
    }
    tbl := array.NewTableFromRecords(reader.Schema(), recs)
    defer tbl.Release()

I get the same error with this code, and also the error arrow/ipc: invalid message type (got=Schema, want=RecordBatch) when reading like this:

for {
    rec, err := reader.Read()
    if err != nil {
        if err == io.EOF {
        // End of data stream, break the loop
            break
        }
    }
    defer rec.Release()
}

That's because the slices also contain the schema, and writing the slices without a schema is not possible. How can I handle this?

zeroshade commented 1 year ago

So a couple things first:

You can check if fw, ok := dt.(arrow.FixedWidthDataType); ok { return fw.Bytes() }, which will get you the bytes per element for a fixed-width data type without needing to do the full type switch like you're doing.

Data types also have a Layout() method which returns a slice of BufferSpec objects; if the Kind is arrow.KindFixedWidth, then the ByteWidth member gives the byte size. Again, this lets you get the info without doing the type switch or handling each data type explicitly.

You're also not including the size of the null bitmaps in your computation, which may be non-negligible.
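
Putting those points together, a rough per-column estimate for fixed-width types might look like this (just a sketch, not the library's API; variable-width types need the offsets and are discussed further down):

func estimateFixedColSize(dt arrow.DataType, numElements int) int {
    if fw, ok := dt.(arrow.FixedWidthDataType); ok {
        // data buffer plus validity bitmap (1 bit per element, rounded up to whole bytes)
        return fw.Bytes()*numElements + (numElements+7)/8
    }
    return 0 // variable-width types handled separately
}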

Now, the reason you're getting that error is that you are creating multiple writers on the same stream; you need only one writer for the stream:

Instead of this:

chunkSize := 4 * 1024 * 1024 // Bytes
recordChunks := sliceRecordByBytes(transformedRecord, chunkSize)
chunkSchema := recordChunks[0].Schema()
currentChunk := make([]arrow.Array, 0)
for _, rec := range recordChunks {
    for i := 0; i < int(rec.NumCols()); i++ {
        column := rec.Column(i)
        currentChunk = append(currentChunk, column)
    }
    // Create a Flight writer
    writeChunkToStream(server, chunkSchema, currentChunk)
    currentChunk = nil
}

You should do this:

rw := flight.NewRecordWriter(server, ipc.WithSchema(transformedRecord.Schema()))
defer rw.Close() 

chunkSize := 4 * 1024 * 1024 // Bytes
recordChunks := sliceRecordByBytes(transformedRecord, chunkSize)
defer func() {
        for _, chunk := range recordChunks {
                chunk.Release()
        }
}()

for _, slice := range recordChunks {
        if err := rw.Write(slice); err != nil {
                return err
        }
}

Every time you create a writer, the first thing it does is send a Schema message, so you don't want multiple writers; you just want to write the slices to the single writer. If you wanted, you could even combine these steps: instead of creating all the slices up front and then sending them one by one, you could find where you're going to slice, write that slice, call Release on it, and then find the next slice, and so on. That way you don't need a slice of records and you have fewer allocations.

Just an idea
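
For what it's worth, here's a rough sketch of that slice-write-release idea, chunking by a fixed number of rows rather than by estimated bytes (writeInChunks and maxRows are illustrative names; swap in your byte-size estimate if you prefer):

// writeInChunks streams rec to rw in slices of at most maxRows rows,
// releasing each slice as soon as it has been written.
func writeInChunks(rw *flight.Writer, rec arrow.Record, maxRows int64) error {
    for start := int64(0); start < rec.NumRows(); start += maxRows {
        end := start + maxRows
        if end > rec.NumRows() {
            end = rec.NumRows()
        }
        slice := rec.NewSlice(start, end)
        err := rw.Write(slice)
        slice.Release()
        if err != nil {
            return err
        }
    }
    return nil
}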

polestar1988 commented 1 year ago

@zeroshade Thanks so much for your response. Just to make sure: when I read records on the client side using:

for {
    rec, err := reader.Read()
    if err != nil {
        if err == io.EOF {
            // End of data stream, break the loop
            break
        }
    }
    defer rec.Release()
}

I get each record like this:

record:
  schema:
  fields: 3
    - first_name: type=utf8, nullable
    - last_name: type=utf8, nullable
    - age: type=utf8, nullable
  rows: 9
  col[0][first_name]: ["John" "John" "John" "John" "John" "John" "John" "John" "John"]
  col[1][last_name]: ["Doe" "Doe" "Doe" "Doe" "Doe" "Doe" "Doe" "Doe" "Doe"]
  col[2][age]: ["30" "30" "30" "30" "30" "30" "30" "30" "30"]

Is there any way to concatenate the records into one record, or does that also have to be done by writing a concatenation function myself?

zeroshade commented 1 year ago

Is there a particular reason you need them all as a single record rather than just being able to treat the entire group of records as a single table via NewTableFromRecords?

If you absolutely have to concatenate them, my recommendation would be to call https://pkg.go.dev/github.com/apache/arrow/go/v13@v13.0.0/arrow/array#RecordToStructArray on each record, then use https://pkg.go.dev/github.com/apache/arrow/go/v13@v13.0.0/arrow/array#Concatenate to concatenate them all together, and finally https://pkg.go.dev/github.com/apache/arrow/go/v13@v13.0.0/arrow/array#RecordFromStructArray to get the record back out.

That's probably the simplest and most efficient way to concatenate, if you absolutely must concatenate them. But I recommend seeing whether an arrow.Table is sufficient for your usage, to avoid the memory allocations and copies, unless you absolutely must have the entire thing as a single contiguous record batch (in the vast majority of scenarios you don't really need that, and a table is sufficient).
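
If you do go the concatenation route, a rough sketch of those three calls could look like the following (untested, assuming the arrow-go v13 array and memory packages; check the docs linked above for the exact signatures):

func concatRecords(mem memory.Allocator, recs []arrow.Record) (arrow.Record, error) {
    // 1. convert each record into a single struct array
    structs := make([]arrow.Array, len(recs))
    for i, r := range recs {
        st := array.RecordToStructArray(r)
        defer st.Release()
        structs[i] = st
    }
    // 2. concatenate the struct arrays (this is where the copy happens)
    combined, err := array.Concatenate(structs, mem)
    if err != nil {
        return nil, err
    }
    defer combined.Release()
    // 3. turn the combined struct array back into a record
    // (passing a nil schema derives it from the struct type)
    return array.RecordFromStructArray(combined.(*array.Struct), nil), nil
}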

polestar1988 commented 1 year ago

@zeroshade The reason is that I have to get the data, apply some logic to it, and then show it to the user, and the logic might need the whole dataset. Thanks so much for the reply, I really appreciate it 🙏🏻

zeroshade commented 1 year ago

@polestar1988 Depending on the logic you're using, you might be able to simply use array.NewTableReader to iterate over chunks of the table and apply your logic, or use the compute package with the table or slice of records.
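
For example, iterating a table (like the one built with NewTableFromRecords earlier) in fixed-size chunks might look roughly like this (the 1024-row chunk size is arbitrary):

tr := array.NewTableReader(tbl, 1024) // read the table 1024 rows at a time
defer tr.Release()
for tr.Next() {
    rec := tr.Record() // only valid until the next call to Next
    // apply your logic to rec ...
}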

As for showing the data to the user, please see the example in the docs here, which demonstrates one possible way of doing it. arrow.Table also has its own String() method which lets you print the whole table as one output; alternatively, you could use JSON/CSV output if desired, which lets you write the data to a human-readable file, etc.

There are several ways you could go about it, but if you really want to concatenate it all into a single record, then please try the suggestions from my previous response and let me know if everything works for you.

polestar1988 commented 1 year ago

@zeroshade Thanks so much for your suggestions, I implemented them and it works. The only remaining issue is calculating the size of the data, for which I have implemented this function:

// Function to calculate size based on data type
func calculateSize(dataType arrow.DataType, numElements int) int {
    // Initialize variables for element size and null bitmap size
    elementSize := 0
    nullBitmapSize := 0
    // Check if the data type is fixed width
    if fw, ok := dataType.(arrow.FixedWidthDataType); ok {
        // Get the bytes per element for any fixed width data type
        elementSize = fw.Bytes()
        // Get the size of the null bitmap in bytes
        nullBitmapSize = bitutil.CeilByte(numElements)
    } else {
        switch dataType.(type) {
        case *arrow.StringType:
            // strings have no fixed size, so we approximate 100 bytes per element
            return 100
        case *arrow.BinaryType:
            // binary can hold larger data and has no fixed size, so we approximate 1000 bytes per element
            return 1000
        }
    }
    // Return the total size of the column in bytes
    return elementSize*numElements + nullBitmapSize
}

Is there any way to also calculate the size of the data for non-fixed-width data types like string or binary?

zeroshade commented 1 year ago

So, for types like string and binary, it's generally pretty easy to figure out. In both cases you have your null bitmaps; String/Binary have 32-bit offsets (length + 1 values), while LargeString/LargeBinary have 64-bit offsets (again, length + 1 values). Then you can determine the size of the actual character data by simply taking the difference between the offset value for the first row and the offset for the last row + 1 (which is the total length of the character data for those rows).
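
In code, that offset difference can be computed roughly like this (a sketch using the array.BinaryLike interface; j here is one past the last row you want to include):

// varWidthDataSize returns the number of character-data bytes spanned by
// rows [i, j) of a binary-like column (String, Binary, LargeString, LargeBinary).
// The offsets buffer and null bitmap are accounted for separately.
func varWidthDataSize(col array.BinaryLike, i, j int) int64 {
    return col.ValueOffset64(j) - col.ValueOffset64(i)
}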

polestar1988 commented 1 year ago

@zeroshade thanks so much 🙏🏻

kou commented 1 year ago

Can we close this?

polestar1988 commented 1 year ago

@kou yes

kou commented 1 year ago

OK. I close this.

frbvianna commented 1 year ago

I'm facing a very similar situation, except that we are not using FlightSQL but the IPC writer directly. This issue has already shed some light, thank you.

However, I am curious whether this chunking could have been done based on the actual IPC-encoded bytes instead, as opposed to calculating/estimating sizes from the arrow.Record, which may be compressed later anyway. You only get to know the final size once you encode it, and every slice of a Record written to the underlying buffer will be decoded individually instead of adding up to a single Record. So it does not seem feasible to encode, say, slices of one row at a time, keep watching the buffer size, and only send to the gRPC stream once the size limit is reached: each data chunk would end up as multiple Records on the receiver side, which does not seem ideal.

Do you see any other feasible way we might have achieved this?

zeroshade commented 1 year ago

Hey @frbvianna,

By default there is no compression enabled, so the bytes in the buffers of the columns are a relatively accurate representation of the size of the resulting IPC messages (record batch messages are just a very small flatbuffer header plus the raw buffer bytes for all of the columns). You could certainly enable compression if desired via the WithZstd() or WithLZ4() options when you create the IPC writer; compression happens at the buffer level, and if it doesn't actually shrink a buffer, that buffer is sent uncompressed. As you said, that makes estimating the size pretty difficult, but you're still guaranteed that any estimate calculated from the size of the buffers is an upper bound: the final size of the message will never be larger than the flatbuffer header plus the estimated buffers.
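
For reference, enabling that compression when constructing the IPC writer could look like this sketch (buf stands in for whatever io.Writer you're targeting):

// Compress record batch buffers with Zstd (or use ipc.WithLZ4() instead).
w := ipc.NewWriter(buf, ipc.WithSchema(rec.Schema()), ipc.WithZstd())
defer w.Close()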

All that being said, I would probably not attempt to calculate a per-row estimate like was suggested here, but instead do it in batches of some size (probably a power of two for easy computation). The easier/better solution would be for whatever is producing the Arrow records in the first place to chunk the data better before it even gets to your writer, if that is feasible.

Each data chunk would end up in multiple Records in the receiver side, which does not seem ideal.

That actually depends on what you're doing with the data on the receiver side. You can use an arrow.Table to treat a slice of records as a single large table for most cases and operations. The fact that the data is chunked into multiple records isn't actually an issue unless there's some reason you truly need the entire dataset as a single contiguous record batch, which is rare. Arrow as a format is intended to be easily chunked and streamed; offhand I honestly can't think of a scenario where you really need everything in a single contiguous record other than to reduce a little bit of complexity (iterating records instead of operating on a single large one). In most cases, operating on a group of smaller records is more performant than operating on a single large one, since each record is an easily parallelizable unit of work.

I hope this explanation helps. If you have further questions, feel free to either file a new issue or join the conversation on the Zulip or the Gopher Slack (there's an #apache-arrow-parquet channel there which I am part of).

frbvianna commented 1 year ago

Thanks @zeroshade for your detailed explanation. Since the alternative I suggested of chunking based on the IPC-encoded bytes is still a grey area to me, mostly concerning its performance, I am convinced that the row-size estimation proposed in this issue is a good place to start.

I am a bit confused, though, about estimating the null bitmaps, offsets, and variable-width data types. Would it not make sense for the Arrow library to provide that sort of calculation? I find myself reaching into some Arrow internals to fully grasp how to judiciously calculate the total row size from the Record column buffers.

@polestar1988, would you mind sharing your complete calculation code, please?

frbvianna commented 1 year ago

Hey @zeroshade, so eventually I got somewhere with estimating the data row by row, column by column, essentially estimating the entire record batch size from the individual values. I tried to combine everything discussed here, using only the BufferSpec layout objects. I haven't spent much effort on optimizing the write yet (e.g. by slicing each chunk, then immediately writing and releasing it).

What I've found is that it mostly works as an upper-bound estimate for large enough maximum chunk sizes, where many rows are included per chunk and their slightly overestimated individual sizes add up to compensate for the IPC flatbuffer message header bytes on each chunk record (a couple of kB at most), which I have decided to neglect so far.

Can you please take a look? Any feedback or ideas on how we might estimate the flatbuffer header ahead of writing would be much appreciated. Thank you.

// executed for all rows (1 <= rowIdx <= record.NumRows)
func estimateRowSize(cols []arrow.Array, rowIdx int) uint64 {
    var size uint64

    for _, col := range cols {
        size += estimateRowValueSize(col, rowIdx)
    }

    return size
}

func estimateRowValueSize(col arrow.Array, rowIdx int) uint64 {
    var size uint64

    for _, bufSpec := range col.DataType().Layout().Buffers {
        switch bufSpec.Kind {
        case arrow.KindFixedWidth:
            // size of fixedwidth primitive types or
            // the varwidth offset type (int32 or int64)
            size += uint64(bufSpec.ByteWidth)

        case arrow.KindBitmap:
            // null indicator bitmap
            // upper-end estimation of one byte for each element
            size += 1

        case arrow.KindVarWidth:
            // binary-like variable width types
            // element size calculated from buffer offset diff
            if bin, ok := col.(array.BinaryLike); ok {
                valueSize := bin.ValueOffset64(rowIdx) - bin.ValueOffset64(rowIdx-1)
                size += uint64(valueSize)
            }

        default:
            // arrow.KindAlwaysNull represents zero allocations
            continue
        }
    }

    return size
}