ChristopheBraud closed this issue 4 years ago
incrementWorkload is a must-have in multi-process mode; I use this query to achieve that goal:
Db::get()->executeQuery(
    "UPDATE plugin_process_manager_monitoring_item SET currentWorkload = currentWorkload + 1 WHERE id = ?",
    [$this->monitoringItem->getId()],
    [ParameterType::INTEGER]
);
Hi, thx for your suggestion.
We recently had a discussion about parallelization and I agree that it should have better support :-). Do you use any external bundle like https://github.com/webmozarts/console-parallelization or do you just start the Symfony child processes? How do you pass the workload to the child processes, and how do you keep track of the number of child processes?
Hi,
No, I don't use any external bundle, only the Symfony Process component.
I pass the monitoring item with --monitoring-item-id on the command line (as you can see in my first post), but instead of using $this->initProcessManager($input->getOption('monitoring-item-id')) to get my monitoring item (as in the main process), I just do a MonitoringItem::getById($input->getOption('monitoring-item-id')).
So I use the same monitoring item in all my sub-processes.
The total workload is set in the main process (or common ancestor) and the children only do an increment.
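The reason that increment has to be atomic: two children that read the same counter value before either writes back will lose an update. A minimal, self-contained sketch of the lost-update race (plain PHP, no database, illustrative only):

```php
<?php
// Sketch of the lost-update race that an atomic SQL increment avoids.
// Two "children" both read currentWorkload, add 1, and write back.
$db = ['currentWorkload' => 10];

// Child A and child B both read the row before either one writes.
$readA = $db['currentWorkload'];
$readB = $db['currentWorkload'];

// Both write back read + 1: one increment is lost.
$db['currentWorkload'] = $readA + 1;
$db['currentWorkload'] = $readB + 1;
assert($db['currentWorkload'] === 11); // expected 12: one update was lost

// With "UPDATE ... SET c = c + 1" each statement reads the current value
// inside the database under a row lock, so both increments survive.
$db['currentWorkload'] = 10;
$db['currentWorkload'] += 1; // child A (atomic in SQL)
$db['currentWorkload'] += 1; // child B
assert($db['currentWorkload'] === 12);
```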
At first, I used this code to increment the workload:
$lockKey = sprintf("monitoring_%d", $this->monitoringItem->getId());
Lock::acquire($lockKey);
// Re-read the item under the lock so we increment the latest persisted value.
$this->monitoringItem = MonitoringItem::getById($this->monitoringItem->getId());
$current = $this->monitoringItem->getCurrentWorkload() + 1;
$this->monitoringItem->setCurrentWorkload($current);
$this->monitoringItem->save();
Lock::release($lockKey);
but it was far too slow due to the lock mechanism, so I now use
Db::get()->executeQuery(
    "UPDATE plugin_process_manager_monitoring_item SET currentWorkload = currentWorkload + 1 WHERE id = ?",
    [$this->monitoringItem->getId()],
    [ParameterType::INTEGER]
);
with some drawbacks, such as the duration / update time not being refreshed.
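One way to keep the atomic increment while still refreshing a timestamp would be to fold both into the same UPDATE. This is only a sketch, not the plugin's API: it assumes the table has a modificationDate column holding a Unix timestamp, which may not match the actual plugin schema.

```php
// Sketch under assumed schema: atomically increment the workload and
// refresh a (hypothetical) modificationDate column in a single statement,
// so no application-side lock round-trip is needed.
Db::get()->executeQuery(
    "UPDATE plugin_process_manager_monitoring_item
        SET currentWorkload = currentWorkload + 1,
            modificationDate = ?
      WHERE id = ?",
    [time(), $this->monitoringItem->getId()],
    [ParameterType::INTEGER, ParameterType::INTEGER]
);
```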
To run processes I have two main modes: sequential and parallel.
In the sequential case I just do a foreach over my queue of steps and run the tasks:
foreach ($this->steps as $step) {
    $this->changeStep($currentStep++, $totalSteps, sprintf("Launching '%s' step", $step['name']));
    $this->runProcess($step['key'], $taskTimeOut);
}
In my runProcess I have to fix some issues, such as SQL timeouts, with:
Db::get()->executeQuery("SET session wait_timeout=?", [$taskTimeOut + 30], [ParameterType::INTEGER]);
Db::get()->executeQuery("SET session interactive_timeout=?", [$taskTimeOut + 30], [ParameterType::INTEGER]);
protected function runProcess($key, $timeout)
{
    $env = getenv();
    // Clear the ProcessManager env var so the child does not act on the
    // parent's monitoring item in its onConsoleTerminate handler.
    $env[ElementsProcessManagerBundle::MONITORING_ITEM_ENV_VAR] = '';
    $process = new Process([
        'php', 'bin/console',
        'sub_process',
        '--monitoring-item-id', $this->getMonitoring()->getId(),
        '--sub-process-key', $key,
    ], PIMCORE_PROJECT_ROOT, $env, null, $timeout);
    if ($this->settings['log']) {
        // Stream the child's output live instead of waiting for it to exit.
        $process->start();
        $iterator = $process->getIterator();
        foreach ($iterator as $data) {
            echo $data;
        }
    } else {
        $process->run();
    }
    if (!$process->isSuccessful()) {
        throw new ProcessFailedException($process);
    }
}
Note: this is still a WIP, but it's fully functional.
For the parallel case I use an array of parameters as a guide to run the sub-processes:
$this->progress(0, $total);
$this->runProcesses($key, $taskTimeOut, $paramsArray);
and the runProcesses function :
protected function runProcesses($subProcessKey, $timeout, $params)
{
    $failedProcess = null;
    $activeProcess = [];
    $maxProcess = 4;
    $env = getenv();
    $env[ElementsProcessManagerBundle::MONITORING_ITEM_ENV_VAR] = '';
    do {
        // Top up the pool while there are free slots and pending params.
        while (count($activeProcess) < $maxProcess && count($params) > 0) {
            $param = array_shift($params);
            $process = new Process([
                'php', 'bin/console',
                'sub_process',
                '--monitoring-item-id', $this->getMonitoring()->getId(),
                '--sub-process-key', $subProcessKey,
                '--params', json_encode($param),
            ], PIMCORE_PROJECT_ROOT, $env, null, $timeout);
            $process->start();
            $activeProcess[] = $process;
        }
        // Drop finished processes from the pool, remembering a failed one.
        $activeProcess = array_filter($activeProcess, function (Process $process) use (&$failedProcess) {
            fwrite(STDERR, $process->getIncrementalErrorOutput());
            if (!$process->isRunning()) {
                if (!$process->isSuccessful()) {
                    $failedProcess = $process;
                }
                return false;
            }
            return true;
        });
        sleep(1);
    } while (count($activeProcess) > 0 || count($params) > 0);
    if (!is_null($failedProcess)) {
        throw new ProcessFailedException($failedProcess);
    }
}
As you can see, I use json_encode to pass the parameters used by the children; the json_decode takes place in my sub_process command.
All my sub-processes use the same objects as the main process, so I have the exact same context via my constructor, but the entry point differs between the main process and the sub-processes.
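The json_encode/json_decode round trip can be sketched self-contained (the parameter names here are illustrative, not the actual ones from the command):

```php
<?php
// Sketch of the parameter hand-off: the parent serializes one work unit
// for the child's --params option, and the sub_process command decodes
// it back to an array on the other side.
$param = ['objectIds' => [101, 102, 103], 'locale' => 'en'];

// Parent side: build the value passed as --params.
$cliArgument = json_encode($param);

// Child side, inside the sub_process command:
$decoded = json_decode($cliArgument, true);

// The child sees exactly the work unit the parent assigned to it.
assert($decoded === $param);
```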
Hi, thx for your input. In the latest version we added support for multiprocessing. Basically it does the same as your code, but with the ProcessManager's internal functionality - so we can use the monitoring items to "communicate" between different processes.
The documentation is at https://github.com/elements-at/ProcessManager/blob/master/doc/usageParallelization.md and a sample command can be found at
I hope this works for you :-)
When using Symfony Process to run multiple processes (for memory and speed reasons), the monitoring item's state is set to finished for each child, because of the
onConsoleTerminate
handler and the monitoring item being passed via env vars. To resolve this issue, I reset the env var concerned. I also added a thread-safe "increment workload" function. It would be nice to have an option to better handle sub-processes by default.
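That env-var reset, extracted as a standalone sketch (the class constant and variable names are taken from the earlier snippets; adjust to your setup):

```php
// Before starting a child, copy the parent's environment and blank the
// ProcessManager variable, so the child's onConsoleTerminate handler
// does not mark the shared monitoring item as finished.
$env = getenv();
$env[ElementsProcessManagerBundle::MONITORING_ITEM_ENV_VAR] = '';

$process = new Process(
    ['php', 'bin/console', 'sub_process', '--monitoring-item-id', $this->getMonitoring()->getId()],
    PIMCORE_PROJECT_ROOT,
    $env,   // environment with the monitoring-item variable cleared
    null,
    $timeout
);
```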