Borduhh opened this issue 2 years ago
Same issue connecting to a Confluent Cloud hosted cluster from a producer. The issue is intermittent; it doesn't happen every time.
kafkajs: 2.1.0, @kafkajs/confluent-schema-registry: 3.3.0
```typescript
import { Kafka, KafkaConfig, Partitioners, SASLOptions } from "kafkajs";

const sasl: SASLOptions = {
  username: secrets.kafkaClusterApiKey,
  password: secrets.kafkaClusterApiSecret,
  mechanism: "plain",
};

const kafkaConfig: KafkaConfig = {
  clientId: clientId,
  brokers: [kafkaClusterEndpoint],
  ssl: true,
  sasl,
};

const kafka = new Kafka(kafkaConfig);
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });
logger.info("Kafka producer created");
await producer.connect();
```
```json
{
  "level": "ERROR",
  "timestamp": "2023-03-27T02:34:39.463Z",
  "logger": "kafkajs",
  "message": "[Connection] Connection error: Client network socket disconnected before secure TLS connection was established",
  "broker": "****:9092",
  "clientId": "agents-call-data-collection",
  "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:639:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1570:19)\n at TLSSocket.emit (events.js:412:35)\n at endReadableNT (internal/streams/readable.js:1333:12)\n at processTicksAndRejections (internal/process/task_queues.js:82:21)"
}
```

2023-03-27T02:34:39.463Z bb588c6d-58e6-44ba-abf5-aef5d106a4dc ERROR

```json
{
  "level": "ERROR",
  "timestamp": "2023-03-27T02:34:39.463Z",
  "logger": "kafkajs",
  "message": "[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: Client network socket disconnected before secure TLS connection was established",
  "retryCount": 0,
  "retryTime": 334
}
```
What is the right way to use kafkajs with Lambda? I assume you end up making far more connections than you would with a microservice, which can easily hold a long-standing connection and reuse it, whereas Lambda invocations are stood up and torn down ad hoc. With AWS MSK with IAM, large brokers only support 20 connections per second, which can easily be maxed out if you are chatting across a bunch of topics.
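For context, the reuse idea discussed in the replies below boils down to caching the client at module scope so that warm Lambda invocations skip the reconnect. A minimal sketch of that caching shape (using a stand-in object rather than a real kafkajs producer, so the pattern is visible on its own):

```typescript
// Minimal sketch of module-scope connection reuse in a Lambda. The producer
// here is a stand-in object, not a real kafkajs client. Module scope survives
// across warm invocations, so connect() only runs on cold start.
type ProducerLike = { connected: boolean; connect(): Promise<void> };

let cachedProducer: ProducerLike | undefined;

async function getProducer(): Promise<ProducerLike> {
  if (!cachedProducer) {
    // Real code would call kafka.producer() from kafkajs here.
    cachedProducer = {
      connected: false,
      async connect() {
        this.connected = true;
      },
    };
    await cachedProducer.connect();
  }
  return cachedProducer;
}
```

Every handler invocation would then call `getProducer()` instead of constructing a new client, so only the first (cold) invocation pays the connection cost.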
We just removed the IAM authentication and put everything in a private VPC. That blocks the outside world from accessing the cluster and allows our Lambdas to access the internet via a NAT gateway. We have been using this setup in our production environment now for almost a year as a backbone event source, and it is working beautifully.
The reasoning was that IAM authentication was causing CPU issues with the constant connects/disconnects in the MSK cluster.
Thank you @Borduhh this may be the solution for us as well
> We just removed the IAM authentication and put everything in a private VPC. That blocks the outside world from accessing the cluster and allows our Lambdas to access the internet via a NAT gateway. We have been using this setup in our production environment now for almost a year as a backbone event source, and it is working beautifully.
>
> The reasoning was that IAM authentication was causing CPU issues with the constant connects/disconnects in the MSK cluster.
Hi @Borduhh,
We are facing the same issue. Could you show me how you removed the IAM authentication on MSK?
Thanks
> We just removed the IAM authentication and put everything in a private VPC. That blocks the outside world from accessing the cluster and allows our Lambdas to access the internet via a NAT gateway. We have been using this setup in our production environment now for almost a year as a backbone event source, and it is working beautifully. The reasoning was that IAM authentication was causing CPU issues with the constant connects/disconnects in the MSK cluster.
>
> Hi @Borduhh,
> We are facing the same issue. Could you show me how you removed the IAM authentication on MSK?
> Thanks
When you create the instance, do not select AWS IAM as the authentication type. I believe the cluster has to be in a private subnet to do so.
Hi @Borduhh, what authentication type do you use if IAM is not used? Also, if there are no bootstrap servers, what can we use for the list of brokers?
My current setup is in IAM and already in a VPC.
Thank you!
You can use mutual TLS or SASL/SCRAM with Secrets Manager for authentication, as noted in the documentation here: https://docs.aws.amazon.com/msk/latest/developerguide/kafka_apis_iam.html. You can also use Kafka ACLs for authorization if you need to. It looks like both mutual TLS and SASL/SCRAM with Secrets Manager are supported by the Lambda triggers as well, if that is your setup.
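For reference, a SASL/SCRAM client config for kafkajs might look roughly like the sketch below. The client id, broker address, and credentials are placeholders (not from this thread); in a real setup the username and password would be fetched from Secrets Manager at runtime:

```typescript
// Hypothetical SASL/SCRAM config shape for a kafkajs client. All names and
// addresses are placeholders; MSK's SASL/SCRAM listener requires TLS, and
// 9096 is MSK's SASL/SCRAM port.
const scramConfig = {
  clientId: "lambda-producer", // placeholder
  brokers: ["b-1.example.kafka.us-east-1.amazonaws.com:9096"], // placeholder
  ssl: true,
  sasl: {
    mechanism: "scram-sha-512" as const,
    username: "msk-user",     // would come from Secrets Manager
    password: "msk-password", // would come from Secrets Manager
  },
};
// const kafka = new Kafka(scramConfig); // as in the earlier snippets
```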
Hi,
I resolved this issue by implementing the singleton pattern for the producer and gracefully disconnecting it when the Lambda is shut down.
Source code:
```typescript
import { Type, awsIamAuthenticator } from "@jm18457/kafkajs-msk-iam-authentication-mechanism";
import { Kafka, CompressionTypes } from "kafkajs";
import { v4 as uuid } from "uuid";
import type { Admin, Message, Producer } from "kafkajs";

let kafka: Kafka | undefined;
let kafkaProducer: Producer | undefined;
let kafkaAdmin: Admin | undefined;

// SIGTERM for graceful shutdown
process.on("SIGTERM", async () => {
  console.info("[runtime] SIGTERM received");
  console.info("[runtime] cleaning up producer..", kafkaProducer);
  // perform actual clean-up work here
  await kafkaProducer?.disconnect();
  console.info("[runtime] exiting");
  process.exit(0);
});

const getKafkaClient = () => {
  if (!kafka) {
    kafka = new Kafka({
      clientId: uuid(),
      ssl: true,
      brokers: process.env.MSK_CLUSTER_BROKERS!.split(","),
      retry: {
        retries: 10,
        initialRetryTime: 100,
        factor: 0.2,
        multiplier: 3,
        maxRetryTime: 300000
      },
      sasl: {
        mechanism: Type,
        authenticationProvider: awsIamAuthenticator({ region: process.env.AWS_REGION! })
      }
    });
  }
  return kafka;
};

export const getKafkaAdmin = () => {
  if (!kafkaAdmin) {
    kafkaAdmin = getKafkaClient().admin();
  }
  return kafkaAdmin;
};

export interface KafkaPayload<T = any> {
  data: T;
  uniqueKey: any;
}

export const sendKafkaMessages = async ({
  data,
  topic,
  partition,
  isRetry
}: {
  data: any[];
  topic: string;
  // set the key if the events need to be run in sequence
  partition?: number;
  isRetry?: boolean;
}) => {
  if (!kafkaProducer) {
    kafkaProducer = getKafkaClient().producer({
      idempotent: true
    });
    await kafkaProducer.connect();
  }
  await kafkaProducer.send({
    messages: data.map((item) => {
      const kafkaPayload: KafkaPayload = {
        data: item,
        uniqueKey: uuid()
      };
      const message: Message = {
        value: JSON.stringify(kafkaPayload),
        partition,
        headers: isRetry ? { isRetry: "" } : undefined
      };
      return message;
    }),
    topic,
    compression: CompressionTypes.GZIP
  });
};
```
Tips:
1. Create a single producer with an active connection, store it somewhere, and reuse it instead of connecting on each call.
2. Use SIGTERM to close the half-open connection when the Lambda is shut down.

After this change, the active connections on the broker dropped from 3000 (the AWS hard limit) to 100, and the error was gone.
Reference: https://github.com/tulios/kafkajs/issues/1423#issuecomment-1193800427
Yes, but if you are using a straight Lambda architecture, you cannot keep the singleton alive all the time like this. We actually have something very similar, but since there are many Lambda invocations it still means a lot of created connections, which with IAM is, I guess, too costly an operation. We have not tried it yet, but we are hoping that AWS MSK Serverless with IAM might scale connections elastically as we need them.
> Hi @Borduhh, what authentication type do you use if IAM is not used? Also, if there are no bootstrap servers, what can we use for the list of brokers?
> My current setup is in IAM and already in a VPC.
> Thank you!
We don't have any authentication. Since the cluster is in the private subnet of our VPC, only our internal resources can access it.
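If it helps, the client config in that setup can be as small as the sketch below. The client id, broker address, and port are placeholders (9092 is the usual plaintext Kafka listener, an assumption here):

```typescript
// Sketch of an unauthenticated client config for a cluster that is only
// reachable inside the VPC. There is no `sasl` block at all; whether to keep
// `ssl` on is a separate choice, since traffic never leaves the private network.
const privateVpcConfig = {
  clientId: "internal-producer",          // placeholder
  brokers: ["b-1.internal.example:9092"], // private-subnet broker, placeholder
  ssl: false,
};
// const kafka = new Kafka(privateVpcConfig);
```

Authorization then relies entirely on network isolation (security groups and the private subnet) rather than on Kafka-level authentication.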
> Yes, but if you are using a straight Lambda architecture, you cannot keep the singleton alive all the time like this. We actually have something very similar, but since there are many Lambda invocations it still means a lot of created connections, which with IAM is, I guess, too costly an operation. We have not tried it yet, but we are hoping that AWS MSK Serverless with IAM might scale connections elastically as we need them.
In my testing it can stay alive: once connected, kafkajs automatically reconnects when the current connection is closed, so you don't need to do anything.
I don't know how many Lambdas you have running in parallel, but MSK has multiple brokers, each broker can handle 3000 connections, and you can scale the broker count to whatever you need. I believe there won't be a bottleneck even with 10000 Lambdas running concurrently, as long as you size the cluster with enough brokers.
I would suggest trying my code and seeing if it works. Cheers, buddy.
**Describe the bug**
We see inconsistent connection/authentication timeouts when using AWS Lambda producers via KafkaJS and MSK.

**To Reproduce**
NOTE: We can consistently produce the error in our technical stack but cannot reproduce it using a minimal reproduction repo.

Example Lambda Code:
Example Kafka Middleware:

**Expected behavior**
The producer should connect to the MSK cluster without an issue.

**Observed behavior**
Lambda Logs:
MSK Broker Logs:

**Environment:**

**Additional context**
The weird part is that we load-tested a simple repo with the same code with 30+ Lambdas and 4K messages, and it worked as expected, although we invoked the Lambdas manually.