taskflow / taskflow

A General-purpose Task-parallel Programming System using Modern C++
https://taskflow.github.io

PipeType::PARALLEL parallel data access safety #525

Open ayongsir opened 10 months ago

ayongsir commented 10 months ago

I have an application that uses tf::Pipeline to build a multi-DAG task, and I use pf.token() to control which piece of data each task processes.

If the type of a tf::Pipe is set to PipeType::SERIAL, I construct an array indexed by pf.pipe() to hold the data, which makes the pipeline's concurrent data access safe.

However, if I set the type of a tf::Pipe to PipeType::PARALLEL, how can I know how many instances of that pipe run concurrently for the current data flow? Or, more generally, how can I access the data safely under that concurrency?

I think letting a tf::Taskflow task accept parameters would solve this problem and would be more flexible.

Is there a plan to support parameters for tf::Taskflow tasks?

Looking forward to a reply, and thanks in advance!

cheng-hsiang-chiu commented 10 months ago

Hi ayongsir, could you provide a sample code so we can better understand the question? Thank you.

ayongsir commented 10 months ago

> Hi ayongsir, could you provide a sample code so we can better understand the question? Thank you.

OK, thank you. Here is my understanding; I don't know if there are any mistakes, so please advise me.

The core logic of my code is as follows:

1. I use a queue to cache data; based on how much data is cached, at most 4 pipeline lines run in parallel.
2. Each line runs 4 tasks that process the data, composed as a DAG graph.
3. For data safety across the parallel lines, I use arrays with the same number of slots as there are lines.
4. Each task accesses its data through the saved token index.

Regarding point 3: if the tf::PipeType is SERIAL, I understand it is safe, because only the pipeline lines run in parallel, while the tasks within the same line run serially.

But if the tf::PipeType is PARALLEL, then in addition to the parallelism across lines, individual pipes can themselves run in parallel. Is there a way to access the data in parallel without using locks?

I thought about it for a moment. If tasks supported parameters, such a problem would be relatively simple, for example:

```cpp
auto task1 = pipe_task_[0].emplace(
    [this](size_t pipe_index, size_t token_index) {
      auto token = token_[pipe_index][token_index];
      method_[0]->DoProcess(line_datas[token]);
    });
```

```cpp
tf::Pipe{tf::PipeType::PARALLEL,
         [&](tf::Pipeflow &pf) {
           task_executor_->corun(pipe_task_[pf.pipe()](pf.pipe(), pf.token()));
         }},
```

The code I am currently using is as follows:

```cpp
class TaskFlow {
 private:
  static const uint32_t kPipeTaskNum = 4;
  static const uint32_t kPipeLineNum = 4;
  std::atomic<bool> run_flag_ = false;
  std::atomic<bool> pipe_flag_ = false;
  std::unique_ptr<std::thread> thread_ptr_ = nullptr;
  size_t token_num_ = 0;
  std::array<size_t, kPipeTaskNum> token_index_;
  // "Data" is a placeholder: the shared_ptr element type was lost in the
  // original formatting.
  std::array<std::shared_ptr<Data>, kPipeLineNum> line_datas;
  BoundedQueue<std::shared_ptr<Data>> cache_data_;
  std::array<tf::Taskflow, kPipeTaskNum> pipe_task_;
  std::unique_ptr<tf::Taskflow> task_flow_ = nullptr;
  std::unique_ptr<tf::Executor> task_executor_ = nullptr;
};
```

```cpp
// "Data" is a placeholder: the shared_ptr element type was lost in the
// original formatting.
inline void TaskFlow::AddData(const std::shared_ptr<Data> &frame) {
  if (!cache_data_.Enqueue(frame)) {
    LOG_WARN << "Worker data enqueue failed.";
  }
}
```

```cpp
bool_t TaskFlow::Init() {
  task_executor_.reset(new tf::Executor());
  // task_flow_ must be allocated before composed_of() is called below.
  task_flow_.reset(new tf::Taskflow());

  auto task1 = pipe_task_[0].emplace(
      [this]() { method_[0]->DoProcess(line_datas[token_index_[0]]); });
  auto task2 = pipe_task_[1].emplace(
      [this]() { method_[1]->DoProcess(line_datas[token_index_[1]]); });
  auto task3 = pipe_task_[2].emplace(
      [this]() { method_[2]->DoProcess(line_datas[token_index_[2]]); });
  auto task4 = pipe_task_[3].emplace(
      [this]() { method_[3]->DoProcess(line_datas[token_index_[3]]); });

  static tf::Pipeline pipeline(
      kPipeLineNum,
      // first pipe runs taskflow1
      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 if (pf.token() == token_num_) {
                   pf.stop();
                   return;
                 }
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }});

  task_flow_->composed_of(pipeline);

  core_callback_ = [&]() {
    auto callback_func = [&]() {
      token_num_ = 0;
      pipeline.reset();
      pipe_flag_.store(true);
    };
    task_executor_->run(*(task_flow_.get()), callback_func);
  };

  return true;
}
```

```cpp
void TaskFlow::Start() {
  pipe_flag_.store(true);
  run_flag_.store(true);
  if (thread_ptr_ == nullptr) {
    thread_ptr_.reset(new std::thread(&TaskFlow::Core, this));
  }
}
```

```cpp
void TaskFlow::Core() {
  while (run_flag_.load()) {
    if (pipe_flag_.load() && cache_data_.WaitDequeue(&line_datas[0])) {
      pipe_flag_.store(false);
      if (!cache_data_.Empty()) {
        token_num_ = cache_data_.Size() > (kPipeLineNum - 1)
                         ? (kPipeLineNum - 1)
                         : cache_data_.Size();
        for (size_t i = 1; i <= token_num_; i++) {
          cache_data_.Dequeue(&line_datas[i]);
        }
      }
      token_num_ += 1;
      core_callback_();
    }
  }
}
```