cool-team-official / cool-admin-midway-rpc

cool admin for node 微服务
3 stars 2 forks source link

RpcTransaction Bug #3

Open hanxiaolong-github opened 1 year ago

hanxiaolong-github commented 1 year ago

在同一个 rpc transaction 中,call 另一个服务方法,只能调用10次。 例如: /* admin 服务中的 orderService 的createOrders 调用 oms 服务中的orderService createOrders。 / @CoolRpcTransaction() async createOrders(params, rpcTransactionId?, queryRunner?: QueryRunner){ await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('1') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('2') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('3') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('4') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('5') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('6') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('7') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('8') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('9') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('10') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); console.log('11') await this.rpc.call('oms', 'orderService', 'createOrders', { rpcTransactionId, pageResult, user:this.ctx.user }); return result; }

控制台中 '11' 这一步执行不到。oms 服务里面就抱错:oms-76427320-4731-11ee-94c3-0f36b4a3fb0f/DISCOVERY: Heartbeat is not received from 'admin-7f7815d0-4731-11ee-8235-e9124e81722b' node.

hanxiaolong-github commented 1 year ago

对了 补充一点 oms orderService createOrders 只是一个空方法像这样: @CoolRpcTransaction() async createOrders(params, rpcTransactionId?, queryRunner?: QueryRunner){

}

hanxiaolong-github commented 1 year ago

问题是找到了。应该是 transaction.js "use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.CoolRpcTransaction = void 0; const typeorm_1 = require("typeorm"); const uuid_1 = require("uuid"); const core_1 = require("@cool-midway/core"); function CoolRpcTransaction(option) { return (target, propertyKey, descriptor) => { const method = descriptor.value; descriptor.value = async function (...args) { let isCaller = false; let rpcTransactionId; if (args[0]) { isCaller = false; rpcTransactionId = args[0].rpcTransactionId; } // 如果没有事务ID,手动创建 if (!rpcTransactionId) { isCaller = true; rpcTransactionId = uuid_1.v1(); } let data; const connection = typeorm_1.getConnection((option === null || option === void 0 ? void 0 : option.connectionName) || 'default'); const queryRunner = connection.createQueryRunner(); // 使用我们的新queryRunner建立真正的数据库连 await queryRunner.connect(); if (option && option.isolation) { await queryRunner.startTransaction(option.isolation); } else { await queryRunner.startTransaction(); } try { global['moleculer.transactions'][rpcTransactionId] = queryRunner; // 半小时后清除 setTimeout(() => { global['moleculer.transactions'][rpcTransactionId].release(); delete global['moleculer.transactions'][rpcTransactionId]; }, 1800 1000); data = await method.apply(this, [ ...args, rpcTransactionId, queryRunner, ]); if (isCaller) { global['moleculer:broker'].broadcast('moleculer.transaction', { rpcTransactionId, commit: true, }); } //await queryRunner.commitTransaction(); } catch (error) { //await queryRunner.rollbackTransaction(); if (isCaller) { global['moleculer:broker'].broadcast('moleculer.transaction', { rpcTransactionId, commit: false, }); } throw new core_1.CoolCommException(error.message); } return data; }; return descriptor; }; } exports.CoolRpcTransaction = CoolRpcTransaction; //# sourceMappingURL=transaction.js.map await queryRunner.connect(); 这一行。 我看代码意思是:没次调用一下CoolRpcTransaction 就要获取一个新的数据库连接。这样肯定会导致数据库连接用完。 我尝试改成这样,不知道对不对。 function CoolRpcTransaction(option) { return (target, propertyKey, descriptor) => { const method = descriptor.value; descriptor.value = async function (...args) { let isCaller = false; let rpcTransactionId; if (args[0]) { isCaller = false; rpcTransactionId = args[0].rpcTransactionId; } // 如果没有事务ID,手动创建 if (!rpcTransactionId) { isCaller = true; rpcTransactionId = uuid_1.v1(); } let data; let queryRunner; if(global['moleculer.transactions'][rpcTransactionId]){ queryRunner = global['moleculer.transactions'][rpcTransactionId]; }else{ const connection = typeorm_1.getConnection((option === null || option === void 0 ? void 0 : option.connectionName) || 'default'); queryRunner = connection.createQueryRunner(); // 使用我们的新queryRunner建立真正的数据库连 await queryRunner.connect(); if (option && option.isolation) { await queryRunner.startTransaction(option.isolation); } else { await queryRunner.startTransaction(); } } try { global['moleculer.transactions'][rpcTransactionId] = queryRunner; // 半小时后清除 setTimeout(() => { if(global['moleculer.transactions'][rpcTransactionId]){ global['moleculer.transactions'][rpcTransactionId].release(); delete global['moleculer.transactions'][rpcTransactionId]; } }, 1800 1000); data = await method.apply(this, [ ...args, rpcTransactionId, queryRunner, ]); if (isCaller) { global['moleculer:broker'].broadcast('moleculer.transaction', { rpcTransactionId, commit: true, }); } //await queryRunner.commitTransaction(); } catch (error) { //await queryRunner.rollbackTransaction(); if (isCaller) { global['moleculer:broker'].broadcast('moleculer.transaction', { rpcTransactionId, commit: false, }); } throw new core_1.CoolCommException(error.message); } return data; }; return descriptor; }; }

hanxiaolong-github commented 1 year ago

return (target, propertyKey, descriptor) => { const method = descriptor.value; descriptor.value = async function (...args) { let rpcTransactionId; let isCaller;

        // 获取传送过来的 rpcTransactionId
        // 尝试从第一个参数中获取。
        if (args[0]) {
          rpcTransactionId = args[0].rpcTransactionId;
        }
        // 尝试从第二个参数中获取
        if(args[1] && !rpcTransactionId){
          rpcTransactionId = args[1];
        }

        if(rpcTransactionId){
          isCaller = false;
        }
        else{// 如果没有事务ID,手动创建 
          isCaller = true;
          rpcTransactionId = uuid_1.v1();
        }

        let data;
        let queryRunner;
        if(global['moleculer.transactions'][rpcTransactionId]){
            queryRunner = global['moleculer.transactions'][rpcTransactionId];
        }else{
            const connection = typeorm_1.getConnection((option === null || option === void 0 ? void 0 : option.connectionName) || 'default');
            queryRunner = connection.createQueryRunner();
            // 使用我们的新queryRunner建立真正的数据库连
            console.log('start connect....')
            await queryRunner.connect();
            console.log('end connect....')
            if (option && option.isolation) {
                await queryRunner.startTransaction(option.isolation);
            }
            else {
                await queryRunner.startTransaction();
            }
        }
        try {
            global['moleculer.transactions'][rpcTransactionId] = queryRunner;
            // 半小时后清除
            setTimeout(() => {
                if(global['moleculer.transactions'][rpcTransactionId]){
                    global['moleculer.transactions'][rpcTransactionId].release();
                    delete global['moleculer.transactions'][rpcTransactionId];
                }
            }, 1800 * 1000);
            data = await method.apply(this, [
                ...args,
                rpcTransactionId,
                queryRunner,
            ]);
            if (isCaller) {
                global['moleculer:broker'].broadcast('moleculer.transaction', {
                    rpcTransactionId,
                    commit: true,
                });
            }
            //await queryRunner.commitTransaction();
        }
        catch (error) {
            //await queryRunner.rollbackTransaction();
            if (isCaller) {
                global['moleculer:broker'].broadcast('moleculer.transaction', {
                    rpcTransactionId,
                    commit: false,
                });
            }
            console.log(error);
            console.log(error.stack);
            throw new core_1.CoolCommException(error.message);
        }
        return data;
    };
    return descriptor;
};

我改了这个压测了一下,是可以的。麻烦尽快update 一下吗?