poundifdef / smoothmq

An improved drop-in replacement for SQS
https://www.smoothmq.com
GNU Affero General Public License v3.0
2.07k stars 35 forks source link

Does not work for aws js client #15

Closed zackees closed 3 months ago

zackees commented 3 months ago

I've forked your repo so things are a little different since i'm using nginx to port forward from 80 to 3001. But otherwise should be mostly compatible:

const AWS = require('aws-sdk');
const dotenv = require('dotenv');
const { promisify } = require('util');
const path = require('path');

AWS.config.logger = console;

// .env file is located in the root of the project
const envFile = path.join(__dirname, '../../.env');

// dotenv.config();
dotenv.config({ path: envFile });

// assert that the AWS_SECRET_ACCESS_KEY is set
if (!process.env.AWS_SECRET_ACCESS_KEY) {
    console.error('AWS_SECRET_ACCESS_KEY is not set');
    process.exit(1);
} else {
    // print out the contents of the env file
    const parsed = dotenv.parse(require('fs').readFileSync(envFile, 'utf-8'));
    console.log("\nContents of .env file:");
    Object.entries(parsed).forEach(([key, value]) => {
        console.log(`${key}=${value}`);
    });
    console.log("");
}

const sleep = promisify(setTimeout);

async function createOrGetQueue(sqs, queueName) {
    try {
        const response = await sqs.listQueues({ QueueNamePrefix: queueName }).promise();
        if (response.QueueUrls && response.QueueUrls.length > 0) {
            const queueUrl = response.QueueUrls[0];
            console.log(`Queue already exists: ${queueUrl}`);
            return [queueUrl, false];
        } else {
            const response = await sqs.createQueue({ QueueName: queueName }).promise();
            const queueUrl = response.QueueUrl;
            console.log(`Created queue: ${queueUrl}`);
            return [queueUrl, true];
        }
    } catch (error) {
        console.error('Error in createOrGetQueue:', error);
        throw error;
    }
}

async function runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId) {
    const params = {
        apiVersion: "2012-11-05",
        region: 'us-east-1',
        accessKeyId: awsAccessId,
        secretAccessKey: awsSecretAccessKey,
        endpoint: new AWS.Endpoint(endpointUrl),
        signatureVersion: 'v4',
    }
    console.log('Creating SQS client with params:', params)
    const sqs = new AWS.SQS(params);

    const queueName = 'my-test-que-for-testing';
    let queueUrl, queueCreated;

    try {
        [queueUrl, queueCreated] = await createOrGetQueue(sqs, queueName);
        console.log(`Queue URL: ${queueUrl}`);

        await sqs.sendMessage({ QueueUrl: queueUrl, MessageBody: 'hello world' }).promise();
        console.log('Sent a message to the queue');

        const receiveResponse = await sqs.receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 1 }).promise();
        if (receiveResponse.Messages && receiveResponse.Messages.length > 0) {
            const message = receiveResponse.Messages[0];
            console.log(`Received message: ${message.Body}`);

            await sqs.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }).promise();
            console.log('Deleted the message');
        } else {
            console.log('No messages in the queue');
        }

        await sleep(2000);
    } catch (error) {
        console.error('Error in runSqsTest:', error);
    } finally {
        if (queueUrl) {
            console.log(`Destroying queue: ${queueUrl}`);
            await sqs.deleteQueue({ QueueUrl: queueUrl }).promise();
            console.log(`Destroyed queue: ${queueUrl}`);
        }
    }
}

async function main() {
    const awsSecretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const awsAccessId = process.env.AWS_ACCESS_KEY_ID;
    //const endpoints = ['http://localhost', 'https://jobs.kumquat.live'];
    const endpoints = ['http://localhost'];

    for (const endpointUrl of endpoints) {
        console.log(`\nTesting with endpoint: ${endpointUrl}`);
        await runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId);
    }
}

main().catch(error => console.error('Error in main:', error));

What's weird is that I can create a queue, but not send a message to it.

zackees commented 3 months ago

I've updated the test. It looks like the SmoothMQ is not able to sign responses correctly. I'm not exactly sure why this is and why the boto3 library for python works as expected.

I've made the js version match as closely to the python version as I can.

Here is the python version of this that works as expected:

import boto3
from botocore.config import Config
import time
import os
import warnings
from dotenv import load_dotenv
from pathlib import Path
import requests

HERE = Path(__file__).parent
PROJECT_ROOT = HERE.parent.parent
ENV_FILE = PROJECT_ROOT / ".env"

load_dotenv(dotenv_path=ENV_FILE)

def create_or_get_queue(sqs, queue_name: str) -> tuple[str, bool]:
    response = sqs.list_queues(QueueNamePrefix=queue_name)
    if 'QueueUrls' in response and len(response['QueueUrls']) > 0:
        queue_url = response['QueueUrls'][0]
        print(f"Queue already exists: {queue_url}")
        return queue_url, False
    else:
        response = sqs.create_queue(QueueName=queue_name)
        queue_url = response['QueueUrl']
        print(f"Created queue: {queue_url}")
        return queue_url, True

def run_sqs_test(endpoint_url: str, aws_secret_acess_key: str, aws_access_id: str) -> None:
    # Load environment variables from .env file
    params = {
        "region_name": "us-east-1",
        "aws_access_key_id": aws_access_id,
        "aws_secret_access_key": aws_secret_acess_key,
        "endpoint_url": endpoint_url,
        "api_version": "2012-11-05"
    }

    print(f"Testing with parameters: {params}")

    # Configure the SQS client with boto3 v4 signature
    config = Config(signature_version='v4')
    sqs = boto3.client("sqs", config=config, **params)

    # Create or get the queue
    queue_name = "my-test-que-for-testing"
    queue_url, queue_created = create_or_get_queue(sqs, queue_name)

    print(f"Queue URL: {queue_url}")

    try:
        # Perform operations
        sqs.send_message(QueueUrl=queue_url, MessageBody="hello world")
        print("Sent a message to the queue")

        # Receive and print the message
        response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
        if 'Messages' in response:
            message = response['Messages'][0]
            print(f"Received message: {message['Body']}")

            # Delete the message
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
            print("Deleted the message")
        else:
            print("No messages in the queue")

        # Wait a moment to ensure all operations are completed
        time.sleep(2)

    finally:
        print(f"Destroying queue: {queue_url}")
        sqs.delete_queue(QueueUrl=queue_url)
        print(f"Destroyed queue: {queue_url}")

def check_server_is_alive(endpoint_url: str) -> bool:
    try:
        response = requests.get(endpoint_url + "/healthz", timeout=2)
        response.raise_for_status()
        return True
    except requests.exceptions.RequestException as e:
        print(f"\nError: {e}")
        print(f"\nCould not reach endpoint: {endpoint_url}")
    return False

def main() -> None:
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
    aws_access_id=os.getenv("AWS_ACCESS_KEY_ID")
    endpoints = ["http://localhost", "https://jobs.kumquat.live"]
    for i, endpoint_url in enumerate(endpoints):
        is_alive = check_server_is_alive(endpoint_url)
        if not is_alive:
            warnings.warn(f"Endpoint is not alive: {endpoint_url}")
            if i == 0:
                # fatal error for localhost
                raise Exception("Localhost is not alive")
        print(f"\nTesting with endpoint: {endpoint_url}")
        run_sqs_test(endpoint_url=endpoint_url, aws_secret_acess_key=aws_secret_access_key, aws_access_id=aws_access_id)

if __name__ == "__main__":
    main()
zackees commented 3 months ago

Okay, so it turns out that there is a problem with the aws-sdk client. Using the newer client resolves the issue. Here is the updated code base that correctly resolves this:

const { SQSClient, ListQueuesCommand, CreateQueueCommand, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand, DeleteQueueCommand } = require('@aws-sdk/client-sqs');
const dotenv = require('dotenv');
const { promisify } = require('util');
const path = require('path');

// .env file is located in the root of the project
const envFile = path.join(__dirname, '../../.env');

dotenv.config({ path: envFile });

// assert that the AWS_SECRET_ACCESS_KEY is set
if (!process.env.AWS_SECRET_ACCESS_KEY) {
    console.error('AWS_SECRET_ACCESS_KEY is not set');
    process.exit(1);
} else {
    // print out the contents of the env file
    const parsed = dotenv.parse(require('fs').readFileSync(envFile, 'utf-8'));
    console.log("\nContents of .env file:");
    Object.entries(parsed).forEach(([key, value]) => {
        console.log(`${key}=${value}`);
    });
    console.log("");
}

const sleep = promisify(setTimeout);

async function createOrGetQueue(sqs, queueName) {
    try {
        const listResponse = await sqs.send(new ListQueuesCommand({ QueueNamePrefix: queueName }));
        if (listResponse.QueueUrls && listResponse.QueueUrls.length > 0) {
            const queueUrl = listResponse.QueueUrls[0];
            console.log(`Queue already exists: ${queueUrl}`);
            return [queueUrl, false];
        } else {
            const createResponse = await sqs.send(new CreateQueueCommand({ QueueName: queueName }));
            const queueUrl = createResponse.QueueUrl;
            console.log(`Created queue: ${queueUrl}`);
            return [queueUrl, true];
        }
    } catch (error) {
        console.error('Error in createOrGetQueue:', error);
        throw error;
    }
}

async function runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId) {
    const clientConfig = {
        region: 'us-east-1',
        credentials: {
            accessKeyId: awsAccessId,
            secretAccessKey: awsSecretAccessKey,
        },
        endpoint: endpointUrl,
    };
    console.log('Creating SQS client with config:', clientConfig);
    const sqs = new SQSClient(clientConfig);

    const queueName = 'my-test-que-for-testing';
    let queueUrl, queueCreated;

    try {
        [queueUrl, queueCreated] = await createOrGetQueue(sqs, queueName);
        console.log(`Queue URL: ${queueUrl}`);

        await sqs.send(new SendMessageCommand({ QueueUrl: queueUrl, MessageBody: 'hello world' }));
        console.log('Sent a message to the queue');

        const receiveResponse = await sqs.send(new ReceiveMessageCommand({ QueueUrl: queueUrl, MaxNumberOfMessages: 1 }));
        if (receiveResponse.Messages && receiveResponse.Messages.length > 0) {
            const message = receiveResponse.Messages[0];
            console.log(`Received message: ${message.Body}`);

            await sqs.send(new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle }));
            console.log('Deleted the message');
        } else {
            console.log('No messages in the queue');
        }

        await sleep(2000);
    } catch (error) {
        console.error('Error in runSqsTest:', error);
    } finally {
        if (queueUrl) {
            console.log(`Destroying queue: ${queueUrl}`);
            await sqs.send(new DeleteQueueCommand({ QueueUrl: queueUrl }));
            console.log(`Destroyed queue: ${queueUrl}`);
        }
    }
}

async function main() {
    const awsSecretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    const awsAccessId = process.env.AWS_ACCESS_KEY_ID;
    const endpoints = ['http://localhost'];

    for (const endpointUrl of endpoints) {
        console.log(`\nTesting with endpoint: ${endpointUrl}`);
        await runSqsTest(endpointUrl, awsSecretAccessKey, awsAccessId);
    }
}

main().catch(error => console.error('Error in main:', error));
poundifdef commented 3 months ago

Thank you for chasing it down. I’ll mark this as resolved for now and an making a note to start setting up documentation with code examples for getting started.