Open gustawdaniel opened 1 year ago
I have Cannot find module 'bluebird'
details:
node:internal/modules/cjs/loader:1039
const err = new Error(message);
^
Error: Cannot find module 'bluebird'
Require stack:
- /home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/consumer.js
- /home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/haredo.js
- /home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/index.js
- /home/daniel/exp/rabbit-mq-haredo/src/storeage/rabbit.ts
- /home/daniel/exp/rabbit-mq-haredo/src/consumer.ts
at Module._resolveFilename (node:internal/modules/cjs/loader:1039:15)
at u.default._resolveFilename (/home/daniel/exp/rabbit-mq-haredo/node_modules/@esbuild-kit/cjs-loader/dist/index.js:1:1519)
at Module._load (node:internal/modules/cjs/loader:885:27)
at Module.require (node:internal/modules/cjs/loader:1105:19)
at require (node:internal/modules/cjs/helpers:103:18)
at _b (/home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/src/consumer.ts:6:1)
at Object.<anonymous> (/home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/src/consumer.ts:184:29)
at Module._compile (node:internal/modules/cjs/loader:1218:14)
at Object.F (/home/daniel/exp/rabbit-mq-haredo/node_modules/@esbuild-kit/cjs-loader/dist/index.js:1:941)
at Module.load (node:internal/modules/cjs/loader:1081:32) {
code: 'MODULE_NOT_FOUND',
requireStack: [
'/home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/consumer.js',
'/home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/haredo.js',
'/home/daniel/exp/rabbit-mq-haredo/node_modules/haredo/dist/index.js',
'/home/daniel/exp/rabbit-mq-haredo/src/storeage/rabbit.ts',
'/home/daniel/exp/rabbit-mq-haredo/src/consumer.ts'
]
}
Node.js v18.13.0
But i don't want bluebird
because in Node js there are native promises.
I installed bluebird
manually and now see error .consume is not a function
/home/daniel/exp/rabbit-mq-haredo/src/consumer.ts:5
.consume(async (message) => {
^
TypeError: import_rabbit.rabbit.queue(...).bindExchange(...).consume is not a function
at <anonymous> (/home/daniel/exp/rabbit-mq-haredo/src/consumer.ts:5:6)
at Object.<anonymous> (/home/daniel/exp/rabbit-mq-haredo/src/consumer.ts:7:6)
at Module._compile (node:internal/modules/cjs/loader:1218:14)
at Object.F (/home/daniel/exp/rabbit-mq-haredo/node_modules/@esbuild-kit/cjs-loader/dist/index.js:1:941)
at Module.load (node:internal/modules/cjs/loader:1081:32)
at Module._load (node:internal/modules/cjs/loader:922:12)
at ModuleWrap.<anonymous> (node:internal/modules/esm/translators:169:29)
at ModuleJob.run (node:internal/modules/esm/module_job:194:25)
Node.js v18.13.0
I am only executing command
npx tsx watch src/consumer.ts
on file that I created reading README
import { haredo } from 'haredo';
const rabbit = haredo({
connection: 'amqp://localhost:5672/'
});
rabbit.queue('my-queue')
.bindExchange('testExchange', '#', 'topic', { durable: false }) // Can be omitted if you don't want to bind the queue to an exchange right now
.consume(async (message) => {
console.log(message);
});
but QueueChain
does not have consume
method. There is QueueChain
interface
export interface QueueChain<TMessage, TReply> extends GeneralChainMembers<(state: HaredoChainState<TMessage, TReply>) => QueueChain<TMessage, TReply>>, QueuePublishMethod<TMessage, TReply> {
/**
* Return the state of the chain for inspection
*/
getState(): Partial<HaredoChainState<TMessage>>;
/**
* Bind an exchange to the queue.
*
* For patterns there are two wildcards:
* * `*` - one word
* * `#` - zero or more words
* A word is dot(period) delimited
*
* @param exchange Exchange to bind
* @param pattern Pattern(s) to use
*/
bindExchange<TCustomMessage = unknown, TCustomReply = unknown>(exchange: Exchange<TCustomMessage>, pattern: string | string[]): QueueChain<MergeTypes<TMessage, TCustomMessage>, MergeTypes<TReply, TCustomReply>>;
/**
* Bind an exchange to the queue.
*
* For patterns there are two wildcards:
* * `*` - one word
* * `#` - zero or more words
* A word is dot(period) delimited
*
* @param exchangeName name of the exchange
* @param pattern pattern(s) to bind
* @param exchangeType Type of the exchange
* @param exchangeOpts Options to use for asserting the exchange
*/
bindExchange(exchangeName: string, pattern: string | string[], exchangeType: ExchangeType, exchangeOpts?: ExchangeOptions): QueueChain<TMessage, TReply>;
/**
* Bind an exchange to the queue.
*
* For patterns there are two wildcards:
* * `*` - one word
* * `#` - zero or more words
* A word is dot(period) delimited
*
* @param exchange Name of the exchange to bind
* @param pattern Pattern(s) to use
* @param type Type of the exchange
* @param opts Options to pass to amqplib for asserting
*/
bindExchange<TCustomMessage = unknown, TCustomReply = unknown>(exchange: string, pattern: string | string[], type: ExchangeType, opts?: Partial<ExchangeOptions>): QueueChain<MergeTypes<TMessage, TCustomMessage>, MergeTypes<TReply, TCustomReply>>;
/**
* Enable noAck on the consumer. When noAck is set the broker will dequeue
* messages when they are sent down the wire.
*
* @param noAck defaults to true
*/
noAck(noAck?: boolean): QueueChain<TMessage, TReply>;
/**
* Set the consumer priority. Lower priority consumers will receive messages only when higher
* priority consumers are busy.
*
* Requires [Consumer priorities](https://www.rabbitmq.com/consumer-priority.html) extension to be enabled
*/
priority(priority: number): QueueChain<TMessage, TReply>;
/**
* When exclusive is set to true, the broker won't allow anyone else consume from this queue
* @param exclusive defaults to true
*/
exclusive(exclusive?: boolean): QueueChain<TMessage, TReply>;
/**
* Autoack (enabled by default) automatically acks/nacks messages when
* subscriber callback throws an error or the promise returned from it
* gets rejected.
*
* @param autoAck defaults to true
*/
autoAck(autoAck?: boolean): QueueChain<TMessage, TReply>;
/**
* Set prefetch count for consuming (ie. amount of messages that will be received in parallel)
* 0 Means there is no limit.
*
* Aliased to .concurrency
*
* @param prefetch number of messages to prefetch
*/
prefetch(prefetch: number): QueueChain<TMessage, TReply>;
/**
* Set prefetch count for consuming (ie. amount of messages that will be received in parallel)
* 0 Means there is no limit.
*
* Aliased to .prefetch
*
* @param concurrency number of concurrent messages passed to .subscribe callbacks
*/
concurrency(concurrency: number): QueueChain<TMessage, TReply>;
/**
* Reestablish a subscriber when channel / connection closes (enabled by default)
*
* @param reestablish defaults to true
*/
reestablish(reestablish?: boolean): QueueChain<TMessage, TReply>;
/**
* Subscribe to messages in the queue specified in the chain
*/
subscribe<TCustomMessage, TCustomReply>(cb: MessageCallback<MergeTypes<TMessage, TCustomMessage>, MergeTypes<TReply, TCustomReply>>): Promise<Consumer>;
/**
* Autoreply (enabled by default) automatically replies to messages
* where message callback in subscriber returns a non-undefined value
* (Only if message has replyTo and a correlationId)
*
* [RPC tutorial](https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html)
*
* @param autoReply defaults to true
*/
autoReply(autoReply?: boolean): QueueChain<TMessage, TReply>;
/**
* Provide a failurebackoff to control the rate of messages in case of errors.
* Bundled together with haredo comes standardBackoff
*/
backoff(backoff: FailureBackoff): QueueChain<TMessage, TReply>;
/**
* Don't run automatic setup. Useful for faster publishing.
*
* @param skipSetup defaults to true
*/
skipSetup(skipSetup?: boolean): QueueChain<TMessage, TReply>;
/**
* Add middleware to the subscription
*/
use(middleware: Middleware<TMessage, TReply> | Middleware<TMessage, TReply>[]): QueueChain<TMessage, TReply>;
}
Seems like I only used delay from bluebird, which came with amqplib, I'll refactor it out quickly, it's only 4 ocurrances.
I assume the .consume is from old version of haredo, I can't really remember, it's been a while since I released v2. In current version it's .subscribe.
The durable thing might be because amqplib doesn't have typings, try adding @types/amqplib, see if that fixes it. I've tripped on this recently as well but I'm unwilling to copy the typings over, since I'm refactoring amqplib out and replacing it with https://www.npmjs.com/package/@cloudamqp/amqp-client
Released version 2.11.1 with the bluebird removal. Turns out that I did have a delay function but I auto-imported it from bluebird mistakenly in one file. Can you confirm that adding @types/amqplib fixes the typing issue?
I installed "@types/amqplib": "^0.10.1",
and now consumer is fixed
rabbit.queue('my-queue')
.bindExchange('my-exchange', '#', 'topic', { durable: false }) // Can be omitted if you don't want to bind the queue to an exchange right now
.subscribe(async (message) => {
console.log(message);
});
but producer is broken. In code
rabbit.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
i see
TS2345: Argument of type 'string' is not assignable to parameter of type 'Exchange '.
on 'my-exchange'
.
I tested that this
rabbit.queue('my-queue').publish({ id: 5, status: 'inactive' });
works correctly, but also there is problem that if rabbitmq-server
is stopped then I can run code using haredo
without any errors. I would rather prefer to see that connection failed.
I copied code from README and have problem with typescript