DynamoMQ is a message queuing library that leverages DynamoDB as storage, implemented in Go. It provides an SDK to support the implementation of consumers and producers in Go, along with a CLI that functions as a management tool.
Message Queuing systems are widely adopted for asynchronous message transmission between applications. These systems emphasize high throughput and reliability. In this section, we explore the benefits of integrating the features of DynamoDB with MQ systems.
Amazon DynamoDB is a high-performance NoSQL database service that supports a wide range of data models, from simple key-value stores to complex document-based stores. The main features of DynamoDB include:
Reference: What is Amazon DynamoDB?
Applying the features of DynamoDB to MQ systems offers the following benefits:
AWS offers several solutions for MQ, including Amazon SQS, Amazon MQ, and Amazon MSK based on Kafka. By comparing these services with DynamoMQ, we can examine the unique advantages of DynamoMQ.
Amazon SQS is a fully managed, serverless MQ service widely used for asynchronous message processing in large-scale distributed applications.
It excels in scalability, cost efficiency, reliability, and availability, and I generally recommend using Amazon SQS.
However, Amazon SQS lacks flexibility in certain aspects. Messages sent to the queue cannot be reordered or have their attributes updated from outside. Also, to reference messages within the queue, they must first be received. These constraints can make it challenging to handle specific problems and hinder investigation and recovery efforts.
Amazon MQ is a message broker service based on Apache ActiveMQ and RabbitMQ, primarily aimed at facilitating the migration of existing applications. It supports a variety of protocols such as JMS, AMQP, STOMP, MQTT, OpenWire, and WebSocket.
Amazon MQ offers numerous features provided by its base technologies, ActiveMQ and RabbitMQ. However, it is not serverless, which means setup and management require more effort.
Amazon MSK provides a fully managed Apache Kafka service, suitable for processing large volumes of streaming data.
While MSK specializes in real-time data streaming, DynamoMQ focuses on general message queuing needs. MSK allows for advanced configurations, but this can lead to increased complexity and costs.
DynamoMQ leverages the scalability, durability, and serverless nature of DynamoDB. Compared to some AWS queuing solutions, it offers flexibility in managing the order and attributes of messages, low-cost operations, and easy setup and management.
DynamoMQ is equipped with key features that should be provided by a Message Queuing system, supporting the realization of a flexible and reliable system.
If a message is not processed successfully, it will be redelivered. This approach addresses temporary errors and delays in processing, increasing the chances that messages are processed correctly.
Multiple goroutines are utilized to process messages concurrently. This feature enables high throughput and efficient use of resources.
Messages that exceed the maximum number of redeliveries are moved to the Dead Letter Queue (DLQ). This separates messages with persistent errors, allowing for later analysis or manual processing.
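To make the redelivery and DLQ rules concrete, here is a minimal sketch of the decision a consumer could make after a failed attempt. It is an illustration only, not DynamoMQ's implementation: the `nextAction` function, the `Action` type, and the `maxReceives` limit are hypothetical names introduced for this example.

```go
package main

import "fmt"

// Action is a hypothetical label for what happens to a message after a failed attempt.
type Action string

const (
	Retry     Action = "RETRY"       // make the message receivable again later
	MoveToDLQ Action = "MOVE_TO_DLQ" // set it aside for analysis or manual handling
)

// nextAction decides the fate of a message whose processing just failed.
// receiveCount is how many times the message has been received so far;
// maxReceives is an assumed limit for this sketch, not a DynamoMQ default.
func nextAction(receiveCount, maxReceives int) Action {
	if receiveCount >= maxReceives {
		return MoveToDLQ
	}
	return Retry
}

func main() {
	for count := 1; count <= 3; count++ {
		fmt.Printf("receive_count=%d -> %s\n", count, nextAction(count, 3))
	}
}
```

Keeping the rule in one small function makes the retry limit easy to test and tune independently of the processing logic.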
Message processing is completed before the shutdown of the consumer process. This prevents the loss of messages that are being processed at the time of shutdown.
Messages are retrieved from the queue on a First In, First Out basis. This guarantees that messages are processed in the order they were sent, making it suitable for applications where order is important.
Multiple consumer processes can be launched to scale out, and a message is not retrieved by more than one of them from the same queue at the same time. Consequently, processing capacity can be dynamically adjusted according to the load.
A visibility timeout is set for a specific period during which the message is invisible to all consumers. This prevents a message from being received by other consumers while it is being processed.
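The idea behind a visibility timeout can be sketched with a single timestamp per message. The snippet below is a simplified in-memory model, not the library's code: the `Message` struct and the `receive` and `isVisible` helpers are hypothetical, and the 30-second timeout is an arbitrary value chosen for the example.

```go
package main

import (
	"fmt"
	"time"
)

// Message is a simplified, hypothetical stand-in for a queued message.
type Message struct {
	ID             string
	InvisibleUntil time.Time // zero value means the message was never received
}

// receive hides the message from other consumers for the given timeout, starting at now.
func receive(m *Message, now time.Time, timeout time.Duration) {
	m.InvisibleUntil = now.Add(timeout)
}

// isVisible reports whether a consumer may receive the message at time now.
func isVisible(m Message, now time.Time) bool {
	return !now.Before(m.InvisibleUntil)
}

func main() {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	msg := Message{ID: "A-101"}

	fmt.Println(isVisible(msg, now)) // never received, so it can be received
	receive(&msg, now, 30*time.Second)
	fmt.Println(isVisible(msg, now.Add(10*time.Second))) // still hidden while being processed
	fmt.Println(isVisible(msg, now.Add(30*time.Second))) // timeout elapsed, receivable again
}
```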
Delay queuing allows the delivery of new messages to consumers to be delayed for a set number of seconds. This feature accommodates the needs of applications that require a delayed message delivery.
Requires Go version 1.21 or greater.
This package can be installed as a CLI with the go install command:
$ go install github.com/vvatanabe/dynamomq/cmd/dynamomq@latest
This package can be installed as a library with the go get command:
$ go get -u github.com/vvatanabe/dynamomq@latest
Please refer to dynamomq-iam-policy.json or dynamomq-iam-policy.tf
aws dynamodb create-table --cli-input-json file://dynamomq-table.json
Please refer to dynamomq-table.json.
Please refer to dynamomq-table.tf.
DynamoMQ's CLI and library configure AWS Config with credentials obtained from external configuration sources. This setup allows for flexible and secure management of access credentials. The following are the default sources for configuration:
- `AWS_REGION`: Specifies the AWS region.
- `AWS_PROFILE`: Identifies the AWS profile to be used.
- `AWS_ACCESS_KEY_ID`: Your AWS access key.
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret key.
- `AWS_SESSION_TOKEN`: Session token for temporary credentials.

The `dynamomq` command-line interface provides a range of commands to interact with your DynamoDB-based message queue. Below are the available commands and global flags that can be used with `dynamomq`.
Available commands:
- `completion`: Generate the autocompletion script for the specified shell to ease command usage.
- `delete`: Delete a message from the queue using its ID.
- `dlq`: Retrieve the statistics for the Dead Letter Queue (DLQ), providing insights into failed message processing.
- `enqueue-test`: Send test messages to the DynamoDB table with IDs A-101, A-202, A-303, and A-404; existing messages with these IDs will be overwritten.
- `fail`: Simulate the failure of message processing, which will return the message to the queue for reprocessing.
- `get`: Fetch a specific message from the DynamoDB table using the application domain ID.
- `help`: Display help information about any command.
- `invalid`: Move a message from the standard queue to the DLQ for manual review and correction.
- `ls`: List all message IDs in the queue, limited to a maximum of 10 elements.
- `purge`: Remove all messages from the DynamoMQ table, effectively clearing the queue.
- `qstat`: Retrieve statistics for the queue, offering an overview of its current state.
- `receive`: Receive a message from the queue; this operation will replace the current message ID with the retrieved one.
- `redrive`: Move a message from the DLQ back to the standard queue for reprocessing.
- `reset`: Reset the system information of a message, typically used in message recovery scenarios.

Global flags:
- `--endpoint-url`: Override the default URL for commands with a specified endpoint URL.
- `-h`, `--help`: Display help information for `dynamomq`.
- `--queueing-index-name`: Specify the name of the queueing index to use (default is `"dynamo-mq-index-queue_type-sent_at"`).
- `--table-name`: Define the name of the DynamoDB table to contain the items (default is `"dynamo-mq-table"`).

To get more detailed information about a specific command, use `dynamomq [command] --help`.
The DynamoMQ CLI supports an Interactive Mode for an enhanced user experience. To enter the Interactive Mode, simply run the `dynamomq` command without specifying any subcommands.
Once in Interactive Mode, you will have access to a suite of commands to manage and inspect your message queue:
- `qstat` or `qstats`: Retrieves the queue statistics.
- `dlq`: Retrieves the Dead Letter Queue (DLQ) statistics.
- `enqueue-test` or `et`: Sends test messages to the DynamoDB table with IDs A-101, A-202, A-303, and A-404; if a message with the same ID already exists, it will be overwritten.
- `purge`: Removes all messages from the DynamoMQ table.
- `ls`: Lists all message IDs, displaying a maximum of 10 elements.
- `receive`: Receives a message from the queue and replaces the current ID with the peeked one.
- `id <id>`: Switches the Interactive Mode to app mode, allowing you to perform various operations on a message identified by the provided app domain ID:
  - `sys`: Displays the system info data in JSON format.
  - `data`: Prints the data as JSON for the current message record.
  - `info`: Prints all information regarding the message record, including system_info and data in JSON format.
  - `reset`: Resets the system info of the message.
  - `redrive`: Moves a message from the DLQ back to the STANDARD queue.
  - `delete`: Deletes a message by its ID.
  - `fail`: Simulates the failed processing of a message by putting it back into the queue; the message will need to be received again.
  - `invalid`: Moves a message from the standard queue to the DLQ for manual fixing.

To begin using DynamoMQ, first import the necessary packages from the AWS SDK for Go v2 and the DynamoMQ library. These imports are required to interact with AWS services and to utilize the DynamoMQ functionalities.
import (
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/vvatanabe/dynamomq"
)
The following code block initializes the DynamoMQ client. It loads the AWS configuration and creates a new DynamoMQ client with that configuration. Replace 'ExampleData' with your own data structure as needed.
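// Load the AWS configuration from the default sources.
ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
	panic("failed to load aws config: " + err.Error())
}
// Create a DynamoMQ client whose message payloads are of type ExampleData.
client, err := dynamomq.NewFromConfig[ExampleData](cfg)
if err != nil {
	panic("failed to create DynamoMQ client: " + err.Error())
}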
Define the data structure that will be used with DynamoMQ. Here, 'ExampleData' is a struct that represents the message data stored in DynamoDB.
// ExampleData is the message payload; the struct tags set the DynamoDB attribute names.
type ExampleData struct {
	Data1 string `dynamodbav:"data_1"`
	Data2 string `dynamodbav:"data_2"`
	Data3 string `dynamodbav:"data_3"`
}
The following snippet creates a DynamoMQ producer for the 'ExampleData' type. It then sends a message with predefined data to the queue.
producer := dynamomq.NewProducer[ExampleData](client)
_, err = producer.Produce(ctx, &dynamomq.ProduceInput[ExampleData]{
	Data: ExampleData{
		Data1: "foo",
		Data2: "bar",
		Data3: "baz",
	},
})
if err != nil {
	panic("failed to produce message")
}
To consume messages, instantiate a DynamoMQ consumer for 'ExampleData' and start it in a new goroutine. The consumer will process messages until an interrupt signal is received. The example includes graceful shutdown logic for the consumer.
consumer := dynamomq.NewConsumer[ExampleData](client, &Counter[ExampleData]{})
// Run the consumer in its own goroutine so the main goroutine can wait for signals.
go func() {
	if err := consumer.StartConsuming(); err != nil {
		fmt.Println(err)
	}
}()
// Wait for an interrupt or termination signal.
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
<-done
// Allow up to one minute for the consumer to shut down gracefully.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if err := consumer.Shutdown(ctx); err != nil {
	fmt.Println("failed to shut down consumer:", err)
}
Here we define a 'Counter' type that implements the processing logic for consumed messages. Each time a message is processed, the counter is incremented, and the message details are printed.
type Counter[T any] struct {
	Value int
}

func (c *Counter[T]) Process(msg *dynamomq.Message[T]) error {
	c.Value++
	fmt.Printf("value: %d, message: %v\n", c.Value, msg)
	return nil
}
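Because messages can be processed by multiple goroutines concurrently, a plain `c.Value++` may race when `Process` runs in parallel. The following is a minimal sketch of a counter that stays correct under concurrent calls, using `sync/atomic`. To keep it runnable without the library, `Message` is a local stand-in for `dynamomq.Message`, and `SafeCounter` and `processAll` are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Message is a local stand-in for dynamomq.Message so the sketch runs without the library.
type Message[T any] struct {
	ID   string
	Data T
}

// SafeCounter counts processed messages with an atomic integer,
// so Process can be called from several goroutines at once.
type SafeCounter[T any] struct {
	value atomic.Int64
}

// Process increments the counter atomically and reports success.
func (c *SafeCounter[T]) Process(msg *Message[T]) error {
	c.value.Add(1)
	return nil
}

// Value returns the number of messages processed so far.
func (c *SafeCounter[T]) Value() int64 { return c.value.Load() }

// processAll calls Process for n messages, each from its own goroutine.
func processAll(c *SafeCounter[string], n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = c.Process(&Message[string]{ID: fmt.Sprint(i), Data: "payload"})
		}(i)
	}
	wg.Wait()
}

func main() {
	c := &SafeCounter[string]{}
	processAll(c, 100)
	fmt.Println("processed:", c.Value())
}
```

A mutex would work equally well; an atomic integer is simply the smallest change when the shared state is a single number.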
Here's a diagram showing the table definition used by DynamoMQ to implement its message queuing mechanism.
| Key | Attributes | Type | Example Value |
|---|---|---|---|
| PK | id | string | A-101 |
| | data | any | any |
| | receive_count | number | 1 |
| GSIPK | queue_type | string | STANDARD or DLQ |
| | version | number | 1 |
| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
- `id`: This is the unique identifier of the message, serving as the partition key in DynamoDB. It ensures efficient data distribution and access within DynamoDB. When using DynamoMQ's producer, a UUID is generated by default. Users can also specify their own IDs.
- `data`: This field contains the data included in the message. It can be stored in any format supported by DynamoDB.
- `receive_count`: This indicates the number of times the message has been received. It's used for managing retries and moving messages to the Dead Letter Queue.
- `queue_type`: This attribute shows the type of queue where the message is stored, distinguishing between STANDARD and DLQ.
- `version`: This is the version number of the message. It increments each time the message is updated, facilitating optimistic concurrency control.
- `created_at`: The timestamp when the message was created, recorded in ISO 8601 format.
- `updated_at`: The timestamp when the message was last updated, recorded in ISO 8601 format.
- `sent_at`: The timestamp when the message was sent to the queue, recorded in ISO 8601 format.
- `received_at`: The timestamp when the message was received, recorded in ISO 8601 format.
- `invisible_until_at`: The timestamp indicating when the message will next become visible in the queue. Once this time passes, the message becomes receivable again.
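The role of the version number in optimistic concurrency control can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not how DynamoMQ talks to DynamoDB: `Store`, `Record`, `Update`, and `ErrVersionMismatch` are hypothetical names, and the mutex-protected map stands in for a conditional write against the table.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrVersionMismatch signals that another writer updated the record first.
var ErrVersionMismatch = errors.New("version mismatch")

// Record is a hypothetical, simplified message record.
type Record struct {
	ID      string
	Version int
	Data    string
}

// Store is an in-memory stand-in for the table.
type Store struct {
	mu      sync.Mutex
	records map[string]Record
}

// Update writes next only if the stored version still equals expected,
// then bumps the version. This mimics a conditional write.
func (s *Store) Update(next Record, expected int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.records[next.ID]
	if !ok || cur.Version != expected {
		return ErrVersionMismatch
	}
	next.Version = expected + 1
	s.records[next.ID] = next
	return nil
}

func main() {
	s := &Store{records: map[string]Record{
		"A-101": {ID: "A-101", Version: 1, Data: "foo"},
	}}

	// Two writers both read version 1; only the first update is accepted.
	fmt.Println(s.Update(Record{ID: "A-101", Data: "first"}, 1))
	fmt.Println(s.Update(Record{ID: "A-101", Data: "second"}, 1))
}
```

The losing writer gets an error instead of silently overwriting the winner, and can re-read the record and decide whether to retry.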
A Global Secondary Index (GSI) with `queue_type` as the partition key and `sent_at` as the sort key is set up to receive messages in the order they are added to the queue.
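To show why this key choice yields FIFO order, here is a minimal in-memory sketch that filters by queue type and sorts by send time, which is roughly what an ascending query against such an index returns. The `Item` struct and `queryByQueueType` function are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Item is a hypothetical projection of the attributes the index uses.
type Item struct {
	ID        string
	QueueType string // "STANDARD" or "DLQ"
	SentAt    time.Time
}

// queryByQueueType returns the items of one queue type, oldest SentAt first.
func queryByQueueType(items []Item, queueType string) []Item {
	var out []Item
	for _, it := range items {
		if it.QueueType == queueType {
			out = append(out, it)
		}
	}
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].SentAt.Before(out[j].SentAt)
	})
	return out
}

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	items := []Item{
		{ID: "A-303", QueueType: "STANDARD", SentAt: t0.Add(2 * time.Second)},
		{ID: "A-202", QueueType: "DLQ", SentAt: t0.Add(1 * time.Second)},
		{ID: "A-101", QueueType: "STANDARD", SentAt: t0},
	}
	for _, it := range queryByQueueType(items, "STANDARD") {
		fmt.Println(it.ID, it.SentAt.Format(time.RFC3339Nano))
	}
}
```

The sketch compares parsed `time.Time` values rather than timestamp strings, which sidesteps any question of how the stored string format sorts.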
The following state machine diagram illustrates the lifecycle of messages in DynamoMQ and their possible state transitions. The diagram shows how messages are processed in both the standard queue (`queue_type=STANDARD`) and the Dead Letter Queue (`queue_type=DLQ`).
Additionally, the diagram below illustrates how message attributes change with state transitions. Attributes highlighted in red are updated during these transitions.
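The attribute changes in this lifecycle can also be modeled in a few lines of Go. The sketch below is an in-memory illustration, not the library's code: the `Message` struct and the `send`, `receive`, `moveToDLQ`, and `redrive` helpers are hypothetical. It applies the rules stated in this section, and it assumes the version is bumped on every update, as described for the `version` attribute.

```go
package main

import (
	"fmt"
	"time"
)

// Message mirrors the system attributes from the table definition.
// It is an in-memory illustration, not the library's type.
type Message struct {
	ID               string
	QueueType        string
	Version          int
	ReceiveCount     int
	CreatedAt        time.Time
	UpdatedAt        time.Time
	SentAt           time.Time
	ReceivedAt       time.Time
	InvisibleUntilAt time.Time
}

// send creates a message in the standard queue with version 1.
func send(id string, now time.Time) *Message {
	return &Message{ID: id, QueueType: "STANDARD", Version: 1,
		CreatedAt: now, UpdatedAt: now, SentAt: now}
}

// touch applies the bookkeeping assumed for every update.
func (m *Message) touch(now time.Time) {
	m.Version++
	m.UpdatedAt = now
}

// receive increments receive_count and hides the message for the timeout.
func (m *Message) receive(now time.Time, timeout time.Duration) {
	m.touch(now)
	m.ReceiveCount++
	m.ReceivedAt = now
	m.InvisibleUntilAt = now.Add(timeout)
}

// moveToDLQ switches the queue type and resets receive_count to 0.
func (m *Message) moveToDLQ(now time.Time) {
	m.touch(now)
	m.QueueType = "DLQ"
	m.ReceiveCount = 0
}

// redrive returns the message to the standard queue.
func (m *Message) redrive(now time.Time) {
	m.touch(now)
	m.QueueType = "STANDARD"
}

func main() {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	m := send("A-101", now)
	m.receive(now.Add(time.Second), 30*time.Second)
	m.moveToDLQ(now.Add(time.Minute))
	m.redrive(now.Add(time.Hour))
	fmt.Printf("queue_type=%s version=%d receive_count=%d\n",
		m.QueueType, m.Version, m.ReceiveCount)
}
```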
Message Sending
- A producer calls the `SendMessage()` function to send a message to the standard queue.
- The message payload is stored in the `data` field.
- The `queue_type` attribute is set to `STANDARD`, and the version starts at `1`.
- The timestamps `created_at`, `updated_at`, and `sent_at` are recorded.

Message Receipt
- A consumer receives the message with the `ReceiveMessage()` function and begins processing.
- The `receive_count` and `version` of the message increment with each receipt.
- `updated_at` and `received_at` are updated, and `invisible_until_at` is set with a new timestamp.

Successful Processing
- When processing succeeds, the message is deleted with the `DeleteMessage()` function.

Failed Processing
- When processing fails, `ChangeMessageVisibility()` is used to update the `invisible_until_at` attribute of the message for later reprocessing.
- Once the time in `invisible_until_at` passes, the message returns to the 'READY' state for potential re-receipt.

Move to DLQ
- When a message exceeds the maximum number of redeliveries, the `MoveMessageToDLQ()` function moves it to the Dead Letter Queue (DLQ).
- The `queue_type` of the message changes to `DLQ`, and `receive_count` resets to `0`.

Receipt of Message
- A message in the DLQ is received with `ReceiveMessage()` and transitions to the 'PROCESSING' state.

Successful Processing
- When processing of a DLQ message succeeds, the message is deleted with `DeleteMessage()`.

Return to Standard Queue
- With `RedriveMessage()`, the message can be moved back to the original standard queue.
- This resets `queue_type` to `STANDARD`, and the message reverts to the 'READY' state.

DynamoMQ is a message queuing library that leverages the features of DynamoDB to achieve high scalability, reliability, and cost efficiency. Notably, its ability to dynamically edit message order and attributes enables flexible adaptation to application requirements.
Compared to existing solutions, DynamoMQ offers ease of management for developers while providing the reliability of fully managed services like Amazon SQS. It also encompasses key functionalities expected from a message queue, such as concurrent processing with multiple goroutines, Dead Letter Queues, and ensuring FIFO (First In, First Out) order.
We extend our deepest gratitude to the AWS official blog, "Implementing priority queueing with Amazon DynamoDB," which served as a reference in the development of DynamoMQ. This blog provides a detailed explanation of implementing priority queuing using Amazon DynamoDB, and this knowledge has been immensely beneficial in constructing DynamoMQ.
This project is licensed under the MIT License. For detailed licensing information, refer to the LICENSE file included in the repository.