aces / Loris-MRI

The set of scripts that preprocess and insert MRI data into the database.
10 stars 49 forks source link

Dockerize the chunking process and integration with CBRAIN #618

Open cmadjar opened 3 years ago

cmadjar commented 3 years ago

@laemtl when you have time to add more details, maybe you could add the description for this task?

github-actions[bot] commented 2 years ago

This issue is stale because it has been open 1 year with no activity. Remove stale label or comment or this will be closed in 3 months.

laemtl commented 1 year ago

@cmadjar It's already done for IEEG Atlas.

laemtl commented 1 year ago

Still to do:

--

Add the following changes:

python/lib/eeg.py Replace physiological.create_chunks_for_visualization(eeg_file_id, self.data_dir) by physiological.create_chunks_for_visualization_on_cbrain(eeg_file_id, self.data_dir)

python/lib/physiological.py Add the function

def create_chunks_for_visualization_on_cbrain(self, physio_file_id, data_dir):
    """
    Delegate the chunking of an electrophysiology dataset to CBRAIN (useful
    when the file to chunk is big) if no chunked dataset is registered yet
    for the given PhysiologicalFileID.

    Only EDF files are queued for CBRAIN (a row is inserted in the
    physiological_chunk_process table for later execution). Every other
    file type is chunked locally with create_chunks_for_visualization.

    :param physio_file_id: PhysiologicalFileID of the dataset to chunk
     :type physio_file_id: int
    :param data_dir      : LORIS data directory (/data/%PROJECT%/data)
     :type data_dir      : str
    """

    # nothing to do if chunks already exist for this PhysiologicalFileID
    results = self.grep_parameter_value_from_file_id(
        physio_file_id, 'electrophyiology_chunked_dataset_path'
    )
    if results and results['Value']:
        return

    file_type = self.grep_file_type_from_file_id(physio_file_id)

    if file_type != 'edf':
        # Other file types are not supported (CBRAIN pipeline needs testing):
        # launch the process locally. This is a method of the same object,
        # so it has to be called through self.
        self.create_chunks_for_visualization(physio_file_id, data_dir)
        return

    file_path = self.grep_file_path_from_file_id(physio_file_id)

    # the bids_rel_dir is the first two directories in file_path
    # (bids_imports/BIDS_dataset_name_BIDSVersion)
    bids_rel_dir = '/'.join(file_path.split('/')[:2])

    # the chunks will end up in /data/%PROJECT%/data/bids_imports
    # /BIDS_dataset_name_BIDSVersion_chunks/
    chunk_root_dir = data_dir + bids_rel_dir + '_chunks' + '/'

    # save data in db for later execution on CBRAIN
    self.db.insert(
        table_name='physiological_chunk_process',
        column_names=('PhysiologicalFileID', 'InputFilePath', 'Destination'),
        values=(physio_file_id, data_dir + file_path, chunk_root_dir)
    )
laemtl commented 1 year ago

mysql patch:

-- Queue of chunking jobs: one row per attempt to chunk a physiological file.
CREATE TABLE `physiological_chunk_process` (
    -- Surrogate key of the attempt.
    `ID`                  INT(10) UNSIGNED NOT NULL AUTO_INCREMENT, 
    -- NULL until a task has been launched for this row.
    `CBRAINTaskID`        INT(10) UNSIGNED DEFAULT NULL,
    -- Last task status recorded for this row.
    `CBRAINStatus`        VARCHAR(255)     DEFAULT NULL,
    -- File to chunk (see the foreign key below).
    `PhysiologicalFileID` INT(10) UNSIGNED NOT NULL,
    -- Path of the file to chunk.
    `InputFilePath`       TEXT             NOT NULL,
    -- Directory in which the chunks must be installed.
    `Destination`         TEXT             NOT NULL,
    -- Set automatically when the row is created.
    `RegistrationDate`    DATETIME         DEFAULT CURRENT_TIMESTAMP,
    -- Not maintained by the database: writers must set it explicitly.
    `LastUpdateDate`      DATETIME         DEFAULT NULL,
    PRIMARY KEY (`ID`),
    CONSTRAINT `FK_phys_file_ID_2`
        FOREIGN KEY (`PhysiologicalFileID`)
        REFERENCES `physiological_file` (`PhysiologicalFileID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
laemtl commented 1 year ago

Dockerfile:

# Image used to run the LORIS EEG chunking script (edf_to_chunks.py).
FROM centos/python-36-centos7

# Root is required to install system packages and to create the runtime user.
USER root

# git is needed to fetch Loris-MRI; mysql-devel is presumably required to
# build one of the Python requirements (TODO confirm which one).
# The yum cache is removed in the same layer so it does not end up in the image.
RUN yum update -y \
    && yum install -y git mysql-devel \
    && yum clean all \
    && rm -rf /var/cache/yum

# Only the current working tree is used, so a shallow clone is sufficient.
RUN git clone --depth 1 https://github.com/aces/Loris-MRI.git /Loris-MRI
RUN pip install --no-cache-dir -r /Loris-MRI/python/requirements.txt

# The following UID and GID are chosen
# to match what is usually the unprivileged user
RUN groupadd -g 500        loris
RUN useradd  -u 500 -g 500 loris
USER loris

ENTRYPOINT ["python", "/Loris-MRI/python/react-series-data-viewer/edf_to_chunks.py"]
laemtl commented 1 year ago

check_task_status.php

<?php
/**
 * Polls CBRAIN for every chunking task that is not yet in a final state,
 * records the new status in `physiological_chunk_process`, installs the chunks
 * of completed tasks at their destination and re-queues failed tasks.
 */
require_once __DIR__ . "/../../../../vendor/autoload.php";

date_default_timezone_set('America/New_York');

# List of status for tasks that have completed successfully.
define("COMPLETED_STATUS", [
    "Completed"
]);

# List of status for tasks that have failed.
define("FAILED_STATUS", [
    "Failed To Setup",
    "Failed To PostProcess",
    "Failed On Cluster",
    "Failed Setup Prerequisites",
    "Failed PostProcess Prerequisites"
]);

# List of status for tasks that are in a final state.
define("FINAL_STATUS",
    array_merge(COMPLETED_STATUS, FAILED_STATUS, ["Terminated"])
);

/**
 * Prints a message and, when recipients are configured, emails it to them.
 *
 * @param string[] $admins  Email addresses to notify (may be empty)
 * @param string   $subject Subject of the email
 * @param string   $message Body of the email, also echoed to stdout
 *
 * @return void
 */
function notifyChunkAdmins(array $admins, string $subject, string $message): void
{
    // mail() fails on an empty recipient list: only call it when there is
    // somebody to notify.
    if (!empty($admins)) {
        mail(implode(", ", $admins), $subject, $message);
    }
    echo $message . "\n";
}

# Email addresses notified when a task needs attention.
$admins = [];

$loris_client = new NDB_Client();
$loris_client->makeCommandLine();
$loris_client->initialize(__DIR__ . "/../../../config.xml");
$hook = \LORIS\CBRAIN_Hook::getInstance();

$DB = Database::singleton();

# ID and local mount point of the data provider used for processing.
$processing_DP = 510;
$processing_DP_path = '/data1/processing/';

# grep all tasks pending on the db
# (the status lists are constants defined above, not user input)
$running_task_ids = $DB->pselectCol(
    "SELECT CBRAINTaskID
     FROM physiological_chunk_process
     WHERE CBRAINStatus NOT IN ('" . implode("', '", FINAL_STATUS) . "')
     AND CBRAINTaskID IS NOT NULL;",
    []
);

foreach ($running_task_ids as $running_task_id) {
    $task   = $hook->getTask($running_task_id);
    $status = $task['status'];

    $DB->update(
        'physiological_chunk_process',
        [
            'CBRAINStatus'   => $status,
            'LastUpdateDate' => date('Y-m-d H:i:s'),
        ],
        ['CBRAINTaskID' => $running_task_id]
    );

    $file_data = $DB->pselectRow(
        "SELECT PhysiologicalFileID, Destination, InputFilePath
         FROM physiological_chunk_process
         WHERE CBRAINTaskID = :task_id;",
        ['task_id' => $running_task_id]
    );

    // If task completed successfully
    if (in_array($status, COMPLETED_STATUS)) {
        $output_content = null;
        $output_data    = null;
        try {
            $output_id      = $task['params']['_cbrain_output_output_dir'][0];
            $output_data    = $hook->getUserfilesFromFileID($output_id);
            $output_content = glob($processing_DP_path . $output_data['name'] . "/*");
        } catch (Exception $e) {
            echo 'Caught exception: ',  $e->getMessage(), "\n";
        }

        if ($output_content && $output_content[0]) {
            $output_path = $output_content[0];

            // Destination is used both as a shell argument and as a path
            // prefix: normalise it to exactly one trailing slash.
            $destination = rtrim($file_data['Destination'], '/') . '/';

            # mv on the destination
            if (file_exists($output_path)) {
                $output = null;
                $retval = null;

                // Paths come from the database and from the file system, so
                // every shell argument is quoted with escapeshellarg().
                // The commands are chained with ';', so $retval is the exit
                // status of mv only.
                // NOTE(review): 777 makes the chunks world-writable; confirm
                // that this is really required.
                exec(
                    "mkdir -p " . escapeshellarg($destination) .
                    " ; chmod 777 " . escapeshellarg($destination) .
                    " ; mv -- " . escapeshellarg($output_path) .
                    " " . escapeshellarg($destination),
                    $output,
                    $retval
                );

                if ($retval === 0) {
                    exec(
                        "chmod -R 777 "
                        . escapeshellarg($destination . basename($output_path))
                    );

                    // Unregister from the DP
                    $hook->unregisterFilesFromDataProvider(
                        $processing_DP,
                        new \Swagger\Client\Model\MultiRegistrationModReq(
                            [
                                'basenames' => [$output_data['name']],
                                'filetypes' => ['FileCollection-'.$output_data['name']],
                                'delete'    => true
                            ]
                        )
                    );

                    $chunk_param_id = $DB->pselectOne(
                        "SELECT ParameterTypeID
                        FROM parameter_type
                        WHERE Name = 'electrophyiology_chunked_dataset_path'",
                        []
                    );
                    $data_dir = NDB_Config::singleton()->getSetting('dataDirBasepath');

                    // The chunk path is stored relative to the data directory.
                    $DB->insert(
                        'physiological_parameter_file',
                        [
                            'PhysiologicalFileID' => $file_data['PhysiologicalFileID'],
                            'ParameterTypeID'     => $chunk_param_id,
                            'Value'               => str_replace($data_dir, "", $destination)
                                . basename($output_path)
                        ]
                    );
                } else {
                    // Send a notification
                    notifyChunkAdmins(
                        $admins,
                        "Chunk task ID $running_task_id with duplicate content",
                        "Chunk task ID $running_task_id completed but chunks already exist at destination."
                    );
                }
            }
        } else {
            // Send a notification
            notifyChunkAdmins(
                $admins,
                "Chunk task ID $running_task_id with no output",
                "Chunk task ID $running_task_id completed but no chunks were found."
            );
        }

        // Unregister edf file from the DP
        $file_name = basename($file_data['InputFilePath']);
        $hook->unregisterFilesFromDataProvider(
            $processing_DP,
            new \Swagger\Client\Model\MultiRegistrationModReq(
                [
                    'basenames' => [$file_name],
                    'filetypes' => ['SingleFile-'.$file_name],
                    'delete'    => true
                ]
            )
        );
    } else if (in_array($status, FAILED_STATUS)) {
        // Send a notification
        notifyChunkAdmins(
            $admins,
            "Chunk task ID $running_task_id failed",
            "Chunk task ID $running_task_id failed with status " . $status . "."
        );

        // Restart if trials < 2: count the failed attempts for this file
        // (the row updated above is included).
        $trials = $DB->pselectOne(
            "SELECT COUNT(*)
             FROM physiological_chunk_process
             WHERE PhysiologicalFileID = :phys_file_id
             AND CBRAINStatus IN ('" . implode("', '", FAILED_STATUS) . "');",
            ['phys_file_id' => $file_data['PhysiologicalFileID']]
        );

        if ($trials < 2) {
            // A new row without CBRAINTaskID queues the file again.
            $DB->insert(
                'physiological_chunk_process',
                [
                    'PhysiologicalFileID' => $file_data['PhysiologicalFileID'],
                    'InputFilePath'       => $file_data['InputFilePath'],
                    'Destination'         => $file_data['Destination'],
                ]
            );
        }
    }
}
laemtl commented 1 year ago

start_task.php

<?php
/**
 * Launches a CBRAIN chunking task for every file queued in
 * `physiological_chunk_process` that has no CBRAIN task yet.
 */
require_once __DIR__ . "/../../../../vendor/autoload.php";

date_default_timezone_set('America/New_York');

$loris_client = new NDB_Client();
$loris_client->makeCommandLine();
$loris_client->initialize(__DIR__ . "/../../../config.xml");
$hook = \LORIS\CBRAIN_Hook::getInstance();

$DB = Database::singleton();

# ID and local mount point of the data provider used for processing.
$processing_DP = 510;
$processing_DP_path = '/data1/processing/';

# grep all files pending on the db
$edf_files_to_chunk = $DB->pselectCol(
    "SELECT DISTINCT InputFilePath FROM physiological_chunk_process
    WHERE CBRAINTaskID IS NULL",
    []
);

foreach ($edf_files_to_chunk as $edf_file_to_chunk) {
    // NOTE(review): files are identified by basename on the data provider,
    // so two inputs with the same file name would collide.
    $file_name = basename($edf_file_to_chunk);
    $file_copy_path = $processing_DP_path . $file_name;

    # cp on the dp
    if (!file_exists($file_copy_path)
        && !copy($edf_file_to_chunk, $file_copy_path)
    ) {
        // Without the file on the data provider there is nothing to
        // register: leave the row queued and try the next file.
        echo "Could not copy $edf_file_to_chunk to $file_copy_path.\n";
        continue;
    }

    $file_data = new \Swagger\Client\Model\MultiRegistrationModReq(
        [
            'basenames' => [$file_name],
            'filetypes' => ['SingleFile-'.$file_name],
            'delete'    => true
        ]
    );

    $resp = $hook->registerFilesFromDataProvider($processing_DP, $file_data);

    if ($resp['newly_registered_userfiles']) {
        $file_id = $resp['newly_registered_userfiles'][0]['id'];
    } else if ($resp['previously_registered_userfiles']) {
        $file_id = $resp['previously_registered_userfiles'][0]['id'];
    } else {
        // Skip this file only: a top-level `return` here would stop the
        // script and leave every remaining file unprocessed.
        echo "Could not register $file_name on data provider $processing_DP.\n";
        continue;
    }

    $task = $hook->launchTask(new \Swagger\Client\Model\CbrainTaskModReq(
        [
            'cbrain_task' => [
                'params' => [
                    'interface_userfile_ids' => [$file_id],
                    'files'                  => [$file_id],
                    'destination'            => 'output'
                ],
                'tool_config_id'           => 2331,
                'results_data_provider_id' => $processing_DP,
                'description'              => 'Loris EEG chunking',
            ]
        ]
    ));

    if (empty($task)) {
        // No task was created: keep the row queued for the next run.
        echo "No CBRAIN task was created for $file_name.\n";
        continue;
    }

    $task_id = $task[0]['id'];
    $task_status = $task[0]['status'];

    // Attach the task to the queued row(s) of this input file.
    $DB->update(
        'physiological_chunk_process',
        [
            'CBRAINTaskID'   => $task_id,
            'CBRAINStatus'   => $task_status,
            'LastUpdateDate' => date('Y-m-d H:i:s'),
        ],
        ['InputFilePath' => $edf_file_to_chunk]
    );
}
laemtl commented 1 year ago

loris-eeg-chunking.json (CBRAIN descriptor file used to create the task, located in loris_root/cbrain_task_descriptors). The Docker image is hosted here: https://hub.docker.com/r/lorismri/chunking

{
    "name": "LorisEEGchunking",
    "description": "Creates chunks from one or more .edf files for the LORIS EEG browser visualization component.",
    "tool-version": "v0.1.0",
    "schema-version": "0.5",
    "command-line": "python /Loris-MRI/python/react-series-data-viewer/edf_to_chunks.py [files] [destination]",
    "container-image": {
        "image": "lorismri/chunking:v1",
        "type": "singularity",
        "index": "docker://"
    },
    "inputs": [
        {
            "id": "files",
            "name": "one or more .edf files to convert to a directory of chunks",
            "optional": false,
            "type": "File",
            "value-key": "[files]"
        },
        {
            "command-line-flag": "--destination",
            "id": "destination",
            "name": "optional destination for all the chunk directories",
            "optional": true,
            "type": "String",
            "value-key": "[destination]"
        }
    ],
    "output-files" : [{
        "id" : "output_dir",
        "name" : "Output Directory",
        "description" : "This is the directory where the overall outputs are to be stored.",
        "path-template" : "[destination]",
        "optional" : false
    }]
}
laemtl commented 1 year ago

composer.json:

  "repositories": [
      {
        "type": "vcs",
        "url": "git@github.com:xlecours/cbrain-php-client.git"
      }
    ],
     "require" : {
        "xlecours/cbrain-php-client": "dev-master"
     },
laemtl commented 1 year ago

libraries/CBRAIN_Hook.class.inc:

<?php
namespace LORIS;

use \Swagger\Client\Configuration;
use \Swagger\Client\Api\SessionsApi;
use \Swagger\Client\Api\DataProvidersApi;
use \Swagger\Client\Api\ToolConfigsApi;
use \Swagger\Client\Api\ToolsApi;
use \Swagger\Client\Api\TasksApi;
use \Swagger\Client\Api\UserfilesApi;
use \Swagger\Client\Model\CbrainTask;
use \Swagger\Client\Model\CbrainTaskModReq;
use \Swagger\Client\Model\MultiRegistrationModReq;

/**
 * Singleton wrapper around the CBRAIN API client: it authenticates once
 * (reusing a cached API token when possible) and exposes the data provider,
 * userfile, tool and task calls.
 */
class CBRAIN_Hook
{
    /**
     * JSON file in which the CBRAIN API token is cached between runs.
     */
    private const TOKEN_CACHE_FILE = __DIR__ . '/../tools/cbrain/cbrain_temp_data.json';

    private static $instance = null;

    private $config;

    /**
     * Returns the unique instance of this class, creating it on first use.
     *
     * @return CBRAIN_Hook
     */
    public static function getInstance(): CBRAIN_Hook
    {
        if (self::$instance === null) {
            self::$instance = new CBRAIN_Hook();
        }
        return self::$instance;
    }

    /**
     * Configures the API client from the `CBRAIN` setting (host and
     * credentials) and makes sure a valid API token is set.
     */
    private function __construct()
    {
        $this->config = new Configuration();

        $cbrain_config = (\NDB_Factory::singleton())->config()
            ->getSetting('CBRAIN');

        $this->config->setHost($cbrain_config['host']);

        $session_api = new SessionsApi(
            null, // Will use Guzzle/Http/Client by default
            $this->config
        );

        $this->_setApiToken($session_api, $cbrain_config);
    }

    /**
     * This will set the cbrain api token in the session using the cached token or
     * request a new one if needed. Session token should be valid for 24 hours and
     * the 24 hours period is renewed at each successful usage.
     *
     * @param SessionsApi $session_api   The session_api used by the client
     * @param array       $cbrain_config The config values
     *
     * @return void
     */
    private function _setApiToken(SessionsApi $session_api, array $cbrain_config): void
    {
        // grab token from the cache file (empty when there is none yet)
        $token = $this->_readTokenCache()['token'] ?? '';
        $this->config->setApiKey('cbrain_api_token', $token);

        try {
            // Check if token is still valid
            $session_api->sessionGet();
        } catch (\Swagger\Client\ApiException $e) {
            $this->_requestNewToken($session_api, $cbrain_config);
        }
    }

    /**
     * Reads the token cache file.
     *
     * @return array The decoded content, or an empty array when the file is
     *               missing, unreadable or not a JSON object
     */
    private function _readTokenCache(): array
    {
        if (!is_readable(self::TOKEN_CACHE_FILE)) {
            return [];
        }
        $data = json_decode(file_get_contents(self::TOKEN_CACHE_FILE), true);
        return is_array($data) ? $data : [];
    }

    /**
     * Logs in to CBRAIN with the configured credentials, uses the returned
     * token for subsequent calls and stores it in the token cache file.
     *
     * @param SessionsApi $session_api   The session_api used by the client
     * @param array       $cbrain_config The config values (username, password)
     *
     * @return void
     */
    private function _requestNewToken(SessionsApi $session_api, array $cbrain_config) : void
    {
        // request new token from cbrain
        $rep_session_post = $session_api->sessionPost(
            $cbrain_config['username'],
            $cbrain_config['password']
        );
        $token = $rep_session_post->getCbrainApiToken();
        $this->config->setApiKey('cbrain_api_token', $token);

        // The token is a credential: never print its value.
        echo "Requested new CBRAIN API token.\n";

        // write token to the cache file, keeping any other cached value
        $temp_data_array          = $this->_readTokenCache();
        $temp_data_array['token'] = $token;
        file_put_contents(
            self::TOKEN_CACHE_FILE,
            json_encode($temp_data_array),
            LOCK_EX
        );
    }

    /**
     * Registers files present on a data provider as CBRAIN userfiles.
     *
     * @param int                     $dataproviderid ID of the data provider
     * @param MultiRegistrationModReq $filedata       Files to register
     *
     * @return object The API response
     */
    public function registerFilesFromDataProvider(int $dataproviderid, MultiRegistrationModReq $filedata): object
    {
        return (new DataProvidersApi(null, $this->config))
            ->dataProvidersIdRegisterPost($dataproviderid, $filedata);
    }

    /**
     * Unregisters userfiles from a data provider.
     *
     * @param int                     $dataproviderid ID of the data provider
     * @param MultiRegistrationModReq $filedata       Files to unregister
     *
     * @return object The API response
     */
    public function unregisterFilesFromDataProvider(int $dataproviderid, MultiRegistrationModReq $filedata): object
    {
        return (new DataProvidersApi(null, $this->config))
            ->dataProvidersIdUnregisterPost($dataproviderid, $filedata);
    }

    /**
     * Lists the files of a data provider.
     *
     * @param int $dataproviderid ID of the data provider
     *
     * @return \Swagger\Client\Model\FileInfo[]
     */
    public function getUserfilesFromDataProvider(int $dataproviderid): array
    {
        return (new DataProvidersApi(null, $this->config))
            ->dataProvidersIdBrowseGet($dataproviderid);
    }

    /**
     * Returns the userfile identified by the given ID.
     *
     * @param int $fileid ID of the userfile
     *
     * @return object The API response describing the userfile
     */
    public function getUserfilesFromFileID(int $fileid): object
    {
        return (new UserfilesApi(null, $this->config))
            ->userfilesIdGet($fileid);
    }

    /**
     * Return an array of CBRAIN's dataproviders that fits the
     * InputDataProviderName value defined in the configs.
     *
     * @return \Swagger\Client\Model\DataProvider[]
     */
    public function getInputDataProvider(): array
    {
        return $this->_getDataProvidersFromSetting('InputDataProviderName');
    }

    /**
     * Return an array of CBRAIN's dataproviders that fits the
     * OutputDataProviderName value defined in the configs.
     *
     * @return \Swagger\Client\Model\DataProvider[]
     */
    public function getOutputDataProvider(): array
    {
        return $this->_getDataProvidersFromSetting('OutputDataProviderName');
    }

    /**
     * Returns the data providers whose name equals the value of the given
     * key of the `CBRAIN` setting. Keys of the API result are preserved.
     *
     * @param string $setting_key Key of the `CBRAIN` setting to read
     *
     * @return \Swagger\Client\Model\DataProvider[]
     */
    private function _getDataProvidersFromSetting(string $setting_key): array
    {
        $dataprovidername = (\NDB_Factory::singleton())->config()
            ->getSetting('CBRAIN')[$setting_key];

        return array_filter(
            (new DataProvidersApi(null, $this->config))->dataProvidersGet(),
            function ($dp) use ($dataprovidername) {
                return $dp['name'] == $dataprovidername;
            }
        );
    }

    /**
     * Lists the tool configurations returned by the API.
     *
     * @return array
     */
    public function getAvailableTools(): array
    {
        return (new ToolConfigsApi(null, $this->config))->toolConfigsGet();
    }

    /**
     * Creates the task(s) described by the request.
     *
     * @param CbrainTaskModReq $task Description of the task to launch
     *
     * @return array The created task(s)
     */
    public function launchTask(CbrainTaskModReq $task): array
    {
        return (new TasksApi(null, $this->config))->tasksPost($task);
    }

    /**
     * Returns the task identified by the given ID.
     *
     * @param int $task_id ID of the CBRAIN task
     *
     * @return object The API response describing the task
     */
    public function getTask(int $task_id): object
    {
        return (new TasksApi(null, $this->config))->tasksIdGet($task_id);
    }
}
laemtl commented 1 year ago

Script to import the data into LORIS

tools/loris-mri/pipeline.sh:

#!/usr/bin/env bash

# Imports into LORIS every BIDS archive (*.tar.gz) found in
# ${DATA_FOLDER}/incoming.
#
# To execute periodically, add
# */15 * * * * cd /var/www/loris/project/tools/loris-mri && ./pipeline.sh
# in crontab -e

DATA_FOLDER='/data1'

source "${DATA_FOLDER}/loris-mri/bin/mri/environment"

# use nullglob in case there are no matching files
shopt -s nullglob

# check if new archives were uploaded in DATA_FOLDER/incoming
archives=("${DATA_FOLDER}"/incoming/*.tar.gz)

for archive in "${archives[@]}"; do
    folder=$(date +"%F-%T")
    workdir="${DATA_FOLDER}/processing/bidsFiles/${folder}"

    # untar the archive in processing
    mkdir -p "${workdir}"
    tar xvzf "${archive}" -C "${workdir}"
    extracted=$?

    # move the archive in archives (even when extraction failed, so that a
    # broken archive is not picked up again at every run)
    mv "${archive}" "${DATA_FOLDER}/archives/"

    if [ "${extracted}" -ne 0 ]; then
        echo "Could not extract ${archive} in ${workdir}" >&2
        continue
    fi

    # locate dataset_description.json and extract path; a dataset may contain
    # several of them (e.g. in sub-folders), so keep the shallowest one
    description=$(find "${workdir}" -type f -name "dataset_description.json" -printf '%d\t%p\n' \
        | sort -n | head -n 1 | cut -f 2-)

    if [ -z "${description}" ]; then
        echo "Could not import ${workdir}: dataset_description.json not found" >&2
        continue
    fi
    path=$(dirname "${description}")

    # import
    if python3 "${DATA_FOLDER}/loris-mri/bin/mri/python/bids_import.py" -d "${path}" -p database_config.py -csv
    then
      # if import successful delete the folder in processing
      echo "Successfully imported ${workdir}"
      rm -R -- "${workdir}"
    else
      # the folder is kept in processing
      echo "Could not import ${workdir}" >&2
    fi
done
laemtl commented 1 year ago

Script to start the chunking task on CBRAIN:

<?php
/**
 * Launches a CBRAIN chunking task for every file queued in
 * `physiological_chunk_process` that has no CBRAIN task yet.
 */
require_once __DIR__ . "/../../../../vendor/autoload.php";

date_default_timezone_set('America/New_York');

$loris_client = new NDB_Client();
$loris_client->makeCommandLine();
$loris_client->initialize(__DIR__ . "/../../../config.xml");
$hook = \LORIS\CBRAIN_Hook::getInstance();

$DB = Database::singleton();

# ID and local mount point of the data provider used for processing.
$processing_DP = 510;
$processing_DP_path = '/data1/processing/';

# grep all files pending on the db
$edf_files_to_chunk = $DB->pselectCol(
    "SELECT DISTINCT InputFilePath FROM physiological_chunk_process
    WHERE CBRAINTaskID IS NULL",
    []
);

foreach ($edf_files_to_chunk as $edf_file_to_chunk) {
    // NOTE(review): files are identified by basename on the data provider,
    // so two inputs with the same file name would collide.
    $file_name = basename($edf_file_to_chunk);
    $file_copy_path = $processing_DP_path . $file_name;

    # cp on the dp
    if (!file_exists($file_copy_path)
        && !copy($edf_file_to_chunk, $file_copy_path)
    ) {
        // Without the file on the data provider there is nothing to
        // register: leave the row queued and try the next file.
        echo "Could not copy $edf_file_to_chunk to $file_copy_path.\n";
        continue;
    }

    $file_data = new \Swagger\Client\Model\MultiRegistrationModReq(
        [
            'basenames' => [$file_name],
            'filetypes' => ['SingleFile-'.$file_name],
            'delete'    => true
        ]
    );

    $resp = $hook->registerFilesFromDataProvider($processing_DP, $file_data);

    if ($resp['newly_registered_userfiles']) {
        $file_id = $resp['newly_registered_userfiles'][0]['id'];
    } else if ($resp['previously_registered_userfiles']) {
        $file_id = $resp['previously_registered_userfiles'][0]['id'];
    } else {
        // Skip this file only: a top-level `return` here would stop the
        // script and leave every remaining file unprocessed.
        echo "Could not register $file_name on data provider $processing_DP.\n";
        continue;
    }

    $task = $hook->launchTask(new \Swagger\Client\Model\CbrainTaskModReq(
        [
            'cbrain_task' => [
                'params' => [
                    'interface_userfile_ids' => [$file_id],
                    'files'                  => [$file_id],
                    'destination'            => 'output'
                ],
                'tool_config_id'           => 2331,
                'results_data_provider_id' => $processing_DP,
                'description'              => 'Loris EEG chunking',
            ]
        ]
    ));

    if (empty($task)) {
        // No task was created: keep the row queued for the next run.
        echo "No CBRAIN task was created for $file_name.\n";
        continue;
    }

    $task_id = $task[0]['id'];
    $task_status = $task[0]['status'];

    // Attach the task to the queued row(s) of this input file.
    $DB->update(
        'physiological_chunk_process',
        [
            'CBRAINTaskID'   => $task_id,
            'CBRAINStatus'   => $task_status,
            'LastUpdateDate' => date('Y-m-d H:i:s'),
        ],
        ['InputFilePath' => $edf_file_to_chunk]
    );
}

Prerequisites:

Ask CBRAIN to create a data provider on the prod VM pointing to data/processing and linked to a lorisadmin user. Then add the following to config.xml:

<lorisDB>
  <CBRAIN>
      <host>https://portal.cbrain.mcgill.ca</host>
      <username>LORISADMINUSER</username>
      <password>LORISADMINUSERPWD</password>
      <InputDataProviderName>PROCESSINGDP (OpenIEEGAtlas-Dev-Processing)</InputDataProviderName>
      <OutputDataProviderName>PROCESSINGDP (OpenIEEGAtlas-Dev-Processing)</OutputDataProviderName>
  </CBRAIN>

Make sure the CBRAIN user has access to the Chunking tool (ID:360, LorisEEGchunking). Update the Data Provider ID and CBRAIN credentials in the code.

laemtl commented 1 year ago

Script to check the completion state of each running chunking task on CBRAIN:

<?php
/**
 * Polls CBRAIN for every chunking task that is not yet in a final state,
 * records the new status in `physiological_chunk_process`, installs the chunks
 * of completed tasks at their destination and re-queues failed tasks.
 */
require_once __DIR__ . "/../../../../vendor/autoload.php";

date_default_timezone_set('America/New_York');

# List of status for tasks that have completed successfully.
define("COMPLETED_STATUS", [
    "Completed"
]);

# List of status for tasks that have failed.
define("FAILED_STATUS", [
    "Failed To Setup",
    "Failed To PostProcess",
    "Failed On Cluster",
    "Failed Setup Prerequisites",
    "Failed PostProcess Prerequisites"
]);

# List of status for tasks that are in a final state.
define("FINAL_STATUS",
    array_merge(COMPLETED_STATUS, FAILED_STATUS, ["Terminated"])
);

/**
 * Prints a message and, when recipients are configured, emails it to them.
 *
 * @param string[] $admins  Email addresses to notify (may be empty)
 * @param string   $subject Subject of the email
 * @param string   $message Body of the email, also echoed to stdout
 *
 * @return void
 */
function notifyChunkAdmins(array $admins, string $subject, string $message): void
{
    // mail() fails on an empty recipient list: only call it when there is
    // somebody to notify.
    if (!empty($admins)) {
        mail(implode(", ", $admins), $subject, $message);
    }
    echo $message . "\n";
}

# Email addresses notified when a task needs attention.
$admins = [];

$loris_client = new NDB_Client();
$loris_client->makeCommandLine();
$loris_client->initialize(__DIR__ . "/../../../config.xml");
$hook = \LORIS\CBRAIN_Hook::getInstance();

$DB = Database::singleton();

# ID and local mount point of the data provider used for processing.
$processing_DP = 510;
$processing_DP_path = '/data1/processing/';

# grep all tasks pending on the db
# (the status lists are constants defined above, not user input)
$running_task_ids = $DB->pselectCol(
    "SELECT CBRAINTaskID
     FROM physiological_chunk_process
     WHERE CBRAINStatus NOT IN ('" . implode("', '", FINAL_STATUS) . "')
     AND CBRAINTaskID IS NOT NULL;",
    []
);

foreach ($running_task_ids as $running_task_id) {
    $task   = $hook->getTask($running_task_id);
    $status = $task['status'];

    $DB->update(
        'physiological_chunk_process',
        [
            'CBRAINStatus'   => $status,
            'LastUpdateDate' => date('Y-m-d H:i:s'),
        ],
        ['CBRAINTaskID' => $running_task_id]
    );

    $file_data = $DB->pselectRow(
        "SELECT PhysiologicalFileID, Destination, InputFilePath
         FROM physiological_chunk_process
         WHERE CBRAINTaskID = :task_id;",
        ['task_id' => $running_task_id]
    );

    // If task completed successfully
    if (in_array($status, COMPLETED_STATUS)) {
        $output_content = null;
        $output_data    = null;
        try {
            $output_id      = $task['params']['_cbrain_output_output_dir'][0];
            $output_data    = $hook->getUserfilesFromFileID($output_id);
            $output_content = glob($processing_DP_path . $output_data['name'] . "/*");
        } catch (Exception $e) {
            echo 'Caught exception: ',  $e->getMessage(), "\n";
        }

        if ($output_content && $output_content[0]) {
            $output_path = $output_content[0];

            // Destination is used both as a shell argument and as a path
            // prefix: normalise it to exactly one trailing slash.
            $destination = rtrim($file_data['Destination'], '/') . '/';

            # mv on the destination
            if (file_exists($output_path)) {
                $output = null;
                $retval = null;

                // Paths come from the database and from the file system, so
                // every shell argument is quoted with escapeshellarg().
                // The commands are chained with ';', so $retval is the exit
                // status of mv only.
                // NOTE(review): 777 makes the chunks world-writable; confirm
                // that this is really required.
                exec(
                    "mkdir -p " . escapeshellarg($destination) .
                    " ; chmod 777 " . escapeshellarg($destination) .
                    " ; mv -- " . escapeshellarg($output_path) .
                    " " . escapeshellarg($destination),
                    $output,
                    $retval
                );

                if ($retval === 0) {
                    exec(
                        "chmod -R 777 "
                        . escapeshellarg($destination . basename($output_path))
                    );

                    // Unregister from the DP
                    $hook->unregisterFilesFromDataProvider(
                        $processing_DP,
                        new \Swagger\Client\Model\MultiRegistrationModReq(
                            [
                                'basenames' => [$output_data['name']],
                                'filetypes' => ['FileCollection-'.$output_data['name']],
                                'delete'    => true
                            ]
                        )
                    );

                    $chunk_param_id = $DB->pselectOne(
                        "SELECT ParameterTypeID
                        FROM parameter_type
                        WHERE Name = 'electrophyiology_chunked_dataset_path'",
                        []
                    );
                    $data_dir = NDB_Config::singleton()->getSetting('dataDirBasepath');

                    // The chunk path is stored relative to the data directory.
                    $DB->insert(
                        'physiological_parameter_file',
                        [
                            'PhysiologicalFileID' => $file_data['PhysiologicalFileID'],
                            'ParameterTypeID'     => $chunk_param_id,
                            'Value'               => str_replace($data_dir, "", $destination)
                                . basename($output_path)
                        ]
                    );
                } else {
                    // Send a notification
                    notifyChunkAdmins(
                        $admins,
                        "Chunk task ID $running_task_id with duplicate content",
                        "Chunk task ID $running_task_id completed but chunks already exist at destination."
                    );
                }
            }
        } else {
            // Send a notification
            notifyChunkAdmins(
                $admins,
                "Chunk task ID $running_task_id with no output",
                "Chunk task ID $running_task_id completed but no chunks were found."
            );
        }

        // Unregister edf file from the DP
        $file_name = basename($file_data['InputFilePath']);
        $hook->unregisterFilesFromDataProvider(
            $processing_DP,
            new \Swagger\Client\Model\MultiRegistrationModReq(
                [
                    'basenames' => [$file_name],
                    'filetypes' => ['SingleFile-'.$file_name],
                    'delete'    => true
                ]
            )
        );
    } else if (in_array($status, FAILED_STATUS)) {
        // Send a notification
        notifyChunkAdmins(
            $admins,
            "Chunk task ID $running_task_id failed",
            "Chunk task ID $running_task_id failed with status " . $status . "."
        );

        // Restart if trials < 2: count the failed attempts for this file
        // (the row updated above is included).
        $trials = $DB->pselectOne(
            "SELECT COUNT(*)
             FROM physiological_chunk_process
             WHERE PhysiologicalFileID = :phys_file_id
             AND CBRAINStatus IN ('" . implode("', '", FAILED_STATUS) . "');",
            ['phys_file_id' => $file_data['PhysiologicalFileID']]
        );

        if ($trials < 2) {
            // A new row without CBRAINTaskID queues the file again.
            $DB->insert(
                'physiological_chunk_process',
                [
                    'PhysiologicalFileID' => $file_data['PhysiologicalFileID'],
                    'InputFilePath'       => $file_data['InputFilePath'],
                    'Destination'         => $file_data['Destination'],
                ]
            );
        }
    }
}
github-actions[bot] commented 7 months ago

This issue is stale because it has been open 1 year with no activity. Remove stale label or comment or this will be closed in 3 months.