Closed khaliqgant closed 3 months ago
Reproduced locally. You are just trying to lock a row that doesn't exist: `forUpdate`
will only lock if there is something to lock, and there is no unique key constraint, so inserting two rows won't fail. An advisory lock seems better.
diff --git a/packages/server/lib/hooks/hooks.integration.test.ts b/packages/server/lib/hooks/hooks.integration.test.ts
new file mode 100644
index 00000000..7404f3a2
--- /dev/null
+++ b/packages/server/lib/hooks/hooks.integration.test.ts
@@ -0,0 +1,139 @@
+import { describe, it, beforeAll } from 'vitest';
+import { multipleMigrations } from '@nangohq/database';
+import { connectionRefreshFailed } from './hooks.js';
+import { configService, connectionService, seeders } from '@nangohq/shared';
+import { logContextGetter } from '@nangohq/logs';
+
+describe('dff', () => {
+ beforeAll(async () => {
+ await multipleMigrations();
+ });
+
+ it('df', async () => {
+ const { env } = await seeders.seedAccountEnvAndUser();
+ await seeders.createConfigSeed(env, 'unauthenticated', 'unauthenticated');
+ const logCtx = await logContextGetter.create({ operation: { type: 'auth', action: 'refresh_token' }, message: 'Token refresh error' }, {} as any, {
+ dryRun: true
+ });
+ const tmp = await seeders.createConnectionSeed(env, 'unauthenticated');
+
+ const { response: connection } = await connectionService.getConnection(tmp.connection_id, 'unauthenticated', env.id);
+
+ const config = await configService.getProviderConfig(connection!.provider_config_key, env.id);
+ const template = configService.getTemplate(config!.provider);
+ await Promise.all([
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ }),
+ connectionRefreshFailed({
+ authError: { description: 't', type: 'auth' },
+ template,
+ config: config!,
+ logCtx,
+ connection: connection!,
+ environment: env
+ })
+ ]);
+ });
+});
diff --git a/packages/server/lib/hooks/hooks.ts b/packages/server/lib/hooks/hooks.ts
index accc1679..3cc4ab89 100644
--- a/packages/server/lib/hooks/hooks.ts
+++ b/packages/server/lib/hooks/hooks.ts
@@ -37,8 +37,6 @@ import { sendAuth as sendAuthWebhook } from '@nangohq/webhooks';
const logger = getLogger('hooks');
const orchestrator = getOrchestrator();
-const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
-
export const connectionCreationStartCapCheck = async ({
providerConfigKey,
environmentId,
@@ -187,8 +185,6 @@ export const connectionRefreshFailed = async ({
const slackNotificationService = new SlackService({ orchestratorClient: getOrchestratorClient(), logContextGetter });
- const randomDelay = Math.random() * 1000;
- await delay(randomDelay);
await slackNotificationService.reportFailure(connection, connection.connection_id, 'auth', logCtx.id, environment.id, config.provider);
};
diff --git a/packages/shared/lib/services/notification/slack.service.ts b/packages/shared/lib/services/notification/slack.service.ts
index e31aa2ce..c2b28eac 100644
--- a/packages/shared/lib/services/notification/slack.service.ts
+++ b/packages/shared/lib/services/notification/slack.service.ts
@@ -9,6 +9,7 @@ import accountService from '../account.service.js';
import type { LogContext, LogContextGetter } from '@nangohq/logs';
import type { OrchestratorClientInterface } from '../../clients/orchestrator.js';
import { Orchestrator } from '../../clients/orchestrator.js';
+import { NangoError } from '../../utils/error.js';
const logger = getLogger('SlackService');
const TABLE = dbNamespace + 'slack_notifications';
@@ -92,7 +93,11 @@ export class SlackService {
* notification to the Nango admin account
*/
private async getNangoAdminConnection(): Promise<NangoConnection | null> {
- const info = await accountService.getAccountAndEnvironmentIdByUUID(this.nangoAdminUUID as string, this.env);
+ if (!this.nangoAdminUUID) {
+ return null;
+ }
+
+ const info = await accountService.getAccountAndEnvironmentIdByUUID(this.nangoAdminUUID, this.env);
const { success, response: slackConnection } = await connectionService.getConnection(
this.adminConnectionId,
@@ -107,8 +112,12 @@ export class SlackService {
return slackConnection;
}
- private async getAdminEnvironmentId(): Promise<number> {
- const info = await accountService.getAccountAndEnvironmentIdByUUID(this.nangoAdminUUID as string, this.env);
+ private async getAdminEnvironmentId(): Promise<number | null> {
+ if (!this.nangoAdminUUID) {
+ return null;
+ }
+
+ const info = await accountService.getAccountAndEnvironmentIdByUUID(this.nangoAdminUUID, this.env);
return info?.environmentId as number;
}
@@ -216,6 +225,10 @@ export class SlackService {
if (success && !slackNotificationStatus) {
return;
}
+ if (!success) {
+ logger.error('failed to create row');
+ return;
+ }
const account = await environmentService.getAccountFromEnvironment(environment_id);
if (!account) {
@@ -224,6 +237,10 @@ export class SlackService {
const slackConnectionId = generateSlackConnectionId(account.uuid, envName);
const nangoEnvironmentId = await this.getAdminEnvironmentId();
+ if (!nangoEnvironmentId) {
+ logger.error('No admin environment id');
+ return;
+ }
// we get the connection on the nango admin account to be able to send the notification
const {
@@ -379,6 +396,11 @@ export class SlackService {
}
const nangoEnvironmentId = await this.getAdminEnvironmentId();
+ if (!nangoEnvironmentId) {
+ logger.error('No admin environment id');
+ return;
+ }
+
const slackConnectionId = generateSlackConnectionId(account.uuid, envName);
const { success: connectionSuccess, response: slackConnection } = await connectionService.getConnection(
slackConnectionId,
@@ -489,6 +511,13 @@ export class SlackService {
*/
async addFailingConnection(nangoConnection: NangoConnection, name: string, type: string): Promise<ServiceResponse<NotificationResponse>> {
return await db.knex.transaction(async (trx) => {
+ // we need to acquire a Lock that prevents any other duplicate hook to create the same row
+ // TODO: create correct unique ID
+ const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [34]);
+ if (!rows || rows.length <= 0 || !rows[0]!.pg_try_advisory_xact_lock) {
+ return { success: false, error: new NangoError('adv_lock'), response: null };
+ }
+
const isOpen = await this.hasOpenNotification(nangoConnection, name, type, trx);
if (isOpen && type === 'auth') {
Thanks both. I’ll update with a lock, which, agreed, should be a more resilient fix.
Describe your changes
This annoying issue is hard to reproduce without removing the locking delay on local for the `tryAcquire` method.
With that, multiple Slack notifications can be seen, which in my testing was mitigated by:
- adding `forUpdate` to the database lookups and inserts
- `await`ing the Slack notification call

Testing
With this, I was seeing multiple Slack notifications, but with the updated logic only one is now received consistently.
Issue ticket number and link
NAN-1163
Checklist before requesting a review (skip if just adding/editing APIs & templates)