To run:
pip install .
run-detection
To install when developing:
pip install .[dev]
To demo and test the whole of run detection, the easiest way currently is:
sudo docker compose up -d
Run detection checks 5 environment variables:
QUEUE_HOST
- Host name of the queue server
QUEUE_USER
- Username of the application user run detection should use when connecting to the queue
QUEUE_PASSWORD
- Password of the above user
INGRESS_QUEUE_NAME
- Name of the queue that run detection will consume from
EGRESS_QUEUE_NAME
- Name of the queue that run detection will produce to
If these are not provided, run detection falls back to the default queue names "watched-files" and "scheduled-jobs", the default host localhost, and the default credentials guest/guest.
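The lookup described above can be sketched as follows. This is a minimal illustration rather than the project's actual code: the helper name load_queue_config is hypothetical, and the pairing of "watched-files" with the ingress queue and "scheduled-jobs" with the egress queue is an assumption based on the order in which the defaults are listed.

```python
import os
from typing import Dict, Mapping, Optional


def load_queue_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: read the five variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return {
        "host": env.get("QUEUE_HOST", "localhost"),
        "user": env.get("QUEUE_USER", "guest"),
        "password": env.get("QUEUE_PASSWORD", "guest"),
        # Assumption: the defaults map to ingress/egress in the order they are listed
        "ingress_queue": env.get("INGRESS_QUEUE_NAME", "watched-files"),
        "egress_queue": env.get("EGRESS_QUEUE_NAME", "scheduled-jobs"),
    }
```

Passing a plain mapping instead of relying on os.environ makes it easy to check which values would be used without touching the real environment.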
docker build . -f ./container/Dockerfile -t ghcr.io/fiaisis/rundetection
docker run -it --rm --mount source=/archive,target=/archive --name rundetection ghcr.io/fiaisis/rundetection
docker push ghcr.io/fiaisis/rundetection -a
docker pull ghcr.io/fiaisis/rundetection:latest
To run only the unit tests, run: pytest . --ignore test/test_e2e.py
To run the e2e tests:
cd test
docker-compose up -d
cd ..
pytest test/test_e2e.py
This will pull the kafka/activemq containers and build the run detection container. Any code changes made after starting run detection will require the run detection container to be rebuilt.
Adding Custom Nexus Extraction Rules
In certain cases, specific instruments may include additional metadata that can be used as run inputs. As these metadata are instrument-specific, they will not apply to all Nexus files. To accommodate these additional metadata, you can create custom extraction functions for the ingestion process (e.g., mari_extract).
To add a custom extraction function, follow these steps:
1. Define the extraction function:
def my_instrument_extract(job_request: JobRequest, dataset: Any) -> JobRequest:
    """
    Extracts additional metadata specific to my instrument from the given dataset and updates the JobRequest
    instance. If the metadata does not exist, the default values will be set instead.
    :param job_request: JobRequest instance for which to extract additional metadata
    :param dataset: The dataset from which to extract additional instrument-specific metadata.
    :return: JobRequest instance with updated additional metadata
    """
    job_request.additional_values["some_key"] = dataset.get("some_key")
    return job_request
Where the extraction function has the type Callable[[JobRequest, Any], JobRequest].
While Any is listed, the dataset is actually an h5py group; Any is used because the library does not have any type stubs.
2. Next, update the extraction factory function:
def get_extraction_function(instrument: str) -> Callable[[JobRequest, Any], JobRequest]:
    """
    Given an instrument name, return the additional metadata extraction function for the instrument
    :param instrument: str - instrument name
    :return: Callable[[JobRequest, Any], JobRequest]: The additional metadata extraction function for the instrument
    """
    match instrument.lower():
        case "mari":
            return mari_extract
        case "my_instrument":
            return my_instrument_extract
        case _:
            return skip_extract
After making these two changes, when a run for your instrument is detected, the new extraction function will be automatically called, and the JobRequest.additional_values will be updated accordingly.
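The flow from instrument name to updated JobRequest can be tried out with the sketch below. It rests on several assumptions: JobRequest is reduced to a stand-in dataclass with only additional_values, skip_extract is assumed to return the request unchanged, a plain dict stands in for the h5py group, and a dictionary lookup replaces the match statement for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class JobRequest:
    """Simplified stand-in for the real JobRequest; only the field used here."""
    additional_values: Dict[str, Any] = field(default_factory=dict)


def skip_extract(job_request: JobRequest, _: Any) -> JobRequest:
    """Assumed default behaviour: leave the request untouched."""
    return job_request


def my_instrument_extract(job_request: JobRequest, dataset: Any) -> JobRequest:
    # .get returns None when the key is absent from the dataset
    job_request.additional_values["some_key"] = dataset.get("some_key")
    return job_request


# Dictionary lookup standing in for the match statement in the factory
_EXTRACTORS: Dict[str, Callable[[JobRequest, Any], JobRequest]] = {
    "my_instrument": my_instrument_extract,
}


def get_extraction_function(instrument: str) -> Callable[[JobRequest, Any], JobRequest]:
    # Unknown instruments fall back to skip_extract
    return _EXTRACTORS.get(instrument.lower(), skip_extract)


# A plain dict stands in for the h5py group in this sketch
request = get_extraction_function("MY_INSTRUMENT")(JobRequest(), {"some_key": 42})
```

Because the instrument name is lower-cased before the lookup, "MY_INSTRUMENT" and "my_instrument" resolve to the same function.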
For a run to be sent downstream, the metadata of the received file must meet the specification for that instrument.
The specifications for each instrument are found in rundetection/specifications/<instrument>_specification.json
An example specification file:
{
  "enabled": true
}
Within the JSON file, each field is considered to be a Rule and has a class associated with it, e.g. the EnabledRule class.
Below is an example of adding a new rule. The example is unrealistic, but it shows how much flexibility there is.
{
  "enabled": true,
  "skipTitlesIncluding": ["25581", "bar", "baz"]
}
Create the Rule implementation:
class SkipTitlesIncludingRule(Rule[List[str]]):
    def verify(self, job_request: JobRequest) -> None:
        # Skip the run when any listed word appears in the experiment title
        job_request.will_reduce = not any(word in job_request.experiment_title for word in self._value)
Update the RuleFactory:
def rule_factory(key: str, value: T_co) -> Rule[T_co]:
    """
    Given the rule key, and rule value, return the rule implementation
    :param key: The key of the rule
    :param value: The value of the rule
    :return: The Rule implementation
    """
    match key.lower():
        case "enabled":
            if isinstance(value, bool):
                return EnabledRule(value)
            else:
                raise ValueError(f"Bad value: {value} in rule: {key}")
        case "skiptitlesincluding":
            if isinstance(value, list):
                return SkipTitlesIncludingRule(value)
            else:
                raise ValueError(f"Bad value: {value} in rule: {key}")
        case _:
            raise MissingRuleError(f"Implementation of Rule: {key} does not exist.")
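To see how a specification file, the factory and the rules might fit together, here is a self-contained sketch. Everything in it is a simplified stand-in, not the project's code: JobRequest carries only two fields, EnabledRule is assumed to copy its value into will_reduce, the skip rule is assumed to clear will_reduce when a listed word appears in the title, apply_specification is a hypothetical helper, and if statements replace the match statement for brevity.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, List, TypeVar

T = TypeVar("T")


@dataclass
class JobRequest:
    """Simplified stand-in with only the fields the rules touch."""
    experiment_title: str
    will_reduce: bool = True


class MissingRuleError(Exception):
    """Raised when a specification key has no rule implementation."""


class Rule(Generic[T], ABC):
    def __init__(self, value: T) -> None:
        self._value = value

    @abstractmethod
    def verify(self, job_request: JobRequest) -> None:
        ...


class EnabledRule(Rule[bool]):
    def verify(self, job_request: JobRequest) -> None:
        # Assumption: the rule simply copies its value into will_reduce
        job_request.will_reduce = self._value


class SkipTitlesIncludingRule(Rule[List[str]]):
    def verify(self, job_request: JobRequest) -> None:
        # Assumption: a matching word means the run should not be reduced
        job_request.will_reduce = not any(word in job_request.experiment_title for word in self._value)


def rule_factory(key: str, value: Any) -> Rule[Any]:
    name = key.lower()
    if name == "enabled" and isinstance(value, bool):
        return EnabledRule(value)
    if name == "skiptitlesincluding" and isinstance(value, list):
        return SkipTitlesIncludingRule(value)
    if name in ("enabled", "skiptitlesincluding"):
        raise ValueError(f"Bad value: {value} in rule: {key}")
    raise MissingRuleError(f"Implementation of Rule: {key} does not exist.")


def apply_specification(spec_text: str, job_request: JobRequest) -> JobRequest:
    """Hypothetical helper: build a rule per JSON field and apply it in file order."""
    # json.loads turns JSON true into a Python bool and arrays into lists
    for key, value in json.loads(spec_text).items():
        rule_factory(key, value).verify(job_request)
    return job_request


SPEC = '{"enabled": true, "skipTitlesIncluding": ["25581", "bar", "baz"]}'
skipped = apply_specification(SPEC, JobRequest("run 25581 calibration"))
kept = apply_specification(SPEC, JobRequest("vanadium sample"))
```

In this sketch the rules run in the order the fields appear in the JSON, so a later rule can overwrite the decision of an earlier one; whether the real implementation behaves the same way is not stated here.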