Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.
You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines.
Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale.
To start building Pub/Sub-based microservices, first install the required packages:
$ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice
To use the Pub/Sub transporter, pass the following options object to the createMicroservice() method:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  ApplicationModule,
  {
    strategy: new GCPubSubServer({
      topic: 'cats_topic',
      subscription: 'cats_subscription',
      client: {
        projectId: 'microservice',
      },
    }),
  },
);
The options property is specific to the chosen transporter. The GCloud Pub/Sub transporter exposes the properties described below.
| Option | Description |
| --- | --- |
| topic | Name of the topic that your server subscription belongs to |
| subscription | Name of the subscription that your server listens to |
| replyTopic | Name of the topic that your client subscription belongs to |
| replySubscription | Name of the subscription that your client listens to |
| noAck | If false, manual acknowledgment mode is enabled |
| init | If false, topics and subscriptions are not created, only validated |
| checkExistence | If false, topics and subscriptions are not checked, only used. This only applies when init is false |
| useAttributes | Client only. If true, pattern and correlationId are sent via message attributes. This is useful if the message consumer is not a NestJS microservice or if the subscription uses message filtering |
| client | Additional client options |
| publisher | Additional topic publisher options |
| subscriber | Additional subscriber options |
| scopedEnvKey | Scopes topics and subscriptions to avoid losing messages when several people are working on the same code base. Topics and subscriptions are prefixed with this key |
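The interaction between init and checkExistence may be easier to follow as a small decision function. The sketch below is illustrative only: the SetupOptions type and the resolveSetupMode helper are hypothetical names that are not part of the library, and treating both flags as true when they are omitted is an assumption.

```typescript
// Hypothetical subset of the transporter options that controls setup behavior.
interface SetupOptions {
  init?: boolean;
  checkExistence?: boolean;
}

type SetupMode = 'create' | 'validate' | 'use-as-is';

// Follows the option descriptions above: checkExistence only matters
// when init is false.
// Assumption: both flags default to true when omitted.
function resolveSetupMode(options: SetupOptions): SetupMode {
  const init = options.init ?? true;
  const checkExistence = options.checkExistence ?? true;
  if (init) {
    return 'create'; // topics and subscriptions are created
  }
  // init is false: either validate that they exist or use them unchecked
  return checkExistence ? 'validate' : 'use-as-is';
}
```

For example, under these assumptions resolveSetupMode({ init: false }) yields 'validate', while adding checkExistence: false yields 'use-as-is'.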
To send messages to the microservice, create a client with the GCPubSubClient class:
const client = new GCPubSubClient({
  client: {
    apiEndpoint: 'localhost:8681',
    projectId: 'microservice',
  },
});
client
  .send('pattern', 'Hello world!')
  .subscribe((response) => console.log(response));
In more sophisticated scenarios, you may want to access more information about the incoming request. When using the Pub/Sub transporter, you can access the GCPubSubContext object.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
To access the original Pub/Sub message (with the attributes, data, ack and nack), use the getMessage() method of the GCPubSubContext object, as follows:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(context.getMessage());
}
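As a rough illustration of what the original message exposes, the sketch below builds a log-friendly summary from the attributes and data fields mentioned above. The RawMessage interface and the summarizeMessage helper are hypothetical stand-ins, not library types, and the sketch assumes that data is a Buffer holding UTF-8 text.

```typescript
// Hypothetical minimal shape of the message returned by context.getMessage().
// Only the fields named in the text above are modeled.
interface RawMessage {
  attributes: Record<string, string>;
  data: Buffer; // assumption: the payload bytes are UTF-8 encoded text
  ack(): void;
  nack(): void;
}

// Builds a one-line summary of a message for logging.
function summarizeMessage(message: RawMessage): string {
  const attrs = Object.entries(message.attributes)
    .map(([key, value]) => `${key}=${value}`)
    .join(', ');
  return `attributes: {${attrs}} data: ${message.data.toString('utf8')}`;
}
```

Inside a handler, this could be called as summarizeMessage(context.getMessage()), provided the real message is compatible with the assumed shape.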
To make sure a message is never lost, Pub/Sub supports message acknowledgements. An acknowledgement is sent back by the consumer to tell Pub/Sub that a particular message has been received and processed, and that Pub/Sub is free to delete it. If a consumer dies (its subscription is closed, its connection is closed, or the TCP connection is lost) without sending an ack, Pub/Sub treats the message as not fully processed and redelivers it.
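The redelivery behavior described above can be illustrated with a toy in-memory model. This is not how Pub/Sub is implemented: the ToyQueue class and its method names are invented for illustration, and real Pub/Sub redelivers based on an acknowledgement deadline rather than an explicit second call.

```typescript
// Toy model: a message stays pending until it is acknowledged.
class ToyQueue {
  private pending = new Map<number, string>();
  private nextId = 1;

  // Stores the payload and returns the id assigned to it.
  publish(payload: string): number {
    const id = this.nextId++;
    this.pending.set(id, payload);
    return id;
  }

  // Returns every message that has not been acknowledged yet.
  // Calling it again models redelivery of unacknowledged messages.
  deliver(): Array<{ id: number; payload: string }> {
    return Array.from(this.pending.entries()).map(([id, payload]) => ({
      id,
      payload,
    }));
  }

  // Acknowledging a message lets the queue delete it.
  ack(id: number): void {
    this.pending.delete(id);
  }
}

const queue = new ToyQueue();
const first = queue.publish('a');
queue.publish('b');

queue.deliver(); // the consumer receives both messages
queue.ack(first); // but acknowledges only the first one before "dying"

// On the next delivery attempt, only the unacknowledged message comes back.
console.log(queue.deliver());
```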
To enable manual acknowledgment mode, set the noAck property to false:
{
  replyTopic: 'cats_topic_reply',
  replySubscription: 'cats_subscription_reply',
  noAck: false,
  client: {
    projectId: 'microservice',
  },
},
When manual consumer acknowledgements are turned on, we must send a proper acknowledgement from the worker to signal that we are done with a task.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  const originalMsg = context.getMessage();
  originalMsg.ack();
}
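One common pattern is to acknowledge a message only after processing succeeds, and to negatively acknowledge it on failure so that it can be delivered again. The sketch below assumes that the original message exposes ack() and nack() as described earlier; the Ackable interface and the handleWithAck helper are hypothetical and not part of the library.

```typescript
// Hypothetical minimal view of a message: only ack() and nack() are needed.
interface Ackable {
  ack(): void;
  nack(): void;
}

// Runs the given work, then acks on success or nacks on failure.
// Resolves to true when the message was acknowledged.
async function handleWithAck(
  message: Ackable,
  work: () => Promise<void> | void,
): Promise<boolean> {
  try {
    await work();
    message.ack();
    return true;
  } catch {
    // Processing failed: signal that the message should be delivered again.
    message.nack();
    return false;
  }
}
```

In a handler, this might be used as await handleWithAck(context.getMessage(), () => process(data)), where process is a placeholder for your own logic.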
Pub/Sub requires a properly configured graceful shutdown to work correctly; otherwise, some message acknowledgements can be lost. Therefore, don't forget to close the client:
export class GCPubSubController implements OnApplicationShutdown {
  client: ClientProxy;

  constructor() {
    this.client = new GCPubSubClient({});
  }

  onApplicationShutdown() {
    return this.client.close();
  }
}
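Note that NestJS only invokes onApplicationShutdown in response to termination signals when shutdown hooks have been enabled with app.enableShutdownHooks(). When a class owns several clients, it may also help to attempt closing all of them even if one fails. The sketch below shows one way to do that; the Closable interface and the closeAll helper are hypothetical names, not library APIs.

```typescript
// Hypothetical minimal view of a client: anything with a close() method.
interface Closable {
  close(): Promise<void> | void;
}

// Closes every client in order and returns how many close() calls failed,
// so that one failing client does not prevent the others from closing.
async function closeAll(clients: Closable[]): Promise<number> {
  let failures = 0;
  for (const client of clients) {
    try {
      await client.close();
    } catch {
      failures += 1;
    }
  }
  return failures;
}
```

A shutdown hook could then return closeAll([this.client, this.otherClient]), where otherClient stands for any additional client the class holds.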