Open alfredoperez opened 5 years ago
I can work on the fix for this, but I want to get your opinion first.
I'd love to see the draft PR. Would you like to create one with your idea?
Any updates on this? Without this feature, Kafka is pretty much useless for us.
I also need this feature. Some microservices need to handle different Kafka topics in the same way, so we have to write a separate listener for each topic.
Hello! Any updates? We are also stuck on this.
I'm just a random person with some experience in building a custom microservice transport (at work we replaced nearly all of nest/microservices to match our needs), but I can try to implement this in my free time. @alfredoperez, if I understand correctly, this kind of pattern matching would require keeping an array of regexes and checking whether any of them matches a given event. However, unlike string matching, which is exact, more than one regex can match the same string. What should happen in that case (and what currently happens in the Kafka implementation you mentioned)?
For example, suppose the event is "someString" and there are two handlers: someHandler matching /some./ and stringHandler matching /.string/i. Should only the first handler be executed (and which one counts as first?), or both of them?
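To make the ambiguity concrete, here is a minimal sketch showing that both patterns from the example match the same event. The `handlers` array and `matchingHandlers` function are hypothetical names used only for illustration; they are not part of NestJS.

```typescript
// Hypothetical handler registry; names and patterns come from the example above.
const handlers: Array<{ name: string; pattern: RegExp }> = [
  { name: 'someHandler', pattern: /some./ },
  { name: 'stringHandler', pattern: /.string/i },
];

// Return the names of all handlers whose pattern matches, in registration order.
function matchingHandlers(event: string): string[] {
  return handlers.filter((h) => h.pattern.test(event)).map((h) => h.name);
}

// Both patterns match "someString", so a router has to choose a policy:
// first match wins, or every matching handler runs.
console.log(matchingHandlers('someString'));
```

Whichever policy is chosen, registration order becomes part of the observable behavior, which exact string matching never had to deal with.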
Sorry for the mention, but @kamilmysliwiec, how would you handle this situation? Is there any place in the code with similar (regex-based) pattern matching?
Hi, any update on this?
We have a similar requirement. Either the MessagePattern decorator could take an array of strings, or a regex could also work.
If anyone needs it, there is a workaround for regexp support. You just need to add a custom strategy and override the bindEvents method as shown below.
import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    // Handler keys wrapped in slashes are treated as regular expressions
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              // Strip only the leading and the trailing slash
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }
}
And use the regexp like this:
@EventPattern('/my-super-topic.retry.[0-9]+$/', { flags: 'i' })
async mySuperTopicRetry() {}
@fjodor-rybakov Was there anything else you had to change? I copy / pasted your class and added it as the strategy for my microservice to just try it and it doesn't call the regular expression event pattern. I get the following Error instead.
@EventPattern('/myevent-.*/', { flags: 'i' })
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

// ... elsewhere in the code I call the emit
this.client.emit('myevent-1', obj);
ERROR [ServerKafka] There is no matching event handler defined in the remote service. Event pattern: myevent-1
If I hard-code the event pattern it works...
@EventPattern('myevent-1')
async onSomeEvent(data: any) {
  // with the hard-coded pattern this does get called
  console.log('->', data);
}
Sorry, I forgot...
You also need to override the getHandlerByPattern method so that it looks for a handler whose regex matches the consumer topic.
Thanks for the reply, @fjodor-rybakov. I got it working somewhat now, but there are definitely still issues. For example, if multiple EventPatterns use regular expressions that overlap on the event that should fire (i.e. /myevent-[0-9]+/ and /myevent-.*/), only the first one found will execute; and if there is a handler with the exact string event name, only that one fires.
Here is the updated class that I was testing with...
import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);
    // An exact string match wins; otherwise fall back to the first matching regex
    return this.messageHandlers.has(route)
      ? this.messageHandlers.get(route)
      : this.testRegularExpressions(route) || null;
  }

  private testRegularExpressions(pattern: string) {
    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;
      const regex = new RegExp(
        // Slice by the key's own length, not the incoming pattern's length
        key.slice(1, key.length - 1),
        val.extras.flags,
      );
      if (regex.test(pattern)) {
        return val;
      }
    }
  }
}
I also attempted to resolve these issues, but this is over my head at the moment given the amount of time I have to work on it. I took a peek at the source code and attempted to mimic chaining the functions via the next property, but it still doesn't work correctly. There is probably something else happening behind the scenes with the way next is set up and used that I didn't see or understand.
Here is the code for that if anyone wants to expand on the idea...
import {
  CustomTransportStrategy,
  MessageHandler,
  ServerKafka,
} from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    // Collect the exact-match handler (if any) plus every matching regex handler
    const handlers: MessageHandler[] = [];
    if (this.messageHandlers.has(route))
      handlers.push(this.messageHandlers.get(route));
    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;
      const regex = new RegExp(key.slice(1, key.length - 1), val.extras.flags);
      if (regex.test(pattern)) {
        handlers.push(val);
      }
    }

    // Expand each handler into its existing `next` chain, cloning every link
    const allHandlers: MessageHandler[][] = [];
    for (let i = 0; i < handlers.length; i++) {
      const handler = handlers[i];
      const hierarchy: MessageHandler[] = [];
      let nextChild = handler;
      while (nextChild) {
        hierarchy.push(this.cloneHandle(nextChild));
        nextChild = nextChild.next;
      }
      allHandlers.push(hierarchy);
    }

    // Re-link all clones into one chain and return its head
    const flattened = allHandlers.flat();
    for (let i = flattened.length - 1; i >= 0; i--) {
      const handler = flattened[i];
      const prev = flattened[i - 1];
      if (prev) prev.next = handler;
    }
    return flattened.length > 0 ? flattened[0] : null;
  }

  private cloneHandle(handle: MessageHandler) {
    const dup = handle.bind({}) as MessageHandler;
    dup.isEventHandler = handle.isEventHandler;
    dup.extras = { ...handle.extras };
    return dup;
  }
}
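As an alternative to re-linking handlers through next, the "run every matching handler" policy can be sketched outside of NestJS as a plain lookup that returns all matches, which the caller then invokes in order. This is only a framework-free illustration under stated assumptions: `Handler`, `keyMatches`, `findHandlers` and `dispatch` are hypothetical names, regex flags are ignored, and nothing here claims to mirror how ServerKafka dispatches internally.

```typescript
type Handler = (data: unknown) => Promise<void> | void;

// Hypothetical convention from this thread: keys wrapped in slashes are regexes.
function keyMatches(key: string, topic: string): boolean {
  if (key.length > 2 && key.startsWith('/') && key.endsWith('/')) {
    return new RegExp(key.slice(1, -1)).test(topic);
  }
  return key === topic;
}

// Collect every handler whose key matches the topic, in registration order.
function findHandlers(registry: Map<string, Handler>, topic: string): Handler[] {
  return Array.from(registry.entries())
    .filter(([key]) => keyMatches(key, topic))
    .map(([, handler]) => handler);
}

// Run all matching handlers one after another; returns how many were invoked.
async function dispatch(
  registry: Map<string, Handler>,
  topic: string,
  data: unknown,
): Promise<number> {
  const matched = findHandlers(registry, topic);
  for (const handler of matched) {
    await handler(data);
  }
  return matched.length;
}
```

With keys 'myevent-1', '/myevent-[0-9]+/' and '/myevent-.*/' registered, a message on myevent-1 would reach all three handlers instead of only the first one found.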
@jfelicianiats
This should work
import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              // Strip only the leading and the trailing slash
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public override getHandlerByPattern(pattern: string) {
    // Try the default exact lookup first, then fall back to regex matching
    const handler = super.getHandlerByPattern(pattern);
    if (handler) {
      return handler;
    }
    return this.getHandlerByRegExp(pattern);
  }

  private getHandlerByRegExp(pattern: string) {
    const route = this.getRouteFromPattern(pattern);
    const keys = this.messageHandlers.keys();
    for (const key of keys) {
      const regexp = new RegExp(key);
      if (regexp.test(route)) return this.messageHandlers.get(key);
    }
    return null;
  }
}
Hi everybody,
Thanks for the discussion and posting workarounds.
My question is: do these workarounds also support topics that get created in the future? If my app is deployed and a topic whose name matches the regex is then created, will the app get subscribed to this new topic?
Thank you!
Cheers, Wei
No, subscribing to topics only happens when the consumer starts.
You can pass a list of matching topics at once:
@EventPattern(['topic.one', 'topic.two'])
I think this will solve your problem.
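Since the subscription is resolved only when the consumer starts, one possible way to notice topics created later is to compare the current topic list against what is already subscribed. The sketch below covers only that comparison; `findNewTopics` is a hypothetical helper. It assumes the current list is fetched elsewhere (for example with the KafkaJS admin client's `listTopics()`), and it leaves out what to do with the result, which would typically involve restarting the consumer.

```typescript
// Return topics that match the pattern but are not part of the current subscription.
function findNewTopics(
  pattern: RegExp,
  subscribed: ReadonlySet<string>,
  currentTopics: readonly string[],
): string[] {
  return currentTopics.filter(
    (topic) => pattern.test(topic) && !subscribed.has(topic),
  );
}

// Example: one customer topic was added after the consumer started.
const alreadySubscribed = new Set(['acme.traffic']);
const topicsNow = ['acme.traffic', 'globex.traffic', 'globex.billing'];
console.log(findNewTopics(/^.+\.traffic$/, alreadySubscribed, topicsNow));
```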
Thank you, @fjodor-rybakov.
We have a topic for each customer, such as <<customerId>>.traffic. It is impossible to list them all in @EventPattern. Also, when new customers are onboarded, new topics will be added.
We tried @EventPattern('/*.traffic/'), but it does not work.
Or do I misunderstand your suggestion?
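One thing worth checking with a pattern like this is whether the text between the slashes is a valid JavaScript regular expression at all. `*.traffic` is glob syntax; as a regex source it fails to compile, because the leading `*` has nothing to repeat. A minimal sketch (the `tryCompile` helper is hypothetical):

```typescript
// Compile a regex source; return null instead of throwing on invalid syntax.
function tryCompile(source: string): RegExp | null {
  try {
    return new RegExp(source);
  } catch {
    return null;
  }
}

const globStyle = tryCompile('*.traffic'); // null: invalid as a regex
const regexStyle = tryCompile('^.+\\.traffic$'); // compiles

console.log(globStyle === null); // true
console.log(regexStyle !== null && regexStyle.test('customer-42.traffic')); // true
```

A valid pattern only fixes the matching side; it does not change the fact that topics created after the consumer starts are not picked up automatically.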
Same issue as @ww917352, any updates on this?
Adding +1, this is greatly needed since any message that comes in with a malformed pattern will stop the app from consuming completely.
+1 for this
+1
+1
Before deciding on an official API for this, please consider the same partial matching in Event/MessagePattern when the pattern is an object.
E.g. the Nest microservice consumes messages from a queue, and a handler qualifies based on a subset of the message properties/attributes.
Given the message handler definition
@MessagePattern({ foo: 'foo' }, { partialMatch: true }) // ?
public handlerMethod() {}
it qualifies incoming message patterns that partially match, such as
{ foo: 'foo', bar: 'bar', ... }
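A minimal sketch of what such a partial match could mean: the handler's pattern qualifies when every one of its properties is present in the incoming pattern with the same value. The `isPartialMatch` function is hypothetical, compares values shallowly with `===`, and is not an existing NestJS option.

```typescript
type ObjectPattern = Record<string, unknown>;

// True when every key of the handler pattern exists in the incoming pattern
// with a strictly equal value; extra incoming keys are ignored.
function isPartialMatch(
  handlerPattern: ObjectPattern,
  incoming: ObjectPattern,
): boolean {
  return Object.keys(handlerPattern).every(
    (key) => key in incoming && incoming[key] === handlerPattern[key],
  );
}

console.log(isPartialMatch({ foo: 'foo' }, { foo: 'foo', bar: 'bar' })); // true
console.log(isPartialMatch({ foo: 'foo' }, { foo: 'other', bar: 'bar' })); // false
```

Like regex patterns, partial object patterns can overlap, so the same question about which handler runs applies here too.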
+1
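A small fix for getHandlerByRegExp in the workaround above: strip the surrounding slashes from the key before building the RegExp.
const regexp = new RegExp(key.slice(1, key.length - 1));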
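To see why the slashes matter: passed unmodified to the RegExp constructor, the key '/myevent-.*/' requires literal slash characters in the topic name. A small sketch, where `keyToRegExp` is a hypothetical helper that follows the slash-wrapping convention used in this thread:

```typescript
// Turn a handler key such as '/myevent-.*/' into a RegExp.
// Plain string keys (not wrapped in slashes) yield null.
function keyToRegExp(key: string, flags?: string): RegExp | null {
  if (key.length < 3 || !key.startsWith('/') || !key.endsWith('/')) {
    return null;
  }
  return new RegExp(key.slice(1, -1), flags);
}

// Without stripping, the slashes become part of the expression.
console.log(new RegExp('/myevent-.*/').test('myevent-1')); // false

// With stripping, the topic matches as intended.
const stripped = keyToRegExp('/myevent-.*/', 'i');
console.log(stripped !== null && stripped.test('myevent-1')); // true
```

Returning null for plain keys also prevents ordinary topic names from being interpreted as regular expressions by accident.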
Any update?
+1
I would also like something like this (more specifically with RabbitMQ). I was slightly misled by the docs at https://docs.nestjs.com/microservices/basics#decorators, which implied to me that
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}
would match "time.us.west", "time.us.east", etc.
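For transports that do not resolve such wildcards themselves, the expected behavior can be approximated by translating the pattern into a regular expression. The sketch below assumes dot-separated subjects where `*` stands for exactly one token; `wildcardToRegExp` is a hypothetical helper, not a NestJS or RabbitMQ API.

```typescript
// Convert a dot-separated pattern, where `*` matches exactly one token,
// into an anchored regular expression.
function wildcardToRegExp(pattern: string): RegExp {
  const source = pattern
    .split('.')
    .map((token) =>
      token === '*'
        ? '[^.]+' // one token: any run of characters without a dot
        : token.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'), // escape literal tokens
    )
    .join('\\.');
  return new RegExp(`^${source}$`);
}

const timeUs = wildcardToRegExp('time.us.*');
console.log(timeUs.test('time.us.east')); // true
console.log(timeUs.test('time.us.west')); // true
console.log(timeUs.test('time.us.east.nyc')); // false: `*` covers a single token
```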
@kamilmysliwiec Hello! Do you know if this was fixed, or was no solution found? We could really use this feature since we have different envs, hence different topics to consume from.
NestJS feels like a collection of leaky abstractions with missing features and edge cases creeping there to surprise you every now and then and the official docs don't go into the details deeply enough to prepare for those things so you discover them only as you go :(
Updates?
any updates here?
Bug Report
Current behavior
It is not possible to use a RegEx for event and message patterns. This is possible when using the KafkaJS library, but not in NestJS.
Input Code
Having a controller emitting a message 'pre-notify-post'.
Also having a controller with the event handler and expecting messages that match the regular expression /.*notify.*/.
Currently, the event handler never gets called, because the handler is not matched using regular expressions.
Expected behavior
It is expected to be possible to listen to events and messages using RegEx patterns. This is possible in the KafkaJS library, and it is useful when there is a set of topics that the consumer wants to listen to.
https://kafka.js.org/docs/consuming
Possible Solution
This could be fixed by getting the matching routes.
Environment
Nest version: 6.7