awslabs / aws-sdk-rust

AWS SDK for the Rust Programming Language
https://awslabs.github.io/aws-sdk-rust/
Apache License 2.0

Unable to Deserialize S3 Select JSON #1132

Closed cobbinma closed 4 months ago

cobbinma commented 5 months ago

Describe the bug

When using S3 Select we receive occasional errors where we are unable to deserialize a JSON record.

These errors are from serde and are either 'missing field' or 'duplicate field' errors.

"duplicate field `savingsPercentage` at line 1 column 692"

The JSON that produces these errors looks as though it was formed from two separate CSV rows. Could this be caused by partial records being received in an incorrect order?
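One plausible way such a merged record could arise, sketched here with only the standard library: if a chunk in the middle of the stream fails strict UTF-8 decoding and is replaced by an empty string, the start of one record ends up joined to the tail of a later one. The chunk contents and the helper names `decode_or_empty` and `merge_chunks` are hypothetical and exist only for illustration; the thread does not confirm that this is exactly what happened.

```rust
/// Decodes a chunk strictly; a chunk that is not valid UTF-8 becomes "".
fn decode_or_empty(chunk: &[u8]) -> &str {
    std::str::from_utf8(chunk).unwrap_or_default()
}

/// Concatenates decoded chunks, silently losing any chunk that fails to decode.
fn merge_chunks(chunks: &[&[u8]]) -> String {
    chunks.iter().map(|chunk| decode_or_empty(chunk)).collect()
}

fn main() {
    // Hypothetical stream of three payloads. The middle one contains a lone
    // 0xC3 byte (the first half of a two-byte character), so it is invalid.
    let chunks: [&[u8]; 3] = [
        b"{\"id\":\"1\",\"savingsPercentage\":\"10\",",
        b"\"name\":\"Caf\xC3\"}\n{\"id\":\"2\",",
        b"\"savingsPercentage\":\"20\",\"name\":\"b\"}\n",
    ];

    let merged = merge_chunks(&chunks);
    // The surviving text is a single syntactically valid JSON object in which
    // the same key appears twice.
    print!("{merged}");
    println!("key occurrences: {}", merged.matches("savingsPercentage").count());
}
```

A generic validity check would accept the merged object, while deserializing it into a struct would reject the repeated key, which matches the shape of the reported error.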

Expected Behavior

S3 Select should produce valid JSON that we are able to deserialize.

serde_json::from_str::<T>(record) 

Current Behavior

"duplicate field `savingsPercentage` at line 1 column 692"

Reproduction Steps

SELECT * FROM S3Object s WHERE createdAt >= '2023-04-24';

output.csv

    /// query an s3 file using s3 select and parse into json records
    async fn select_records(
        &self,
        key: String,
        query: String,
        select_format: SelectFormat,
    ) -> Result<Vec<String>, ReportingError> {
        tracing::Span::current().record("TraceID", telemetry::get_trace_id().as_str());

        let bucket = self.bucket.to_string();

        tracing::info!("bucket: {bucket} key: {key} query: {query}");

        let mut output = self
            .aws
            .select_object_content()
            .bucket(&self.bucket)
            .key(key)
            .expression_type(ExpressionType::Sql)
            .expression(query)
            .input_serialization(select_format.input)
            .output_serialization(select_format.output)
            .send()
            .await
            .map_err(|e| {
                let service_error = e.into_service_error();

                match service_error.meta().code() {
                    Some("NoSuchKey") => ReportingError::NotFound,
                    _ => ReportingError::Unexpected(anyhow!(service_error)),
                }
            })?;

        let mut processed_records: Vec<String> = vec![];
        // partial records can be contained in the payload (partial records are incomplete json)
        // so we keep a buffer of records that clears when the buffer is valid json
        let mut buf = String::default();
        while let Some(event) = output
            .payload
            .recv()
            .await
            .map_err(|e| ReportingError::Unexpected(anyhow!(e)))?
        {
            if let SelectObjectContentEventStream::Records(records) = event {
                let records = records
                    .payload
                    .as_ref()
                    .map(|p| std::str::from_utf8(p.as_ref()))
                    .ok_or_else(|| ReportingError::Unexpected(anyhow!("unable to parse payload")))?
                    .unwrap_or_default()
                    .to_string();
                for line in records.lines() {
                    if let Some(record) = parse_line_buffered(&mut buf, line) {
                        processed_records.push(record.to_string());
                    }
                }
            }
        }

        Ok(processed_records)
    }
}

/// parse a new line, potentially using content from the previous line
fn parse_line_buffered(buf: &mut String, line: &str) -> Option<String> {
    if buf.is_empty() && is_valid_json(line) {
        return Some(line.to_string());
    };

    buf.push_str(line);
    if is_valid_json(&buf) {
        let record = buf.to_string();
        buf.clear();
        Some(record)
    } else {
        None
    }
}

fn is_valid_json(data: impl AsRef<str>) -> bool {
    serde_json::from_str::<IgnoredAny>(data.as_ref()).is_ok()
}

    /// fetch a list of records and deserialise them into a list of generic structs
    async fn select<T: DeserializeOwned + Send + Sync>(
        &self,
        key: String,
        query: String,
        select_format: SelectFormat,
    ) -> Result<Vec<T>, ReportingError> {
        let all_records_string = self.aws.select_records(key, query, select_format).await?;

        Ok(all_records_string
            .iter()
            .filter(|record| !record.is_empty())
            .filter_map(|record| match serde_json::from_str::<T>(record) {
                Ok(record) => Some(record),
                Err(error) => {
                    tracing::error!(message = "unable to deserialize record", %record, %error);
                    None
                }
            })
            .collect())
    }

Possible Solution

No response

Additional Information/Context

We are using code very similar to the S3 Select Example

https://github.com/awslabs/aws-sdk-rust/blob/main/examples/examples/s3/src/bin/select-object-content.rs

I do not receive errors when running in the AWS console so I believe it is related to the rust SDK.

Version

v0.60.5 (*)
│   ├── aws-smithy-json v0.60.5 (*)
│   ├── aws-smithy-runtime v1.1.5 (*)
│   ├── aws-smithy-runtime-api v1.1.5 (*)
│   ├── aws-smithy-types v1.1.5 (*)
│   ├── aws-types v1.1.5 (*)
├── aws-sdk-s3 v1.15.0
│   ├── aws-credential-types v1.1.5 (*)
│   ├── aws-runtime v1.1.5 (*)
│   ├── aws-sigv4 v1.1.5 (*)
│   ├── aws-smithy-async v1.1.5 (*)
│   ├── aws-smithy-checksums v0.60.5
│   │   ├── aws-smithy-http v0.60.5 (*)
│   │   ├── aws-smithy-types v1.1.5 (*)
│   ├── aws-smithy-eventstream v0.60.4 (*)
│   ├── aws-smithy-http v0.60.5 (*)
│   ├── aws-smithy-json v0.60.5 (*)
│   ├── aws-smithy-runtime v1.1.5 (*)
│   ├── aws-smithy-runtime-api v1.1.5 (*)
│   ├── aws-smithy-types v1.1.5 (*)
│   ├── aws-smithy-xml v0.60.5 (*)
│   ├── aws-types v1.1.5 (*)
│   ├── aws-sdk-dynamodb v1.14.0 (*)

Environment details (OS name and version, etc.)

cargo 1.76.0 (c84b36747 2024-01-18)

Logs

No response

ysaito1001 commented 5 months ago

Hi, thank you for reporting this! To help us reproduce the issue more reliably without guessing, could you also provide the code calling async fn select<T: DeserializeOwned + Send + Sync>(...) {...} (more specifically we'd like to see the definition of a type that's passed to T)?

cobbinma commented 5 months ago

@ysaito1001

of course 👍 (thanks for looking into this)

please let me know if you need anything else to replicate

#[derive(Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RedemptionRecord {
    pub perk_id: String,
    pub user_id: String,
    pub deal_type: String,
    pub savings_percentage: Option<String>,
    pub savings_gbp: Option<String>,
    pub savings_aud: Option<String>,
    pub created_at: String,
    pub name: String,
    pub brand_name: String,
    pub header: String,
    pub estimated_savings_gbp: Option<String>,
    pub estimated_savings_aud: Option<String>,
    pub internal_identifier: Option<String>,
    pub local_currency: Option<String>,
    pub savings_local_currency: Option<String>,
    pub estimated_savings_local_currency: Option<String>,
}
        self.select::<RedemptionRecord>(key, query, SelectFormat::new(SelectType::CsvToJson, "\""))
            .await

ysaito1001 commented 5 months ago

Thank you for providing additional information. Just realized that we also need to know what gets passed to .input_serialization(select_format.input) and .output_serialization(select_format.output) out of select_format. If you could provide that, that'd be appreciated!

cobbinma commented 5 months ago

@ysaito1001 sure 👍

for the file we are using a double quote (") as the quote character

        let input_selection = match input {
            SelectType::CsvToJson => InputSerialization::builder()
                .csv(
                    CsvInput::builder()
                        .file_header_info(FileHeaderInfo::Use)
                        .allow_quoted_record_delimiter(true)
                        .quote_character(quote_char.clone())
                        .build(),
                )
                .compression_type(CompressionType::None)
                .build(),
        };

        let output_selection = match input {
            SelectType::CsvToJson => OutputSerialization::builder()
                .json(JsonOutput::builder().build())
                .build(),
        };

ysaito1001 commented 5 months ago

Thank you for providing the additional information. I have reproduced the issue you're observing, but I suspect that the data stored in output.csv may contain incomplete UTF-8 byte sequences.

"I do not receive errors when running in the AWS console so I believe it is related to the rust SDK."

When I ran this query SELECT * FROM S3Object s WHERE createdAt >= '2023-04-24'; within the AWS console (selecting Exclude the first line of CSV data), I retrieved 8651 records:

Running S3 Select on output.csv

When I modified the reproducer so it used String::from_utf8_lossy instead of std::str::from_utf8

// in async fn select_records

....
    if let SelectObjectContentEventStream::Records(records) = event {
        let records = records
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p.as_ref()))
            .ok_or_else(|| anyhow!("unable to parse payload"))?; // I omitted `ReportingError::Unexpected` because I didn't have it
        for line in records.lines() {
            ...

the reproducer returned 8651 records without deserialization errors.
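The difference between the two decoding calls can be seen in isolation with a small standard-library sketch. The payload bytes and the helper names `decode_strict` and `decode_lossy` are hypothetical; they only mirror the `std::str::from_utf8(...).unwrap_or_default()` step from the original code and the `String::from_utf8_lossy` step from the modified reproducer.

```rust
use std::borrow::Cow;

/// Mirrors the original decoding step: an invalid payload silently becomes "".
fn decode_strict(payload: &[u8]) -> &str {
    std::str::from_utf8(payload).unwrap_or_default()
}

/// Mirrors the modified reproducer: invalid bytes are replaced with U+FFFD,
/// and everything else in the payload is kept.
fn decode_lossy(payload: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(payload)
}

fn main() {
    // Hypothetical payload: JSON text that ends with 0xC3, the first byte of
    // a two-byte character such as "é" (0xC3 0xA9).
    let mut payload = br#"{"name":"Caf"#.to_vec();
    payload.push(0xC3);

    println!("strict: {:?}", decode_strict(&payload));
    println!("lossy:  {:?}", decode_lossy(&payload));
}
```

With strict decoding the whole payload is lost, including every complete record it carried; with lossy decoding only the offending bytes are substituted, so the JSON structure around them survives.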

My guess is that executing S3 Select within the AWS Management Console is more lenient when handling incomplete UTF-8 byte sequences.
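If the invalid sequences come from a multi-byte character being split across two event payloads (an assumption; the thread does not establish where they originate), a possible alternative to lossy decoding per payload is to buffer the raw bytes and decode only complete lines. The following is a minimal sketch, not the SDK's API: `LineBuffer` and its `push` method are hypothetical names, and it assumes records are delimited by `\n`. Splitting on the newline byte is safe because 0x0A never occurs inside a multi-byte UTF-8 sequence.

```rust
/// Accumulates raw payload bytes and yields only complete, newline-terminated lines.
#[derive(Default)]
struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Appends a chunk and returns every complete line that it finishes.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line plus its trailing newline out of the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            // Decode without the newline; bytes that are still invalid at this
            // point come from the data itself and are replaced with U+FFFD.
            let text = String::from_utf8_lossy(&line[..pos]);
            if !text.is_empty() {
                lines.push(text.into_owned());
            }
        }
        lines
    }
}

fn main() {
    let mut buf = LineBuffer::default();
    // Hypothetical chunks: "é" (0xC3 0xA9) is split across two payloads.
    let first = buf.push(b"{\"name\":\"Caf\xC3");
    let second = buf.push(b"\xA9\"}\n{\"name\":\"Tea\"}\n");
    println!("{first:?}");
    println!("{second:?}");
}
```

In this example the first call returns no lines because no newline has arrived yet, and the second call returns both records with the split character intact. Each returned line could then be passed to `serde_json::from_str::<T>` as in the `select` function above.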

cobbinma commented 4 months ago

@ysaito1001 Thanks so much for looking into this 👏
