sogou / workflow

C++ Parallel Computing and Asynchronous Networking Framework
Apache License 2.0
13.01k stars 2.41k forks source link

机器学习场景的使用的最佳实践是什么(实用案例推荐) #935

Open rockyzhengwu opened 2 years ago

rockyzhengwu commented 2 years ago

场景

服务端是一个 PyTorch C++ 实现的 CTR 预估的服务,预估之前需要去 KV(类似 redis) 里读取特征作为 CTR 预估的输入。一个客户端请求可能读一次,也可能读两次 KV 。

详细流程可能是这样: 拿到请求体 -> 解析 -> 读 kv -> 产生模型输入 -> CTR 模型 -> calibration 模型 -> 其他逻辑 -> 返回 。

现在的实现

看了下 demo 然后就写了大概这样的逻辑

static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo) { wait_group.done(); }

int main(int argc, char *argv[]) {
  // ....   something init

  unsigned short port = 8083;
  signal(SIGINT, sig_handler);
  signal(SIGTERM, sig_handler);
  WFHttpServer server(process);
  if (server.start(port) == 0) {
    wait_group.wait();
    server.stop();
  } else {
    perror("Cannot start server");
    exit(1);
  }
  return 0;
}

void process(WFHttpTask *server_task) {
  protocol::HttpRequest *req = server_task->get_req();
  protocol::HttpResponse *resp = server_task->get_resp();
  long long seq = server_task->get_task_seq();

  resp->set_http_version("HTTP/1.1");
  resp->set_status_code("200");

  std::string body;
  auto uri = req->get_request_uri();
  if (std::strcmp(uri, "/cvr") == 0) {
    const void *body;
    size_t size;
    req->get_parsed_body(&body, &size);
    std::string req_body = static_cast<const char *>(body);
    std::string response_body = "";
    MODL_MANAGER->predict(std::move(req_body),  response_body);
    resp->append_output_body(response_body.data(), response_body.size());
  } else {
    resp->set_status_code("404");
  }
}

遇到的问题

由于在 MODL_MANAGER->predict(std::move(req_body), response_body); 里面实现了等待读取 KV 的逻辑和模型预估的部分。但读 KV 有可能会卡住(看了很久还不是很理解这个问题)。 想不占线程的读 KV,不知道怎么实现合适。一番讨论发现从我开始用 workflow 的时候就好像用错了,似乎在最开始就应该把逻辑分成不同的 task 。所以想是不是有什么最佳的实践。

补充一下, workflow 线上用了一年多了,在我要读 kv 之前都没任何问题, 50 ms 的预估时间,之前尝试过很多都不能很好的解决超时, workflow 可以..... 目前 p99 30ms cpu 打满基本不超时。

Barenboim commented 2 years ago

你好。这个里面好几个问题。我一个个回复。 首先,如果你的整个predict过程是一个纯计算,那么,最好的实现是吧predict包装成一个计算任务,并且push_back到server task所在的series。这么做比直接在process里计算要好一些,可以让计算运行在计算线程,而不是占用网络线程。示例:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void process(WFHttpTask *task)
{
    void *body;
    size_t size;

    task->get_req()->get_parsed_body(&body, &size);
    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([task](WFGoTask *predict_task) {
        std::string *response_body = (std::string *)task->user_data;
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(predict_task);
}

很多用户误认为process函数结束就是服务处理流程结束了。其实,服务处理流程是series而不是process函数,否者,我们无法实现全异步的server了。这个示例,我们产生一个计算任务(predict_task)并添加到series,在计算任务的callback里填写了resp。由于想要最佳实践,这里用了append_output_body_nocopy,并且在server task的callback里释放response_body。 这里就涉及了多个调用的时机。predict_fn调用显然不在process函数线程里,而是process函数结束之后,在某个计算线程里被调用。而predict_task的callback在predict_fn之后在同一个线程里被执行,我们在这里填写resp,并且使用nocopy接口,因为response_body在回复结束之前,都还没有被delete。 还有一个函数,就是server task的callback。server task的callback是回复结束之后被调用(这和http client task一致,都是http交互完成调用callback),在这里我们释放了response_body。 当然,如果不那么要求性能,这个实现可以简单一些。比如不用_nocopy接口。

Barenboim commented 2 years ago

第二个问题,你需要在计算中插入通讯。我不知道你的kv是什么协议的通讯,我们就假设是redis(自定义协议的话可以自己实现,参考相关文档)。如果"解析"部分计算量不大,直接在process里算就可以了。我们假设这个计算量确实不大,那么,现在就是先访问redis再predict计算,然后填写回复。可以这么实现:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([](WFGoTask *predict_task) {
        WFHttpTask *server_task = (WFHttpTask *)series_of(predict_task)->get_context(); // 取回server task
        std::string *response_body = (std::string *)server_task->user_data;
        server_task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });

    series_of(task)->push_back(predict_task); // 把计算任务放进series
}

void process(WFHttpTask *task)
{
    SeriesWork *series = series_of(task);

    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    redis_task->get_req()->set_request(....);

    series->set_context(task);    // 把server_task指针放在series上下文
    series->push_back(redis_task);
}

这个就构成了一个网络任务+计算任务的处理流程。整个过程无需等待,也没有占有网络线程进行复杂的计算。如果在redis任务之前,还需要进行比较复杂的计算,同理增加一个计算任务即可。 但是,我们发现,如果处理流程过于复杂,你就不的不让每个任务callback里发起下一步操作。这会让代码很零碎。特别是,当A任务是功能X的最后一个任务,任务B是功能Y的第一个任务。因为X功能之后执行Y,就需要让A的callback去创建B,这是很不合理的。那么,对于这种复杂的需求,就可以使用我们的模块任务。

Barenboim commented 2 years ago

当若干个任务完成一个特定的功能,就可以使用模块任务了。例如,我们认为redis和predict构成一个完整的功能,那么上例中最不合理的就是让predict_task的callback去填写response。用模块可以这样改造:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    std::string *response_body = new std::string;
    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    // 不再需要给predict_task设置callback了。耦合性降低。计算结果放在series context。
    series_of(task)->set_context(response_body);
    series_of(task)->push_back(predict_task); // 把计算任务放进series。这个series其实是module内的sub_series
}

void process(WFHttpTask *task)
{
    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    WFModuleTask *module = WFTaskFactory::create_module_task(redis_task, [task](WFModuleTask *mod) {
        std::string *response_body = mod->sub_series()->get_context();  // 这里是sub_series不是series_of
        task->user_data = response_body;
        task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(module);
}

可以看一下module的文档:https://github.com/sogou/workflow/blob/master/docs/about-module.md

Barenboim commented 2 years ago

总结

rockyzhengwu commented 2 years ago

@Barenboim 明白了,感谢这么优秀的项目,特别耐心的解答

MaybeShewill-CV commented 2 years ago

@rockyzhengwu 我之前也遇到过类似的需求,如果模型处理时间超过一定范围就进行其他的处理逻辑。我想到的解决思路是首先设置一个counter任务当作塞子用来唤醒后续的任务流,接着在模型计算任务函数中设置一个定时任务(定时时间为超时时间),然后在模型计算任务的callback函数和定时任务的callback函数中分别count之前的counter,这样无论是计算函数先完成还是定时任务先完成均可以顺利进入counter的callback函数。在counter的callback函数中push back你需要的后续流程就可以了。

大致的代码差不多长这个样子,不知道能不能解决你的问题哈:)

void do_segmentation() {
    auto segmentation_timeout_timer = WFTaskFactory::create_timer_task(20, 0,[&](const WFTimerTask* task) {
        LOG(INFO) << "segmentation task timeout at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });
    segmentation_timeout_timer->start();
    LOG(INFO) << "start doing segmentation: " << Timestamp::now().to_format_str();
    std::this_thread::sleep_for(std::chrono::seconds(25));
}

int main(int argc, char** argv) {

    WFFacilities::WaitGroup wait_group(1);
    auto* series = Workflow::create_series_work(
            WFTaskFactory::create_empty_task(),
            [&](const SeriesWork* work){
                LOG(INFO) << "Series complete at: " << Timestamp::now().to_format_str();
            });

    WFTimerTask* get_image_data = WFTaskFactory::create_timer_task(
            1000000 * 5, [&](const WFTimerTask* task) {
                LOG(INFO) << "successfully get image data at: " << Timestamp::now().to_format_str();
            });
    WFCounterTask *counter = WFTaskFactory::create_counter_task("count", 1, [&](const WFCounterTask* task) {
        LOG(INFO) << "counter task complete at: " << Timestamp::now().to_format_str();
        auto* postprocess_work = WFTaskFactory::create_go_task("postprocess", [&]() {
            LOG(INFO) << "do postprocess task at: " << Timestamp::now().to_format_str();
        });
        *series_of(task) << postprocess_work;
    });
    counter->start();

    auto* segmentaiton_work = WFTaskFactory::create_go_task("do_segmentation", do_segmentation);
    segmentaiton_work->set_callback([&](const WFGoTask* task) {
        LOG(INFO) << "complete segmentation at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });

    *series << get_image_data;
    *series << segmentaiton_work;
    series->start();

    LOG(INFO) << "start whole process at: " << Timestamp::now().to_format_str();
    wait_group.wait();
    return 1;
}

@Barenboim 也请大佬帮忙看看这样写对不对,或者有没有更好的解决方案。

还有个小疑问,就是更加理想的状态是不应该在超时之后就取消计算任务,但是我查了现在我的文档,好像一旦start的任务是不能cancle的,这样如果我的计算任务一直卡死的话,这个线程是不是就被一直占用,浪费了。

Barenboim commented 2 years ago

先回答小疑问。运行中的计算任务是你的一个函数,显然框架无法中断你执行中的函数。这个我们也许可以考虑搞一个进程任务(感觉有点像actor模式了),时间到了直接kill听起来很刺激,起码是个理论上可行的方案。 我们内部有workflow的低代码框架,就是可以通过一个配置自动生成workflow代码。这里可以给计算任务设置超时或中断。但这个肯定需要在函数里加入检查点,让函数可以中途自行返回。代码写成死循环是没有办法的。 actor模式可以想一下。

MaybeShewill-CV commented 2 years ago

@Barenboim 感谢解答。下来去了解下actor哈:)

Barenboim commented 2 years ago

@rockyzhengwu 我之前也遇到过类似的需求,如果模型处理时间超过一定范围就进行其他的处理逻辑。我想到的解决思路是首先设置一个counter任务当作塞子用来唤醒后续的任务流,接着在模型计算任务函数中设置一个定时任务(定时时间为超时时间),然后在模型计算任务的callback函数和定时任务的callback函数中分别count之前的counter,这样无论是计算函数先完成还是定时任务先完成均可以顺利进入counter的callback函数。在counter的callback函数中push back你需要的后续流程就可以了。

大致的代码差不多长这个样子,不知道能不能解决你的问题哈:)

void do_segmentation() {
    auto segmentation_timeout_timer = WFTaskFactory::create_timer_task(20, 0,[&](const WFTimerTask* task) {
        LOG(INFO) << "segmentation task timeout at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });
    segmentation_timeout_timer->start();
    LOG(INFO) << "start doing segmentation: " << Timestamp::now().to_format_str();
    std::this_thread::sleep_for(std::chrono::seconds(25));
}

int main(int argc, char** argv) {

    WFFacilities::WaitGroup wait_group(1);
    auto* series = Workflow::create_series_work(
            WFTaskFactory::create_empty_task(),
            [&](const SeriesWork* work){
                LOG(INFO) << "Series complete at: " << Timestamp::now().to_format_str();
            });

    WFTimerTask* get_image_data = WFTaskFactory::create_timer_task(
            1000000 * 5, [&](const WFTimerTask* task) {
                LOG(INFO) << "successfully get image data at: " << Timestamp::now().to_format_str();
            });
    WFCounterTask *counter = WFTaskFactory::create_counter_task("count", 1, [&](const WFCounterTask* task) {
        LOG(INFO) << "counter task complete at: " << Timestamp::now().to_format_str();
        auto* postprocess_work = WFTaskFactory::create_go_task("postprocess", [&]() {
            LOG(INFO) << "do postprocess task at: " << Timestamp::now().to_format_str();
        });
        *series_of(task) << postprocess_work;
    });
    counter->start();

    auto* segmentaiton_work = WFTaskFactory::create_go_task("do_segmentation", do_segmentation);
    segmentaiton_work->set_callback([&](const WFGoTask* task) {
        LOG(INFO) << "complete segmentation at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });

    *series << get_image_data;
    *series << segmentaiton_work;
    series->start();

    LOG(INFO) << "start whole process at: " << Timestamp::now().to_format_str();
    wait_group.wait();
    return 1;
}

@Barenboim 也请大佬帮忙看看这样写对不对,或者有没有更好的解决方案。

还有个小疑问,就是更加理想的状态是不应该在超时之后就取消计算任务,但是我查了现在我的文档,好像一旦start的任务是不能cancle的,这样如果我的计算任务一直卡死的话,这个线程是不是就被一直占用,浪费了。

你这个程序写太乱了,有点看不懂。一般就是通过一个目标值为1的counter,如果上一个计算完成,或timer到期都count一下,让流程可以继续。 感觉你这个demo里,用了太多的timer和sleep来模拟读库和计算,很难一眼看出哪里是用来控制超时,哪里是用来模拟读库的延迟,只能说不是一个好的demo。 另外,如果用运算符重载,可以直接*series << get_image_data << segmentaiton_work,也可以**task << postprocess_work。

MaybeShewill-CV commented 2 years ago

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

Barenboim commented 2 years ago

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

不是**series哈,是**task。*task得到task的引用,task对象的'*'运算符,得到task所在series的引用,所以可以: **task << task1 << task2

MaybeShewill-CV commented 2 years ago

@Barenboim 嗯嗯 哈哈哈 键盘敲错了 ==!

Barenboim commented 2 years ago

@MaybeShewill-CV 你看一下这个新功能:https://github.com/sogou/workflow/pull/938

class WFTaskFactory
{
    /* Create 'Go' task with running time limit in seconds plus nanoseconds.
     * If time exceeded, state WFT_SYS_STATE_ERROR and error ETIMEDOUT will be got in callback. */
    template<class FUNC, class... ARGS>
    static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
                                         const std::string& queue_name,
                                         FUNC&& func, ARGS&&... args);
};
MaybeShewill-CV commented 2 years ago

@Barenboim 好滴,感觉这样很方便了,可以傻瓜式编程了。明天抽空试试看:)

MaybeShewill-CV commented 2 years ago

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

Barenboim commented 2 years ago

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

主要是从你的demo上看出来,计算时间限制好像用户非常有需求,所以就想到了实现这个功能。然后发现go task上加这个功能非常的自然,因为任务的类型是确定的。在实现的过程中,也帮我们优化了一些核心代码。果然用户是我们发展的原动力。

Barenboim commented 1 year ago

@MaybeShewill-CV 现在thread task也支持加超时了。

template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
public:
    static T *create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);
};
MaybeShewill-CV commented 1 year ago

@Barenboim 点赞! 回去看看之前几个thread_task的任务能不能更新下:)