sogou / srpc

RPC framework based on C++ Workflow. Supports SRPC, Baidu bRPC, Tencent tRPC, thrift protocols.
Apache License 2.0
1.93k stars 382 forks source link

srpc和workflow配合使用问题 #358

Closed Anonymouszj closed 5 months ago

Anonymouszj commented 7 months ago

现在有一个任务,分两个步骤: 1.对客户端传输过来的InitRequest数据做处理,服务端返回Init线程号。(一次程序运行只做一次Init) 2.在客户端发送ForwardRequest,其中ForwardRequest包含Init返回的线程号。(Init之后可以多次做Forward)

第一种做法: 服务端:

class RemoteServiceImpl : public Remote::Service
{
public:
    void Init(InitRequest *request, InitResponse *response, srpc::RPCContext *ctx) override
    {

        bool encrtyption = request->encrtyption();
        int device = request->device();
        bool use_gpu = request->use_gpu();
        //  逻辑...
        response->set_tid(gettid());
        response->set_ret(ret);
    }

    void Forward(ForwardRequest *request, ForwardResponse *response, srpc::RPCContext *ctx) override
    {
        request->tid();
        // 逻辑...
        response->set_ret(ret);
    }
}

客户端:

    InitRequest init_req;
    init_req.set_encrtyption(encrtyption);
    init_req.set_device(device);
    init_req.set_use_gpu(use_gpu);
    client.Init(&init_req, init_done); // init_done函数中 tid = response->tid();

    ForwardRequest forward_req;
    forward_req.set_tid(tid);
    client.Forward(&forward_req, forward_done);

但是这个tid给不到forward_req.set_tid() 中去。

第二种做法:

服务端:

using InitTask = WFThreadTask<InitInput, InitOutput>;
using InitFactory = WFThreadTaskFactory<InitInput, InitOutput>;

using ForwardTask = WFThreadTask<ForwardInput, ForwardOutput>;
using ForwardFactory = WFThreadTaskFactory<ForwardInput, ForwardOutput>;

void init_callback(InitTask *task)
{
    auto *input = task->get_input();
    auto *output = task->get_output();
    assert(task->get_state() == WFT_STATE_SUCCESS);
}

void forward_callback(ForwardTask *task)
{
    auto *input = task->get_input();
    auto *output = task->get_output();
    assert(task->get_state() == WFT_STATE_SUCCESS);
}

class RemoteServiceImpl : public Remote::Service
{
public:
    void Init(InitRequest *request, InitResponse *response, srpc::RPCContext *ctx) override
    {
        InitTask *task = InitFactory::create_thread_task("init", init_routine, init_callback);

        InitInput *input = task->get_input();

        input->encrtyption = request->encrtyption();
        input->device = request->device();
        input->use_gpu = request->use_gpu();

        ctx->get_series()->push_back(task);
    }

    void Forward(ForwardRequest *request, ForwardResponse *response, srpc::RPCContext *ctx) override
    {
        ForwardTask *task = ForwardFactory::create_thread_task("forward", forward_routine, forward_callback);

        ForwardInput *input = task->get_input();

        input->tid = request->tid();

        ctx->get_series()->push_back(task);
    }
};

客户端:

    WFFacilities::WaitGroup wait_group(1);
    SeriesWork *series = Workflow::create_series_work(inittask, [&wait_group](const SeriesWork *)
                                                      { wait_group.done(); });
    series->push_back(forwardtask);
    series->start();

遇见在服务端不能把init_routine的output赋给InitResponse进行返回。

Barenboim commented 7 months ago

我觉的你代码写的很标准,看起来比较熟悉我们的用法了。 但是,你对遇到的问题的描述太简略了,比如:”但是这个tid给不到forward_req.set_tid() 中去。”,我没有理解你说的是什么问题。client得到Init的tid,然后吧这个tid传到server的forward函数,但server收不到吗?

Barenboim commented 7 months ago

第二部分的代码,需要你在init_callback里吧output结果赋response,你代码里没有包括这块的内容。我不知道你是略过了,还是你确实没有写。你的代码结构没有问题,不过,除非你init_routine和forward_routine这两个函数计算量比较大,否则没有必要启动计算任务,直接在函数里写就可以了。

Anonymouszj commented 7 months ago

我觉得你的代码写的但是很标准,看起来比较熟悉我们的用法了。 ,你对遇到的问题的描述太简略了,比如:”但是这个tid给不到forward_req.set_tid()中去。 ”,我没明白你说的是什么问题。client得到Init的tid,然后把这个tid传到server的forward函数,但是server收不到吗?

首先感谢回答。 tid是服务端返回给客服端的线程号,在客户端运行时把这个线程号赋值给ForwardRequest发送给客户端的时候tid一直是默认值, 思考了下是Init和Forwad函数是同时启动的两个线程,所以我在中间加了等待,目前ForwardRequest是可以正确得到tid了。 我使用第一种方法:

InitRequest init_req;
init_req.set_encrtyption(encrtyption);
init_req.set_device(device);
init_req.set_use_gpu(use_gpu);
client.Init(&init_req, init_done); // init_done函数中 tid = response->tid();

wait_group.wait(10000);

ForwardRequest forward_req;
forward_req.set_tid(tid);
client.Forward(&forward_req, forward_done);

现在又一个问题,如果我打算一直运行Forward,但是每次发送的数据都不一样,我试了直接写到循环里边; 类似如下 :

while (1)
{
    ForwardRequest forward_req;
    forward_req.set_tid(tid);
    client.Forward(&forward_req, forward_done);
}

发现好像每次执行的Forward函数都是不同线程。 如何让Forward运行的线程保持不变,改变的只是ForwardRequest。

Barenboim commented 7 months ago

你提问的时候,更好的方法是说明一下你的原始需求,然后我们想解决方案,而不是直接发你的方法和使用这个方法遇到的问题。

我现在只能猜测你的需求。你是希望client在Init的时候得到一个tid,每次请求forward时,把tid发给server,让server在这个tid的线程上执行计算,是这个意思吗?

Barenboim commented 7 months ago

可能你更原始的需求是,你希望同一client的请求都在同一个线程上执行,因为你有一些thread local的变量?

Anonymouszj commented 7 months ago

你提问的时候,更好的方法是说明一下你的原始需求,然后我们想解决方案,而不是直接发你的方法和使用这个方法遇到的问题。

我现在只能猜测你的需求。你是希望client在Init的时候得到一个tid,每次请求forward时,把tid发给server,让server在这个tid的线程上执行计算,是这个意思吗?

不好意思哈 需求是: 1.客户端发送模型到服务端进行模型初始化得到句柄。(全局只做一次) 2.客服端发送得到的句柄和图片到服务端进行模型推理的到结果。 3.只有两个线程,模型初始化一个线程,模型推理一个线程。

Barenboim commented 7 months ago

第三点只有两个线程,是你自己创建的线程吗?我们的框架本身就是多线程的,而且也无法保障server同一个函数在同一个线程。计算任务,也不能指定线程。你的意思是不是只有两个服务?

Barenboim commented 7 months ago

你是希望forward和init这两个函数都在固定的线程上执行吗?这么做的目的是什么,是不是有thread local的数据,或者是想串行执行?

Anonymouszj commented 7 months ago

第三点只有两个线程,是你自己创建的线程吗?我们的框架本身就是多线程的,而且也无法保障server同一个函数在同一个线程。计算任务,也不能指定线程。你的意思是不是只有两个服务?

不是我创建的,是依赖框架。 是的,只有两个服务。

Barenboim commented 7 months ago

第三点只有两个线程,是你自己创建的线程吗?我们的框架本身就是多线程的,而且也无法保障server同一个函数在同一个线程。计算任务,也不能指定线程。你的意思是不是只有两个服务?

不是我创建的,是依赖框架。 是的,只有两个服务。

所以,你希望这两个服务分别只有一个线程处理是吗?那么,是因为同一种服务必须只能一个线程处理,还是说,你只是希望每种服务都是串行的执行?

Anonymouszj commented 7 months ago

第三点只有两个线程,是你自己创建的线程吗?我们的框架本身就是多线程的,而且也无法保障server同一个函数在同一个线程。计算任务,也不能指定线程。你的意思是不是只有两个服务?

不是我创建的,是依赖框架。 是的,只有两个服务。

所以,你希望这两个服务分别只有一个线程处理是吗?那么,是因为同一种服务必须只能一个线程处理,还是说,你只是希望每种服务都是串行的执行?

希望每种服务都是串行的执行

Barenboim commented 7 months ago

好的,方法很多。。。所以,你下次提问的话,只需要说SRPC怎么让同一种服务串行执行就可以😂 我给你几个比较好的办法。

Anonymouszj commented 7 months ago

好的,方法很多。。。所以,你下次提问的话,只需要说SRPC怎么让同一种服务串行执行就可以😂 我给你几个比较好的办法。

我不是做这方面的,对这些一点都不懂,硬是看了好几天文档和代码

Barenboim commented 7 months ago

如果使用计算线程,可以这样:

#include <workflow/WFResourcePool.h>

class RemoteServiceImpl : public Remote::Service
{
public:
      RemoteServiceImpl() : respool_init(1), respool_forward(1) { } 
private:
      WFResoucePool respool_init;
      WFResourcePool respool_forward;
public:
    void Init(InitRequest *request, InitResponse *response, srpc::RPCContext *ctx) override
    {
        InitTask *task = InitFactory::create_thread_task("init", init_routine, init_callback);

        InitInput *input = task->get_input();

        input->encrtyption = request->encrtyption();
        input->device = request->device();
        input->use_gpu = request->use_gpu();
                WFConditional *cond = respool_init.get(task);

        ctx->get_series()->push_back(cond);
    }
}

我们使用只有一个资源的resource pool,当任务需要执行是,必须从resource pool里拿到资源。在init_callback以及forward_callback里,填写response并post会一个资源,让下一个任务可以被调起。关于资源池的文档,可以参考: https://github.com/sogou/workflow/blob/master/docs/about-resource-pool.md

Barenboim commented 7 months ago

好的,方法很多。。。所以,你下次提问的话,只需要说SRPC怎么让同一种服务串行执行就可以😂 我给你几个比较好的办法。

我不是做这方面的,对这些一点都不懂,硬是看了好几天文档和代码

不是不是,你代码写的很溜了,对我们的用法理解的很好。我只是说提问题的方式,应该说明原始问题。

Barenboim commented 7 months ago

上面的用法调用了计算线程。其实你也可以用一个时间为0的定时器,在定时器的回调里执行你的逻辑,同样使用resource pool把定时器任务转换成一个WFConditional。

Anonymouszj commented 7 months ago

上面的用法调用了计算线程。其实你也可以用一个时间为0的定时器,在定时器的回调里执行你的逻辑,同样使用resource pool把定时器任务转换成一个WFConditional。

万分感谢。我在研究研究

Barenboim commented 7 months ago

嗯嗯,有问题随时交流!也麻烦star一下项目啊。

Anonymouszj commented 7 months ago

嗯嗯,有问题随时交流!也麻烦star一下项目啊。

okk