TimeEval / TimeEval-algorithms

Time series anomaly detection algorithm implementations for TimeEval (Docker-based)
https://timeeval.readthedocs.io

Integrate new algorithm into TimeEval and evaluate it on the provided datasets #4

Closed B-Seif closed 2 years ago

B-Seif commented 2 years ago

Thank you for sharing your code, which is of high quality. Do you have a link to download all the datasets used in your experiments?

SebastianSchmidl commented 2 years ago

Dear @B-Seif,

thank you for your interest in our research project. You can find the download links to all our datasets on our supporting website: https://hpi-information-systems.github.io/timeeval-evaluation-paper/notebooks/Datasets.html#sources

B-Seif commented 2 years ago

Thanks for your quick feedback. I have another question: I have a TS outlier detection algorithm and I would like to use your framework to run experiments and compare it with other algorithms. What is the procedure to follow? Create a custom image and use TimeEval? Do you have an example?
For example, can I run this scenario: execute 20 algorithms 10 times each on 50 datasets and get the average and standard deviation of the performance of each algorithm on each dataset?

SebastianSchmidl commented 2 years ago

Yes, this is exactly what we designed TimeEval for :smile:.

If your algorithm is written in Python, you could use our FunctionAdapter (example of using the FunctionAdapter). However, this comes with some limitations (such as no way to limit resource usage or set timeouts). We therefore highly recommend using the DockerAdapter. This means that you have to create a Docker image for your algorithm before you can use it in TimeEval, correct.
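For illustration, here is a rough sketch of the FunctionAdapter route. The function name and its toy scoring are placeholders, and the exact shape of the data argument is an assumption on my side; the linked example shows the authoritative usage:

import numpy as np

from timeeval import Algorithm, TrainingType, InputDimensionality
from timeeval.adapters import FunctionAdapter

def my_function(data: np.ndarray, args: dict) -> np.ndarray:
    # toy scoring: deviation of every point from the channel-wise mean
    # (assumes the test data arrives as an array whose first axis is time)
    data = data.reshape(data.shape[0], -1)
    return np.linalg.norm(data - data.mean(axis=0), axis=1)

algorithm = Algorithm(
    name="my-python-function",
    main=FunctionAdapter(my_function),
    data_as_file=False,  # the function receives the data directly instead of a file path
    training_type=TrainingType.UNSUPERVISED,
    input_dimensionality=InputDimensionality.MULTIVARIATE,
)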

In the following, I assume that you want to create a Docker image with your algorithm to execute it with TimeEval. We provide base images for various programming languages. You can find them here. If your language or runtime is not in the list, we can work together to create one.

Procedure

  1. Build base image

    1. Clone this repository
    2. Build the selected base image from 0-base-images. Please make sure that you tag your image correctly (the image name must match the FROM-clause of your algorithm's Dockerfile).
  2. Integrate your algorithm into TimeEval and build the Docker image (you can use any algorithm in this repository as an example for that)

    • TimeEval uses a common interface to execute all its algorithms (using the DockerAdapter). This interface describes data input and output as well as the algorithm configuration. The calling interface is described in this repository's README. Please read that section carefully and adapt your algorithm to the interface description. You could also create a wrapper script that takes care of the integration. Our canonical file format for time series datasets is described here.
    • Create a Dockerfile for your algorithm that is based on your selected base image (example).
    • Build your algorithm Docker image.
    • Check if your algorithm can read a time series using our common file format.
    • Check if the algorithm parameters are correctly set using TimeEval's call format.
    • Check if the anomaly scores are written in the correct format (an anomaly score value for each point of the original time series in a headerless CSV-file).
    • The README contains example calls to test your algorithm after you have built the Docker image for it.
  3. Install TimeEval (pip install timeeval==1.2.4)

  4. Create an experiment script with your configuration of datasets, algorithms, etc. See next section for an example.

Example experiment script

#!/usr/bin/env python3

import timeeval
assert timeeval.__version__ == "1.2.4", "TimeEval version not supported. This script is for TimeEval version 1.2.4!"

from pathlib import Path

from timeeval import TimeEval, MultiDatasetManager, Metric, Algorithm, TrainingType, InputDimensionality, ResourceConstraints
from timeeval.adapters import DockerAdapter
from timeeval.params import FixedParameters
from timeeval.resource_constraints import GB

def main():
    # load datasets and select them
    dm = MultiDatasetManager([Path("tests/example_data")])  # or the path to your datasets (requires a datasets.csv-file in the folder)
    datasets = dm.select()  # selects ALL available datasets
    # datasets = dm.select(min_anomalies=2)  # select all datasets with at least 2 anomalies
    # we just want 50 datasets:
    datasets = datasets[:50]

    # add and configure your algorithms
    algorithms = [Algorithm(
        name="<YOUR ALGORITHM>",
        # set skip_pull=True because the image is already present locally:
        main=DockerAdapter(image_name="<YOUR ALGORITHM IMAGE NAME>", tag="latest", skip_pull=True),
        # the hyper parameters of your algorithm:
        param_config=FixedParameters({
            "window_size": 20,
            "random_state": 42
        }),
        # required by DockerAdapter
        data_as_file=True,
        # UNSUPERVISED --> no training, SEMI_SUPERVISED --> training on normal data, SUPERVISED --> training on anomalies
        # if SEMI_SUPERVISED or SUPERVISED, the datasets must have a corresponding training time series
        training_type=TrainingType.UNSUPERVISED,
        input_dimensionality=InputDimensionality.MULTIVARIATE
    )]

    # set the number of repetitions of each algorithm-dataset combination:
    repetitions = 10
    # set resource constraints
    rcs = ResourceConstraints.default_constraints()
    # if you want to limit the CPU or memory per algorithm, you can use:
    # rcs = ResourceConstraints(
    #     task_memory_limit = 2 * GB,
    #     task_cpu_limit = 1.0,
    # )
    timeeval = TimeEval(dm, datasets, algorithms,
        repetitions=repetitions,
        metrics=[Metric.ROC_AUC, Metric.RANGE_PR_AUC],
        resource_constraints=rcs
    )

    timeeval.run()
    # aggregated=True gives the mean and stddev of each algorithm-dataset combination
    results = timeeval.get_results(aggregated=True)
    print(results)

    # detailed results are automatically stored in your current working directory at ./results/<datestring>

if __name__ == "__main__":
    main()

I guess we have to create an "Integrate your own algorithm into TimeEval" guide at some point. Please let me know if the previous instructions were helpful to you. Help in documenting the steps necessary to integrate an algorithm is highly appreciated!

B-Seif commented 2 years ago

Thanks for the detailed answer. I think it would be great to add such a section; it would make the integration much easier. I downloaded two datasets (Calit2 and Genesis) and put them in the tests/example_data folder. Then, I ran the above script and got this error: FileNotFoundError: Could not find the index files (tests/example_data/datasets.csv). Is your data_folders parameter correct? How can I generate this datasets.csv index file? Here is the tree structure I have:

eval.py
tests/example_data
tests/example_data/Calit2/metadata.json
tests/example_data/Calit2/test.csv
tests/example_data/Genesis/metadata.json
tests/example_data/Genesis/test.csv

SebastianSchmidl commented 2 years ago

You can download the index file for all our dataset collections from the Datasets website (directly below the table with the dataset download links): datasets.csv. Place this file at tests/example_data/datasets.csv. Afterward, you have two options: either use the full index file and filter the datasets in the Python script (see below), or remove all the other datasets from the index and adapt the paths to your files.

Option 1: You can use the dataset API to select your datasets:

dm = MultiDatasetManager([Path("tests/example_data")])
datasets = []
datasets.extend(dm.select(collection="CalIt2"))  # select() returns a list of dataset IDs
datasets.extend(dm.select(collection="Genesis"))

In this case the dataset-folder must contain the following files:

tests/example_data
tests/example_data/datasets.csv
tests/example_data/multivariate
tests/example_data/multivariate/Genesis
tests/example_data/multivariate/Genesis/genesis-anomalies.test.csv
tests/example_data/multivariate/CalIt2
tests/example_data/multivariate/CalIt2/CalIt2-traffic.test.csv

The metadata-files are optional.


Option 2: Download the index file and adapt it to your folder structure:

The datasets.csv should look like this:

collection_name,dataset_name,train_path,test_path,dataset_type,datetime_index,split_at,train_type,train_is_normal,input_type,length,dimensions,contamination,num_anomalies,min_anomaly_length,median_anomaly_length,max_anomaly_length,mean,stddev,trend,stationarity,period_size
CalIt2,CalIt2-traffic,,Calit2/test.csv,real,True,,unsupervised,False,multivariate,5040,2,0.0408730158730158,29,2,7,19,3.8128968253968254,6.422468293621787,no trend,difference_stationary,48.0
Genesis,genesis-anomalies,,Genesis/test.csv,real,True,,unsupervised,False,multivariate,16220,18,0.00308261405672,3,2,22,26,11525.07423619674,9261.50200315977,no trend,difference_stationary,
B-Seif commented 2 years ago

Thanks, that worked for the dataset. Now I have a problem with the Docker image. To understand the setup, I first tried to build the LOF image and evaluate it on the CalIt2 dataset. Here is the list of images I have:
REPOSITORY                                    TAG                 IMAGE ID       CREATED         SIZE
registry.gitlab.hpi.de/akita/i/lof            latest              a88cd22a4b1e   25 hours ago    546MB
registry.gitlab.hpi.de/akita/i/pyod           latest              dd6954ca5e08   25 hours ago    541MB
registry.gitlab.hpi.de/akita/i/python3-base   latest              ca1e06fefcfb   26 hours ago    409MB
python                                        3.7.9-slim-buster   95df6bf70e35   17 months ago   113MB

and my Python code:

    dm = MultiDatasetManager([Path("tests/example_data")])
    datasets = dm.select() 
    print("************",len(datasets))

    # add and configure your algorithms
    algorithms = [Algorithm(
        name="lof",
        # set skip_pull=True because the image is already present locally:
        main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/lof:latest", skip_pull=True),
        # the hyper parameters of your algorithm:
        param_config=FixedParameters({
            #"window_size": 20,
            "random_state": 42
        }),
        # required by DockerAdapter
        data_as_file=True,
        # UNSUPERVISED --> no training, SEMI_SUPERVISED --> training on normal data, SUPERVISED --> training on anomalies
        # if SEMI_SUPERVISED or SUPERVISED, the datasets must have a corresponding training time series
        training_type=TrainingType.UNSUPERVISED,
        input_dimensionality=InputDimensionality.MULTIVARIATE
    )]

as well as the error I get:

Running PREPARE phase
Running EVALUATION phase
Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]
Exception occurred during the evaluation of lof on the dataset Dataset(datasetId=('CalIt2', 'CalIt2-traffic'), dataset_type='real', training_type=<TrainingType.UNSUPERVISED: 'unsupervised'>, length=5040, dimensions=2, contamination=0.0408730158730158, min_anomaly_length=2, median_anomaly_length=7, max_anomaly_length=19, period_size=48.0, num_anomalies=29).
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/docker/api/client.py", line 268, in _raise_for_status
    response.raise_for_status()
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/requests/models.py", line 943, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http+docker://localhost/v1.41/containers/create
SebastianSchmidl commented 2 years ago

It seems that Docker cannot create the container for LOF. I suspect the following line to be the problem:

-        main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/lof:latest", skip_pull=True),
+        main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/lof", tag="latest", skip_pull=True),

This is the signature of the DockerAdapter:

def __init__(self, image_name: str, tag: str = "latest", group_privileges: str = "akita", skip_pull: bool = False,
                 timeout: Optional[Duration] = None, memory_limit_overwrite: Optional[int] = None,
                 cpu_limit_overwrite: Optional[float] = None) -> None:

It takes the image_name and tag as separate arguments. Please try my suggested change.

B-Seif commented 2 years ago

It worked for me; I was finally able to test your tool. Now I will try to create my own images with my algorithms, and I will keep you informed. Thanks.

A small improvement that I hope for soon: let's say I want to use 50 datasets; I have to download them one by one, put them in 'tests/example_data', and create a multivariate or univariate folder... Will we soon have access to these datasets without going through these steps?

Would it be possible to add other measures, such as F1, and other strategies than the point-wise one, for example PA and PA%K?

SebastianSchmidl commented 2 years ago

Dataset download

We grouped the datasets into multiple collections because not everybody needs all the datasets, and the whole set is huge. Each collection can contain a different number of datasets. The first table on the Datasets page shows you how many datasets are included in each collection, e.g.:

Collection Name   Origin   Dim.    Learn.   # Datasets (total)   Avg. Length   Avg. # of Channels   Avg. # of Anomalies
CalIt2            real     multi   u        1                    5040          2                    29
Daphnet           real     multi   u        35                   32594         9                    7
Exathlon          real     multi   u/m/s    39                   47530         45                   4
Genesis           real     multi   u        1                    16220         18                   3
...

The downloadable ZIP archives contain the correct folder structure, but your extraction tool might place the contained files into a new folder named after the ZIP archive. The idea is that you download the index file (datasets.csv) and just the dataset collections that you require, extract them all into the same folder, place the datasets.csv there, and use Option 1 to select the correct datasets from the folder.

Example:

Scenario: You want to use the datasets from the CalIt2 and Daphnet collections.

Dataset download:

# Download CalIt2.zip, Daphnet.zip and datasets.csv
$ mkdir timeeval-datasets
$ mv datasets.csv timeeval-datasets/
$ unzip CalIt2.zip -d timeeval-datasets
$ unzip Daphnet.zip -d timeeval-datasets
$ tree timeeval-datasets
timeeval-datasets
├── datasets.csv
└── multivariate
    ├── CalIt2
    │   ├── CalIt2-traffic.metadata.json
    │   └── CalIt2-traffic.test.csv
    └── Daphnet
        ├── S01R01E0.metadata.json
        ├── S01R01E0.test.csv
        ├── S01R01E1.metadata.json
        ├── S01R01E1.test.csv
        ├── S01R02E0.metadata.json
        ├── S01R02E0.test.csv
        ├── [...]
        ├── S10R01E1.metadata.json
        └── S10R01E1.test.csv

3 directories, 77 files

TimeEval configuration:

dm = MultiDatasetManager([Path("timeeval-datasets")])
datasets = []
datasets.extend(dm.select(collection="CalIt2"))
datasets.extend(dm.select(collection="Daphnet"))
# ...

Quality metrics

Yes, we are currently refactoring our Metric-API and adding new metrics to TimeEval. This process is not finished yet, but some changes are already in the main-branch: https://github.com/HPI-Information-Systems/TimeEval/blob/main/timeeval/utils/metrics.py

Because every algorithm outputs a scoring (an anomaly score for each point of the original time series), we cannot use standard binary classification metrics directly. We would need to apply a threshold to the scores first, but choosing this threshold is use-case dependent and a task in itself. We therefore use threshold-agnostic measures, such as ROC_AUC or PR_AUC. In the last MR, I added Precision@K and FScore@K metrics that select the threshold so that at least K anomaly ranges are present and not more. By default, K is the number of real anomalies (based on the ground truth).
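To illustrate the threshold-agnostic idea with plain scikit-learn (TimeEval ships its own metric implementations, linked above), ROC_AUC can be computed directly on a scoring without ever binarizing it:

import numpy as np
from sklearn.metrics import roc_auc_score

y_true = np.array([0, 0, 0, 1, 1, 0, 0])                 # ground-truth point labels
scoring = np.array([0.1, 0.2, 0.1, 0.9, 0.7, 0.3, 0.2])  # anomaly score per point
print(roc_auc_score(y_true, scoring))                    # 1.0 -- no threshold needed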

B-Seif commented 2 years ago

Thank you for the answer.

Below are the steps I followed to make my own image. I was inspired by the baseline_random image. Suppose I have an algorithm (e.g., detectordecay) that takes 5 hyper-parameters. I took the exact same baseline.py file and added these hyper-parameters (is that good?). Then, if I understood correctly, I would just have to modify the execute() function to integrate my algorithm?

Another question: which scripts should I use to test several values of my hyper-parameters?

alpha = [0.1, 0.01, 0.001, 1]
beta  = [0.1, 0.01, 0.001, 1]
omega = [1, 2, 10]
... 

Dockerfile :

FROM registry.gitlab.hpi.de/akita/i/python3-base:latest

ENV ALGORITHM_MAIN="/app/detector_decay.py"

COPY requirements.txt /app/
RUN pip install -r /app/requirements.txt
COPY detector_decay.py /app/

Build the image: sudo docker build -t registry.gitlab.hpi.de/akita/i/detectordecay detector_decay

detector_decay.py :

from dataclasses import dataclass
import argparse
import numpy as np
import pandas as pd
import json
import sys
@dataclass
class CustomParameters:
    random_state: int = 42
    alpha    = 0.5 
    beta     = 0.07
    omega    = 0.1
    decay    = 0.08
    dim_h    = 4

class AlgorithmArgs(argparse.Namespace):
    @property
    def ts_length(self) -> int:
        return self.df.shape[0]

    @property
    def df(self) -> pd.DataFrame:
        return pd.read_csv(self.dataInput)

    @staticmethod
    def from_sys_args() -> 'AlgorithmArgs':
        args: dict = json.loads(sys.argv[1])
        custom_parameter_keys = dir(CustomParameters())
        filtered_parameters = dict(
            filter(lambda x: x[0] in custom_parameter_keys, args.get("customParameters", {}).items()))
        args["customParameters"] = CustomParameters(**filtered_parameters)
        return AlgorithmArgs(**args)

def execute(args: AlgorithmArgs):
    np.random.seed(args.customParameters.random_state)

    anomaly_scores = np.random.uniform(args.customParameters.alpha  *  args.customParameters.beta, args.customParameters.omega * args.customParameters.dim_h, args.ts_length)
    print ("I m here  ")
    anomaly_scores.tofile(args.dataOutput, sep="\n")

if __name__ == "__main__":
    args = AlgorithmArgs.from_sys_args()

    if args.executionType == "train":
        print("This algorithm does not need to be trained!")
    elif args.executionType == "execute":
        execute(args)
    else:
        raise ValueError(f"No executionType '{args.executionType}' available! Choose either 'train' or 'execute'.")

experiments file :

#!/usr/bin/env python3
import timeeval
assert timeeval.__version__ == "1.2.4", "TimeEval version not supported. This script is for TimeEval version 1.2.4!"
from pathlib import Path
from timeeval import TimeEval, MultiDatasetManager, Metric, Algorithm, TrainingType, InputDimensionality, ResourceConstraints
from timeeval.adapters import DockerAdapter
from timeeval.params import FixedParameters
from timeeval.resource_constraints import GB

def main():
    dm = MultiDatasetManager([Path("tests/example_data")])
    datasets = dm.select() 

    algorithms = [
        Algorithm(
            name="lof",
            main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/lof",tag="latest", skip_pull=True),
            param_config=FixedParameters({
            "window_size": 20,
            "random_state": 42
        }),
        data_as_file=True,
        training_type=TrainingType.UNSUPERVISED,
        input_dimensionality=InputDimensionality.MULTIVARIATE
    ),
         Algorithm(
             name= "detectordecay",
            main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/baselinerandom",tag="latest", skip_pull=True),
            data_as_file=True,
            training_type=TrainingType.UNSUPERVISED,
            input_dimensionality=InputDimensionality.MULTIVARIATE
        )
        ]
    repetitions = 1

    rcs = ResourceConstraints.default_constraints()
    # if you want to limit the CPU or memory per algorithm, you can use:
    # rcs = ResourceConstraints(
    #     task_memory_limit = 2 * GB,
    #     task_cpu_limit = 1.0,
    # )
    timeeval = TimeEval(dm, datasets, algorithms,
        repetitions=repetitions,
        metrics=[Metric.ROC_AUC, Metric.RANGE_PR_AUC],
        resource_constraints=rcs
    )

    timeeval.run()
    results = timeeval.get_results(aggregated=True)
    print(results)

    # detailed results are automatically stored in your current working directory at ./results/<datestring>

if __name__ == "__main__":
    main()

Output :

Running PREPARE phase
Running EVALUATION phase
Evaluating: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:11<00:00,  5.58s/it]
Running FINALIZE phase
FINALIZE phase done.
          Stored results at /home/ubuntu/TimeEval-test/results/2022_07_06_08_46_58/results.csv.
          Overall runtime of this TimeEval run: 11.24148964881897 seconds

                                         ROC_AUC_mean  RANGE_PR_AUC_mean  train_main_time_mean  execute_main_time_mean  repetitions
algorithm     collection dataset                                                                                                   
detectordecay CalIt2     CalIt2-traffic      0.479171           0.043113                   NaN                2.430817            1
lof           CalIt2     CalIt2-traffic      0.703758           0.109101                   NaN                5.000840            1
SebastianSchmidl commented 2 years ago

Yes, that looks good 👍🏼

You can test multiple parameters of an algorithm by using a different parameter configuration:

Checking the Cartesian product of all parameters:

from timeeval.params import FullParameterGrid

Algorithm(
    name= "detectordecay",
    main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/baselinerandom",tag="latest", skip_pull=True),
    param_config=FullParameterGrid({
        "alpha": [0.1, 0.01, 0.001, 1],
        "beta": [0.1, 0.01, 0.001, 1],
        "omega": [1, 2, 10]
    }),
    data_as_file=True,
    training_type=TrainingType.UNSUPERVISED,
    input_dimensionality=InputDimensionality.MULTIVARIATE
)

Documentation at https://github.com/HPI-Information-Systems/TimeEval/blob/c4cc594ba5f3705b7c2eb868cccff43209a66a87/timeeval/params/search.py#L43-L70
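To see what the FullParameterGrid above expands to, here is a plain-Python illustration of the Cartesian product (for counting only; TimeEval generates and runs these configurations itself):

from itertools import product

grid = {
    "alpha": [0.1, 0.01, 0.001, 1],
    "beta": [0.1, 0.01, 0.001, 1],
    "omega": [1, 2, 10],
}
configs = [dict(zip(grid.keys(), values)) for values in product(*grid.values())]
print(len(configs))  # 4 * 4 * 3 = 48 parameter settings, each evaluated on every dataset
print(configs[0])    # {'alpha': 0.1, 'beta': 0.1, 'omega': 1}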

Checking the search space of each parameter individually:

from timeeval.params import IndependentParameterGrid

Algorithm(
    name= "detectordecay",
    main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/baselinerandom",tag="latest", skip_pull=True),
    param_config=IndependentParameterGrid({
        "alpha": [0.1, 0.01, 0.001, 1],
        "beta": [0.1, 0.01, 0.001, 1],
        "omega": [1, 2, 10]
    }, default_params={"alpha": 0.1, "beta": 0.1, "omega": 1}),
    data_as_file=True,
    training_type=TrainingType.UNSUPERVISED,
    input_dimensionality=InputDimensionality.MULTIVARIATE
)

Documentation at https://github.com/HPI-Information-Systems/TimeEval/blob/main/timeeval/params/search.py#L87

Advanced parameter search algorithms, such as Bayesian optimization, are not supported by TimeEval. At the moment, you have to use other tools for that.

B-Seif commented 2 years ago

Thanks. I will run these experiments on a cluster. Is it distributed by default, or do I need to do something else besides TimeEval?
EDIT: After exploring the code, I found constants.py, which contains the IP addresses of your machines. Say I have 3 machines: should I update only this file with their IP addresses? Should I install TimeEval and Docker on these 3 machines?

TimeEval call :

timeeval = TimeEval(dm, datasets, algorithms,
        repetitions=repetitions, distributed=True,
        metrics=[Metric.ROC_AUC, Metric.RANGE_PR_AUC],
        resource_constraints=rcs
    )

Output :

File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/asyncssh/connection.py", line 2710, in validate_server_host_key
    raise HostKeyNotVerifiable(str(exc)) from None
asyncssh.misc.HostKeyNotVerifiable: Host key is not trusted
SebastianSchmidl commented 2 years ago

No, it is not distributed by default, and unfortunately the distributed setup of TimeEval is still a bit cumbersome. I'll quickly describe the overall setup before going into the details:

Distributed TimeEval

TimeEval uses Dask's SSHCluster to distribute tasks on a compute cluster. Following Dask's terminology, we distinguish between a scheduler host (leader) and worker hosts (followers). In addition, there is a driver host.

Host        Role Description
driver      Machine that runs the experiment script (where you call python experiment-script.py).
scheduler   Machine that runs the Dask scheduler, which coordinates the worker processes and distributes the tasks and jobs to the workers.
worker      Machine that runs a Dask worker and receives tasks and jobs. The workers perform the actual computations.

The driver could be your local notebook or computer, while the scheduler and worker hosts are part of the cluster. For our experiments, we usually use a single machine as driver, scheduler, and worker, while all the other machines just get the worker role. That's typically not a problem because the driver and scheduler components don't use many resources, so we can use the cluster's resources much more efficiently.

If TimeEval is started with distributed=True, it automatically starts a Dask SSHCluster on the specified scheduler and worker hosts. This is done via simple SSH-connections to the machines. It then uses the passed experiment configurations to create evaluation jobs (called Experiments). Each Experiment consists of an algorithm, its hyperparameters, a dataset, and a repetition number. After all Experiments have been generated and validated, they are sent to the Dask scheduler and put into a task queue. The workers pull the tasks from the scheduler and perform the evaluation (executing the Docker containers of the algorithm). All results and temporary data are stored on the disk of the local node and the overall evaluation result is sent back to the scheduler. The driver host periodically polls the scheduler for the results and collects them in memory. When all tasks have been processed, the driver uses SSH again to pull all the temporary data from the worker nodes. This populates the local results-folder.

Cluster Requirements

Please ensure that your cluster setup meets the following requirements:

  • All scheduler and worker hosts are reachable from the driver via password-less SSH, and their host keys are known (trusted) on the driver.
  • TimeEval and its Python dependencies are installed in the same environment on every host, because the Dask SSHCluster starts the scheduler and worker processes remotely.
  • Docker is installed on every worker host, and the algorithm images are available there (pulled or built locally).
  • The datasets (including the datasets.csv index) are available on every worker host at the same path.

TimeEval configuration for distributed execution

You can set up the cluster for TimeEval using the RemoteConfiguration class:

from timeeval import TimeEval, RemoteConfiguration, ResourceConstraints
from timeeval.resource_constraints import GB

# cluster setup
cluster_config = RemoteConfiguration(
    scheduler_host="hostname-1",
    worker_hosts=["hostname-1", "hostname-2", "hostname-3"]
)

# by default, TimeEval deploys only a single task per host; you can change that with:
limits = ResourceConstraints(
    tasks_per_host=16,  # <-- for 32 cores and 64GB RAM, each task gets 2 CPUs and ~4GB RAM
    # you can fix the CPU and memory limits independently of the number of tasks, but make sure that you don't overprovision!
    # task_cpu_limit=1.0,
    # task_memory_limit=2 * GB
)

timeeval = TimeEval(dm, datasets, algorithms,
    distributed=True,
    remote_config=cluster_config,
    resource_constraints=limits,
    ...
)
B-Seif commented 2 years ago

I am going to use the baseline.py script as a basis to implement my algorithm, which is unsupervised and for multivariate time series. I would just like to know if it is possible to test the script without going through Docker, to fix bugs quickly. Do you have an example script that I can use? Assume this is my algo.py:

import argparse
import numpy as np
import pandas as pd
import json
import sys

class AlgorithmArgs(argparse.Namespace):
    @property
    def ts_length(self) -> int:
        return self.df.shape[0]

    @property
    def df(self) -> pd.DataFrame:
        return pd.read_csv(self.dataInput)

    @staticmethod
    def from_sys_args() -> 'AlgorithmArgs':
        args: dict = json.loads(sys.argv[1])
        return AlgorithmArgs(**args)

def execute(args: AlgorithmArgs):
    # anomaly_scores = np.zeros(args.ts_length)
    # my algorithm takes a time series matrix X; is this what I get by calling df?
    anomaly_scores = CALL MY ALGORITHM( df ??)
    anomaly_scores.tofile(args.dataOutput, sep="\n")

if __name__ == "__main__":
    args = AlgorithmArgs.from_sys_args()

    if args.executionType == "train":
        print("This algorithm does not need to be trained!")
    elif args.executionType == "execute":
        execute(args)
    else:
        raise ValueError(f"No executionType '{args.executionType}' available! Choose either 'train' or 'execute'.")
B-Seif commented 2 years ago

About FullParameterGrid :

I have some hyper-parameters that I declared with default values in algorithm.py. However, I would like to test several values, hence the use of the FullParameterGrid function as mentioned above; but it does not work! At runtime it cannot recognize my hyper-parameters, as you can see below.

#### Docker container logs ####
Traceback (most recent call last):
  File "/app/detector_decay.py", line 57, in <module>
    args = AlgorithmArgs.from_sys_args()
  File "/app/detector_decay.py", line 30, in from_sys_args
    args["customParameters"] = CustomParameters(**filtered_parameters)
TypeError: __init__() got an unexpected keyword argument 'beta'

###############################

Docker algorithm failed with status code '1', consider container logs below.

Here are the scripts. algorithm.py:

from dataclasses import dataclass
import argparse
import numpy as np
import pandas as pd
import json
import sys
from detector import *

@dataclass
class CustomParameters:
    random_state: int = 42 
    lamda    = 0.5 
    beta     = 0.07
    convex   = True
    maxIt    = 10
    omega    = 0.1
    decay    = 0.08
    dim_h    = 1

class AlgorithmArgs(argparse.Namespace):

    @staticmethod
    def from_sys_args() -> 'AlgorithmArgs':
        args: dict = json.loads(sys.argv[1])
        custom_parameter_keys = dir(CustomParameters())
        filtered_parameters = dict(
            filter(lambda x: x[0] in custom_parameter_keys, args.get("customParameters", {}).items()))
        args["customParameters"] = CustomParameters(**filtered_parameters)
        return AlgorithmArgs(**args)

def load_data(config: AlgorithmArgs) -> np.ndarray:
    df = pd.read_csv(config.dataInput)
    return df.iloc[:, 1:-1].values  

def execute(args: AlgorithmArgs):
    # get hyper-parameters
    dim_h  = args.customParameters.dim_h
    lamda  = args.customParameters.lamda
    omega  = args.customParameters.omega
    beta   = args.customParameters.beta
    convex = args.customParameters.convex
    maxIt  = args.customParameters.maxIt
    decay  = args.customParameters.decay
    # get data
    X = load_data(args)
    # build temporal dependency
    Z = buildZ(X,dim_h,decay,omega)['Z']
    # run detector
    results        =  detector(X,Z,lamda=lamda,beta=beta,convex=convex,maxIt=maxIt)
    # output anomaly scores
    anomaly_scores =  np.apply_along_axis(np.linalg.norm,1, results['B'].T)
    anomaly_scores.tofile(args.dataOutput, sep="\n")

if __name__ == "__main__":
    args = AlgorithmArgs.from_sys_args()

    if args.executionType == "train":
        print("This algorithm does not need to be trained!")
    elif args.executionType == "execute":
        execute(args)
    else:
        raise ValueError(f"No executionType '{args.executionType}' available! Choose either 'train' or 'execute'.")

and below is the evaluation script:

evaluation.py

#!/usr/bin/env python3
import timeeval
assert timeeval.__version__ == "1.2.4", "TimeEval version not supported. This script is for TimeEval version 1.2.4!"
from pathlib import Path
from timeeval import TimeEval, MultiDatasetManager, Metric, Algorithm, TrainingType, InputDimensionality, ResourceConstraints
from timeeval.adapters import DockerAdapter
from timeeval.params import FixedParameters
from timeeval.resource_constraints import GB
from timeeval.params import FullParameterGrid

def main():
    dm = MultiDatasetManager([Path("tests/example_data")])
    datasets = dm.select() 
    algorithms = [
         Algorithm(
            name= "detectordecay",
            main=DockerAdapter(image_name="registry.gitlab.hpi.de/akita/i/detector",tag="latest", skip_pull=True),
            data_as_file=True,

            param_config=FullParameterGrid({

                "lamda" : [0.1,0.01],
                "beta" :  [0.01,0.001],
                "decay" : [0.8,0.07]

                }),
            training_type=TrainingType.UNSUPERVISED,
            input_dimensionality=InputDimensionality.MULTIVARIATE
        )
        ]
    repetitions = 1

    rcs = ResourceConstraints.default_constraints()
    # if you want to limit the CPU or memory per algorithm, you can use:
    # rcs = ResourceConstraints(
    #     task_memory_limit = 2 * GB,
    #     task_cpu_limit = 1.0,
    # )
    timeeval = TimeEval(dm, datasets, algorithms,
        repetitions=repetitions,distributed = False,
        metrics=[Metric.ROC_AUC, Metric.RANGE_PR_AUC],
        resource_constraints=rcs
    )

    timeeval.run()
    results = timeeval.get_results(aggregated=True)
    print(results)

if __name__ == "__main__":
    main()
SebastianSchmidl commented 2 years ago

I have some hyper-parameters that I declared with default values in algorithm.py. However, I would like to test several values, hence the use of the FullParameterGrid function as mentioned above; but it does not work! At runtime it cannot recognize my hyper-parameters, as you can see below.

If you use our custom parameter parsing function from_sys_args, then you need to specify the types of all parameters in the dataclass CustomParameters:

@dataclass
class CustomParameters:
    random_state: int = 42
    lamda: float      = 0.5
    beta: float       = 0.07
    convex: bool      = True
    maxIt: int        = 10
    omega: float      = 0.1
    decay: float      = 0.08
    dim_h: int        = 1
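For background, here is a small plain-Python illustration (independent of TimeEval) of why the annotations matter: without an annotation, the assignment is only a class attribute, so the generated __init__ does not accept it as a keyword argument, which is exactly the TypeError shown above.

from dataclasses import dataclass, fields

@dataclass
class Without:
    beta = 0.07          # no annotation -> plain class attribute, not a dataclass field

@dataclass
class With:
    beta: float = 0.07   # annotated -> becomes a field and an __init__ parameter

print([f.name for f in fields(Without)])  # []
print([f.name for f in fields(With)])     # ['beta']
With(beta=1.0)                            # works
Without(beta=1.0)                         # TypeError: unexpected keyword argument 'beta'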
SebastianSchmidl commented 2 years ago

I am going to use the baseline.py script as a basis to implement my algorithm, which is unsupervised and for multivariate time series. I would just like to know if it is possible to test the script without going through Docker, to fix bugs quickly. Do you have an example script that I can use?

I don't see a problem here. If you have implemented your algorithm and created the algorithm.py or algo.py script as shown above, you can just use the following call to test your script (make sure that you have installed all of your algorithm's dependencies in the current environment):

python algorithm.py '{ "executionType": "execute", "dataInput": "tests/example_data/path-to-dataset.csv", "dataOutput": "scores.csv", "customParameters": { "beta": 1, "dim_h": 2 } }'
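As a quick sanity check after such a local run, you can verify the expected output format (one anomaly score per point of the time series, written as a headerless CSV). This small sketch reuses the placeholder dataset path and the scores.csv output from the call above:

import pandas as pd

ts = pd.read_csv("tests/example_data/path-to-dataset.csv")  # placeholder path from the call above
scores = pd.read_csv("scores.csv", header=None)             # headerless, one score per line
assert len(scores) == len(ts), "expected exactly one anomaly score per time-series point"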
B-Seif commented 2 years ago

Thanks, it works for me.

Is it possible to give these hyper-parameters random values at each execution?

        lamda = math.pow(10, np.random.randint(-3, 6)) * np.random.random()
        beta  = math.pow(10, np.random.randint(-3, 6)) * np.random.random()
        decay = math.pow(10, np.random.randint(-3, 6)) * np.random.random()
        w     = math.pow(10, np.random.randint(-3, 6)) * np.random.random()

Another question: I have a hyper-parameter, dim_h, which takes random values depending on the dimension of the time series. Would it be possible to retrieve the dimension of the series when declaring the algorithms in eval.py?

     dim_h =np.random.randint(1,dimension_TS)
SebastianSchmidl commented 2 years ago

Is it possible to give these hyper-parameters random values at each execution?

No, but you can generate a list of random values before submitting it to TimeEval:

def rand():
    return math.pow(10, np.random.randint(-3, 6)) * np.random.random()

FullParameterGrid({"beta": [rand() for _ in np.arange(10)], ...})
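If that sampled list should be reproducible across runs, you can seed the generator before building it. A sketch using NumPy's Generator API (the seed value 42 is arbitrary):

import math
import numpy as np
from timeeval.params import FullParameterGrid

rng = np.random.default_rng(42)  # fixed seed -> the same 10 sampled values on every run

def rand() -> float:
    return math.pow(10, int(rng.integers(-3, 6))) * rng.random()

param_config = FullParameterGrid({"beta": [rand() for _ in range(10)]})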

Another question: I have a hyper-parameter, dim_h, which takes random values depending on the dimension of the time series. Would it be possible to retrieve the dimension of the series when declaring the algorithms in eval.py?

If dim_h just depends on the number of channels of the time series, there is no need to expose it as a hyperparameter. You can just compute the value in the execute()-function of your algorithm:

def execute(args: AlgorithmArgs):
    # get hyper-parameters
#    dim_h  = args.customParameters.dim_h
    lamda  = args.customParameters.lamda
    omega  = args.customParameters.omega
    beta   = args.customParameters.beta
    convex = args.customParameters.convex
    maxIt  = args.customParameters.maxIt
    decay  = args.customParameters.decay
    # get data
    X = load_data(args)
    dim_h = np.random.randint(1, X.shape[1])

    ...
B-Seif commented 2 years ago

If dim_h just depends on the number of channels of the time series, there is no need to expose it as a hyperparameter. You can just compute the value in the execute()-function of your algorithm:

Yes, I can do that. But I need to save (as for the other hyper-parameters) the value that dim_h took, in order to reproduce the results.

SebastianSchmidl commented 2 years ago

That's what seeding is for, e.g. seeding the random number generator with the random_state value; see the sketch below.

That is also the reason all our algorithms have a random_state-parameter. It's used to seed the random number generators to ensure reproducible results.
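A minimal, self-contained sketch of that idea (the channel count of 18 is just the Genesis example from above; in your algorithm you would use X.shape[1] after seeding):

import numpy as np

# with the same seed (random_state), the same dim_h is drawn on every run,
# so storing the seed is enough to reproduce the value
random_state = 42
np.random.seed(random_state)
n_channels = 18                           # e.g., the Genesis dataset has 18 channels
dim_h = np.random.randint(1, n_channels)  # deterministic given the seed
print(dim_h)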

B-Seif commented 2 years ago

Ok I see, thank you.

Now that everything works perfectly, I have one last question: have you stored, somewhere, the performance of each algorithm on each dataset, or do I have to repeat the experiments for all the algorithms? In your paper, you give the average performance over all datasets, but I would like to have access to the performance of each algorithm on each dataset.

SebastianSchmidl commented 2 years ago

I'm delighted to hear that you can use TimeEval successfully!

On our paper's supporting website, we list all the information and data, including a link to the evaluation results (quality and runtime measurements).

As you can see, we did two experiments: the first one on synthetically generated datasets and the second one on the "benchmark" datasets from the internet and literature (combined results). The result downloads do not include the algorithm scorings (anomaly scores), but just the quality and runtime measures.