ChunelFeng / CGraph

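【A commonly used C++ DAG framework】 A general-purpose, cross-platform parallel computing framework based on flow graphs, with no third-party dependencies, listed in awesome-cpp. Stars, forks, and discussion are welcome.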
http://www.chunel.cn
MIT License
1.76k stars 321 forks

Intermittent segfault when isHold() fetches a param #290

Closed niguang1 closed 10 months ago

niguang1 commented 11 months ago

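Symptom:
When a node that runs in parallel with a hold node returns an error status, the hold node intermittently segfaults while fetching its param. Sample code: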

#include "CGraph.h"

using namespace CGraph;

class MyParam : public GParam {
public:
    /**
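     * reset() is called each time the pipeline finishes one execution.
     * If the pipeline runs multiple times and depends on results from earlier runs
     * (e.g. using iCount to record how many times the pipeline has executed),
     * simply do not reset those values in reset().
     */
    CVOID reset(const CStatus& curStatus) override {
        iValue = 0; // after each pipeline execution, iValue is reset to 0; iCount is kept and carried into the next execution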
    }
    CStatus setup() {
        return CStatus();
    }

    int iValue{0};
};

class MyHoldNode : public GNode {
public:
    CStatus init() override {
        CStatus status;
        status = createGParam<MyParam>("myparam");
        return status;
    }
    CStatus run() override {
        auto myParam = getGParam<MyParam>("myparam");
        if (myParam == nullptr) return CStatus(STATUS_ERR,"");

        if (myParam) {
            CGRAPH_PARAM_WRITE_CODE_BLOCK(myParam);
            myParam->iValue++;
            std::cout << "iCount value is [" << myParam->iValue << "]" << std::endl;
        }

        return CStatus();
    }
    CBOOL isHold() override {
        auto myParam = getGParam<MyParam>("myparam");
        if (nullptr == myParam) {
            return false; // if the param could not be read, stop here and do not run again
        }

        // return (myParam->iValue < 5); // keep re-running this node while iValue is less than 5
        return true;
    }
};

class MyNode1 : public CGraph::GNode {
public:
    CStatus run () override {
        std::cout << "my node run " << getName() << std::endl;
        return CStatus(STATUS_ERR,"");
    }
};

void tutorial_hold() {
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr holdNode, node1 = nullptr;

    pipeline->registerGElement<MyHoldNode>(&holdNode, {}, "myHold");    // register a node that implements the hold method
    pipeline->registerGElement<MyNode1>(&node1, {}, "node1");

    pipeline->process(1);    // run the pipeline
    GPipelineFactory::remove(pipeline);
}

int main() {
    tutorial_hold();
    return 0;
}

Test script:

#!/bin/bash

count=0
while [ $count -lt 500000 ] ; do
        ./build/tutorial/T14-Hold > /dev/null 2>&1
        if [ $? -ne 0 ]; then
                echo "++++++++++++++++++++$count"
                exit
        fi
        count=$(($count+1))
        sleep 1
done

ChunelFeng commented 11 months ago

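Sorry, I had a lot going on today and was busy until just now. I will find time tomorrow to try to reproduce this.

Please describe the details a bit more: which version you are using, what environment you are running in, and how often the problem occurs.

Can you pinpoint where it crashes?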

niguang1 commented 11 months ago

Code version: 0ce3e01cf21d9e3ba0bb98cdc6a2176bfdb0a7ab
Test environment: Ubuntu 22.04, kernel 5.14.0-1042-oem
The reproduction code has been updated (replace CGraph/tutorial/T14-Hold.cpp in the original project with the code below):

#include "CGraph.h"

using namespace CGraph;

class MyParam : public GParam {
public:
    /**
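     * reset() is called each time the pipeline finishes one execution.
     * If the pipeline runs multiple times and depends on results from earlier runs
     * (e.g. using iCount to record how many times the pipeline has executed),
     * simply do not reset those values in reset().
     */
    CVoid reset(const CStatus& curStatus) override {
        iValue = 0; // after each pipeline execution, iValue is reset to 0; iCount is kept and carried into the next execution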
    }
    CStatus setup() {
        return CStatus();
    }

    int iValue{0};
};

class MyHoldNode : public GNode {
public:
    CStatus init() override {
        CStatus status;
        status = createGParam<MyParam>("myparam");
        return status;
    }
    CStatus run() override {
        auto myParam = getGParam<MyParam>("myparam");
        if (myParam == nullptr) return CStatus(CGraph::internal::STATUS_ERR,"");

        if (myParam) {
            CGRAPH_PARAM_WRITE_CODE_BLOCK(myParam);
            myParam->iValue++;
            std::cout << "iCount value is [" << myParam->iValue << "]" << std::endl;
        }

        return CStatus();
    }
    CBool isHold() override {
        auto myParam = getGParam<MyParam>("myparam");
        if (nullptr == myParam) {
            return false; // if the param could not be read, stop here and do not run again
        }

        // return (myParam->iValue < 5); // keep re-running this node while iValue is less than 5
        return true;
    }
};

class MyNode1 : public CGraph::GNode {
public:
    CStatus run () override {
        std::cout << "my node run " << getName() << std::endl;
        return CStatus(CGraph::internal::STATUS_ERR,"");
    }
};

void tutorial_hold() {
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr holdNode, node1 = nullptr;

    pipeline->registerGElement<MyHoldNode>(&holdNode, {}, "myHold");    // register a node that implements the hold method
    pipeline->registerGElement<MyNode1>(&node1, {}, "node1");

    pipeline->process(1);    // run the pipeline
    GPipelineFactory::remove(pipeline);
}

int main() {
    tutorial_hold();
    return 0;
}

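With the test.sh script the crash reliably reproduces within roughly ten runs; when running by hand, within roughly twenty runs.
Call stack: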

libstdc++.so.6!__dynamic_cast (unknown source:0)
CGraph::GParamManager::get<MyParam, 0>(const std::string & key, CGraph::GParamManager * const this) (CGraph/src/GraphCtrl/GraphParam/GParamManager.inl:45)
CGraph::GElement::getGParam<MyParam, 0>(const std::string & key) (CGraph/src/GraphCtrl/GraphElement/GElement.h:423)
MyHoldNode::isHold(MyHoldNode * const this) (CGraph/tutorial/T14-Hold.cpp:49)
CGraph::GElement::fatProcessor(CGraph::GElement * const this, const CGraph::CFunctionType & type) (CGraph/src/GraphCtrl/GraphElement/GElement.cpp:242)
operator()(const struct {...} * const __closure) (CGraph/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp:89)
std::__invoke_impl<void, CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>&>(struct {...} & __f) (/usr/include/c++/11/bits/invoke.h:61)
std::__invoke_r<void, CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>&>(struct {...} & __fn) (/usr/include/c++/11/bits/invoke.h:154)
operator()() (/usr/include/c++/11/future:1468)
std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()>::_M_run()::<lambda()>, void>::operator()() (/usr/include/c++/11/future:1409)
std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()>::_M_run()::<lambda()>, void>&>() (/usr/include/c++/11/bits/invoke.h:61)
std::__invoke_r<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()>::_M_run()::<lambda()>, void>&>() (/usr/include/c++/11/bits/invoke.h:143)
std::_Function_handler<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>(), std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<void>, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()>::_M_run()::<lambda()>, void> >::_M_invoke(const std::_Any_data &)(const std::_Any_data & __functor) (/usr/include/c++/11/bits/std_function.h:291)
std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>::operator()() const(const std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> * const this) (/usr/include/c++/11/bits/std_function.h:590)
std::__future_base::_State_baseV2::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*, bool*)(std::__future_base::_State_baseV2 * const this, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> * __f, bool * __did_set) (/usr/include/c++/11/future:571)
libc.so.6!__pthread_once_slow(pthread_once_t * once_control, void (*)(void) init_routine) (pthread_once.c:116)
__gthread_once(__gthread_once_t * __once) (/usr/include/x86_64-linux-gnu/c++/11/bits/gthr-default.h:700)
std::call_once<void (std::__future_base::_State_baseV2::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*, bool*), std::__future_base::_State_baseV2*, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*, bool*>(std::once_flag&, void (std::__future_base::_State_baseV2::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*, bool*), std::__future_base::_State_baseV2*&&, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>*&&, bool*&&)(void (std::__future_base::_State_baseV2::*&&)(std::__future_base::_State_baseV2 * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> *, bool *) __f, std::once_flag & __once) (/usr/include/c++/11/mutex:783)
std::__future_base::_State_baseV2::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool)(bool __ignore_failure, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> __res, std::__future_base::_State_baseV2 * const this) (/usr/include/c++/11/future:411)
std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()>::_M_run(std::__future_base::_Task_state<CGraph::GDynamicEngine::process(CGraph::GElementPtr, CBool)::<lambda()>, std::allocator<int>, void()> * const this) (/usr/include/c++/11/future:1471)

ChunelFeng commented 11 months ago

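Hello, we reproduced this issue locally and have a preliminary diagnosis of the cause. A few suggestions:

  1. isHold() is being used incorrectly. Its purpose is to define the termination condition of the current node: when it returns false, the node stops re-running run(); otherwise run() is executed again. Unless there is special logic, returning false is recommended, and returning true by default is strongly discouraged.

  2. During node execution, returning CStatus("Error info") is recommended only for genuine exceptions; in general a success status should be returned. If your logic really does have many error cases, consider recording the error information in a dedicated GParam.

  3. If you would rather not make the changes above, please switch to the latest version and set the execution engine to static before calling process().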

    void tutorial_hold() {
        GPipelinePtr pipeline = GPipelineFactory::create();
        GElementPtr holdNode, node1 = nullptr;

        pipeline->registerGElement<MyHoldNode>(&holdNode, {}, "myHold");
        pipeline->registerGElement<MyNode1>(&node1, {}, "node1");
        pipeline->setGEngineType(GEngineType::STATIC);    // add this line

        pipeline->process(1);
        GPipelineFactory::remove(pipeline);
    }

    After switching to the static engine, it worked in my own testing.

If you have any questions or usage needs, feel free to add me on WeChat (ChunelFeng) to discuss at any time.

niguang1 commented 11 months ago

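In my testing, even when isHold() decides whether to hold based on the param's actual value, the crash still occurs unless the engine is set to static. Also, could you briefly explain why, with the static engine, there is no crash even when isHold() always returns true?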

ChunelFeng commented 11 months ago

Please try upgrading to the latest main branch; setting the static engine is no longer necessary. Here is the example from my own testing:

image

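btw, this Saturday (2023.11.25) at 8 pm, I will give a talk to the open-source community over Tencent Meeting on the execution logic of the dynamic and static engines. If you are interested, add me on WeChat and join the discussion.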

ChunelFeng commented 11 months ago

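Sorry, after a few more test runs, the dynamic engine still crashes occasionally; we will keep investigating. For now, I recommend using the static engine, or reducing how often CStatus(error) is returned.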