seppevs / migrate-mongo

A database migration tool for MongoDB in Node
MIT License

MongoDB transaction errors when there are more than 1k documents to migrate #450

Open anishbishnoi127 opened 2 months ago

anishbishnoi127 commented 2 months ago

Describe the bug
When there are more than 1k documents to migrate, the transaction crashes.

To Reproduce
Steps to reproduce the behavior: create around 10k documents, then run the migration inside a transaction (see the seeding sketch below).
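For reference, a minimal seeding sketch that reproduces the setup by inserting 10k documents that are missing the fields the migration backfills (the connection string, database name, and collection name trails are placeholders, not the real project values):

// seed.js — sketch only: create 10k documents without the fields the
// migration is supposed to backfill. Connection string, database and
// collection names are placeholders.
const { MongoClient } = require('mongodb');

async function seed() {
    const client = await MongoClient.connect('mongodb://localhost:27017');
    const db = client.db('test');
    // None of these documents have the new fields, so every one of them
    // matches the migration's missing-fields filter.
    const docs = Array.from({ length: 10000 }, (_, i) => ({ name: `doc-${i}` }));
    await db.collection('trails').insertMany(docs);
    await client.close();
}

seed().catch(console.error);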

Expected behavior
The migration should complete successfully, no matter how many documents match.

Additional context
The migration fails with this error:

ERROR: Could not migrate up 20240915073415-trail-schema.js: Transaction with { txnNumber: 2 } has been aborted.
MongoBulkWriteError: Transaction with { txnNumber: 2 } has been aborted.
    at resultHandler (C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:294:29)
    at C:\Users\AnishKumar\Videos\code\SH3\BE\NEWSH\node_modules\.pnpm\mongodb@6.9.0\node_modules\mongodb\lib\bulk\common.js:344:159
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
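Possibly relevant (my assumption, not something I have verified): as far as I can tell, transactionLifetimeLimitSeconds is a server-side parameter rather than a session option, so the transaction is still bound by the server default of 60 seconds no matter what I pass to startSession below. A quick sketch to check the effective value on the server (run inside an async function with a connected MongoClient named client):

// Sketch: read the server-side transaction lifetime limit (default 60 seconds).
// Requires permission to run getParameter against the admin database.
const { transactionLifetimeLimitSeconds } = await client
    .db('admin')
    .command({ getParameter: 1, transactionLifetimeLimitSeconds: 1 });
console.log('transactionLifetimeLimitSeconds:', transactionLifetimeLimitSeconds);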

My migration code is:

async up(db, client) {
        const startTime = performance.now(); // Start time of the migration
        let retries = 0;
        while (retries < MAX_RETRIES) {
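            // A fresh session is started for every retry attempt; the options below
            // become the defaults for the withTransaction() call on this session.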
            const session = client.startSession({
                causalConsistency: true,
                defaultTransactionOptions: {
                    readConcern: { level: 'majority' },
                    writeConcern: { w: 'majority', wtimeout: 2147483646 },
                    readPreference: 'primary',
                    maxCommitTimeMS: 2147483646
                },
                snapshot: false,
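                // NOTE: transactionLifetimeLimitSeconds is a server parameter, not a
                // session option, so this setting most likely has no effect here.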
                transactionLifetimeLimitSeconds: 2147483646
            });
            try {
                await session.withTransaction(async () => {
                    const BATCH_SIZE = batch_size;
                    let skip = 0;
                    let batch;

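                    // Match documents that are missing at least one of the fields
                    // this migration is supposed to backfill.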
                    const missingFieldsFilter = {
                        $or: Object.keys(fields_which_need_to_migrate).map((field) => ({
                            [field]: { $exists: false }
                        }))
                    };

                    // Check if the MigrationHistory and MigrationChanges collections exist
                    const collections = await db.listCollections().toArray();
                    const existingCollections = collections.map((col) => col.name);

                    // Create collections only if they don't exist
                    if (!existingCollections.includes(migration_history_collection_name)) {
                        await db.createCollection(migration_history_collection_name, { session });
                        await db
                            .collection(migration_history_collection_name)
                            .createIndex({ version: 1 }, { unique: true, session });
                    }

                    if (!existingCollections.includes(migration_changes_collection_name)) {
                        await db.createCollection(migration_changes_collection_name, { session });
                        await db
                            .collection(migration_changes_collection_name)
                            .createIndex({ migrationVersion: 1 }, { session });
                    }

                    // Ensure migration version is unique
                    const existingMigration = await db
                        .collection(migration_history_collection_name)
                        .findOne({ version: migrationVersion }, { session });
                    if (existingMigration) {
                        throw new Error(`Migration version ${migrationVersion} already exists.`);
                    }

                    const migrationDoc = {
                        version: migrationVersion,
                        appliedAt: new Date(),
                        status: 'in_progress'
                    };

                    const migrationId = (
                        await db.collection(migration_history_collection_name).insertOne(migrationDoc, { session })
                    ).insertedId;

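                    // Walk the target collection in batches of BATCH_SIZE, backfilling
                    // missing fields and recording every change in the changes collection.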
                    do {
                        batch = await db
                            .collection(collection_name_which_need_to_migrate)
                            .find(missingFieldsFilter)
                            .skip(skip)
                            .limit(BATCH_SIZE)
                            .toArray();

                        if (batch.length > 0) {
                            const bulkOperations = batch.map((doc) => {
                                const updateFields = {};
                                const originalValues = {};
                                const updatedFields = {};

                                for (const [field, defaultValue] of Object.entries(fields_which_need_to_migrate)) {
                                    if (!(field in doc)) {
                                        updateFields[field] = defaultValue;
                                        updatedFields[field] = defaultValue;
                                    } else {
                                        originalValues[field] = doc[field];
                                    }
                                }

                                if (Object.keys(updateFields).length > 0) {
                                    // Save each change as a separate document in MigrationChanges
                                    const changeDoc = {
                                        migrationVersion: migrationVersion,
                                        migrationId: migrationId,
                                        collection: collection_name_which_need_to_migrate,
                                        documentId: doc._id,
                                        updatedAt: new Date(),
                                        fieldsUpdated: updatedFields,
                                        originalValues
                                    };

                                    return [
                                        {
                                            insertOne: {
                                                document: changeDoc
                                            }
                                        },
                                        {
                                            updateOne: {
                                                filter: { _id: doc._id },
                                                update: { $set: updateFields }
                                            }
                                        }
                                    ];
                                }
                                return [];
                            });

                            const operations = bulkOperations.flat();

                            if (operations.length > 0) {
                                // Both bulkWrite calls share the same session, and a session
                                // is not meant to run operations concurrently, so they are
                                // executed sequentially rather than with Promise.all.
                                await db.collection(migration_changes_collection_name).bulkWrite(
                                    operations.filter((op) => op.insertOne),
                                    { session }
                                );
                                await db.collection(collection_name_which_need_to_migrate).bulkWrite(
                                    operations.filter((op) => op.updateOne),
                                    { session }
                                );
                            }

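                            // Note: documents updated above no longer match missingFieldsFilter,
                            // so advancing skip by BATCH_SIZE can jump over documents that still
                            // need to be migrated.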
                            skip += BATCH_SIZE;
                        }
                    } while (batch.length === BATCH_SIZE);

                    console.log('migration done!');
                    // Update the main migration document status to success
                    await db
                        .collection(migration_history_collection_name)
                        .updateOne(
                            { _id: migrationId },
                            { $set: { status: 'success', completedAt: new Date() } },
                            { session }
                        );

                    console.log(`Migration ${migrationVersion} applied successfully.`);
                });
                break; // Exit the loop if successful
            } catch (error) {
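                // The whole transaction is retried up to MAX_RETRIES times; only after
                // the final failed attempt is the history document marked as failed.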
                console.error(`Migration ${migrationVersion} failed:`, error);
                retries += 1;
                if (retries >= MAX_RETRIES) {
                    await db
                        .collection(migration_history_collection_name)
                        .updateOne(
                            { version: migrationVersion },
                            { $set: { status: 'failure', reason: error.message, completedAt: new Date() } }
                        );
                    throw error;
                }
            } finally {
                const endTime = performance.now(); // End time of the attempt
                const duration = (endTime - startTime) / 1000; // Duration in seconds
                console.log(`Migration attempt finished after ${duration.toFixed(2)}s.`);
                await session.endSession();
            }
        }
    },
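For completeness, I run the migration with the standard migrate-mongo CLI:

npx migrate-mongo up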