aws / aws-sdk-go-v2

AWS SDK for the Go programming language.
https://aws.github.io/aws-sdk-go-v2/docs/
Apache License 2.0

Provide io.Reader wrapper around S3 SelectObjectContentEventStream #2618

Open corey-cole opened 2 months ago

corey-cole commented 2 months ago

Describe the feature

Within the SDK, offer an io.Reader wrapper around SelectObjectContentEventStream. It does not need to replicate everything SelectObjectContentEventStream provides (e.g. metrics), as long as the differences are documented.

Use Case

When working with S3 Select, I frequently want to pass the output to functions that accept an io.Reader (e.g. io.Copy). One specific AWS use case is streaming the results of S3 Select back into S3 via s3manager, whose upload input (s3.PutObjectInput in the v2 feature/s3/manager package) accepts an io.Reader as the Body.

Proposed Solution

I could see this being implemented in the s3manager package, since it is closely related. The implementation below was generated with GenAI, and there may be error scenarios it does not handle.

import (
    "io"

    "github.com/aws/aws-sdk-go-v2/service/s3"
    s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// s3SelectReader adapts an S3 Select event stream to io.ReadCloser.
// Only Records payloads are returned; Stats, Progress, and Cont events are dropped.
type s3SelectReader struct {
    stream    *s3.SelectObjectContentEventStream
    remaining []byte // Unread bytes left over from the previous Records event
    ended     bool   // Set once the End event arrives; later reads return io.EOF
    err       error  // Sticky error returned by every Read after a failure or Close
}

// Read returns as soon as it has copied any bytes, as io.Reader permits,
// rather than blocking until b is full.
func (r *s3SelectReader) Read(b []byte) (int, error) {
    if len(b) == 0 {
        return 0, nil // Zero-length reads must not consume events
    }
    for {
        // Serve bytes left over from the previous Records event first.
        if len(r.remaining) > 0 {
            n := copy(b, r.remaining)
            r.remaining = r.remaining[n:]
            return n, nil
        }
        if r.err != nil {
            return 0, r.err
        }
        if r.ended {
            return 0, io.EOF
        }

        event, ok := <-r.stream.Events()
        if !ok {
            // The channel closes on success and on failure; Err tells them apart.
            if err := r.stream.Err(); err != nil {
                r.err = err
            } else {
                // S3 sends End only when the query completes, so a stream that
                // closes without one may have been cut short.
                r.err = io.ErrUnexpectedEOF
            }
            return 0, r.err
        }
        switch v := event.(type) {
        case *s3types.SelectObjectContentEventStreamMemberRecords:
            r.remaining = v.Value.Payload // Copied out on the next iteration
        case *s3types.SelectObjectContentEventStreamMemberEnd:
            r.ended = true
        default:
            // Stats, Progress, and Cont events carry no record data.
        }
    }
}

// Close releases the event stream; reads after Close return io.ErrClosedPipe.
func (r *s3SelectReader) Close() error {
    r.remaining = nil
    if r.err == nil {
        r.err = io.ErrClosedPipe
    }
    return r.stream.Close()
}

Other Information

No response

AWS Go SDK V2 Module Versions Used

github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.11 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/smithy-go v1.20.2 // indirect

Go version used

go version go1.22.2 darwin/arm64