harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License
263 stars 88 forks source link

Error when saving to DynamoDB #142

Closed tiagomelo closed 2 years ago

tiagomelo commented 2 years ago

Hi,

I'm using DynamoDB as storage. After inspecting the source code and putting some debug printing, I've found out this error on func (c *Checkpoint) save() error from ddb.go:

 operation error DynamoDB: PutItem, https response error StatusCode: 400, RequestID: NJ7CNNF0S4DLOPBJLL530Q7NEBVV4KQNSO5AEMVJF66Q9ASUAAJG, api error ValidationException: One or more parameter values were invalid: Missing the key namespace in the item

the item is correctly fulfilled (I've printed out namespace, ShardID and SequenceNumber, all of them are valid):

item, err := attributevalue.MarshalMap(item{
            Namespace:      fmt.Sprintf("%s-%s", c.appName, key.streamName),
            ShardID:        key.shardID,
            SequenceNumber: sequenceNumber,
        })

My table has everything it needs as instructed:

Partition key: namespace
Sort key: shard_id

Any clue? Thanks.

tiagomelo commented 2 years ago

Hi, I've figured out what is happening.

At item struct (https://github.com/harlow/kinesis-consumer/blob/master/store/ddb/ddb.go#L88):

type item struct {
    Namespace      string `json:"namespace"`
    ShardID        string `json:"shard_id"`
    SequenceNumber string `json:"sequence_number"`
}

although you're using json struct tag to correctly map to the field names at the table, if you do not inform TagKey option with json (https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue#hdr-Struct_tags), the item struct will be marshalled ignoring json tags, ending up with this:

item: map[Namespace:0xc000132288 SequenceNumber:0xc0001322e8 ShardID:0xc0001322a0]

which is, capitalized names that does not reflect the fields at the table.

One quick fix would be to change item struct to this:

type item struct {
    Namespace      string `dynamodbav:"namespace"`
    ShardID        string `dynamodbav:"shard_id"`
    SequenceNumber string `dynamodbav:"sequence_number"`
}

because dynamodbav struct tag is automatically supported.

OR (preferred)

inform TagKey option with json (https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue#hdr-Struct_tags).

tiagomelo commented 2 years ago

and that leaves to a problem: https://github.com/harlow/kinesis-consumer/blob/master/store/ddb/ddb.go#L104-L108

Key: map[string]types.AttributeValue{
            "namespace": &types.AttributeValueMemberS{
                Value: namespace,
            },
            "shard_id": &types.AttributeValueMemberS{
                Value: shardID,
            },
        },
harlow commented 2 years ago

@tiagomelo thanks for submitting this issue; I was able to recreate the error and fixed as you suggested using the dynamodbav annotation for the checkpoint item