ssl-hep / ServiceX_frontend

Client access library for ServiceX

ServiceX Client Library



Introduction

Given a selection string, this library manages submitting it to a ServiceX instance and retrieving the resulting data locally for you. The selection string is often generated by another front-end library, for example:

Prerequisites

Before you can use this library you'll need:

How to access your endpoint

The API access information is normally placed in a configuration file (see the Configuration section below). Create a YAML config file named servicex.yaml in the appropriate place for your work, containing the following. This example is for the xaod backend; for the uproot backend, set type to uproot:

api_endpoints:
  - name: <your-endpoint-name>
    endpoint: <your-endpoint>
    token: <api-token>
    type: xaod

All strings are expanded using Python's os.path.expandvars function, so $NAME and ${NAME} will expand to the values of existing environment variables.
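As an illustration, the standard library's os.path.expandvars performs this kind of substitution. With a config line such as token: ${SERVICEX_TOKEN}, the token could live in an environment variable instead of the file. The variable name SERVICEX_TOKEN is hypothetical and is used only for this example:

```python
import os

# Hypothetical variable holding an API token, set here only for the demo.
os.environ["SERVICEX_TOKEN"] = "abc123"

# Both the $NAME and ${NAME} forms are replaced with the variable's value.
print(os.path.expandvars("$SERVICEX_TOKEN"))    # abc123
print(os.path.expandvars("${SERVICEX_TOKEN}"))  # abc123

# A variable that isn't set is left untouched rather than raising an error.
print(os.path.expandvars("$NOT_A_REAL_VARIABLE_XYZ"))  # $NOT_A_REAL_VARIABLE_XYZ
```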

You can list multiple endpoints by repeating the block of dictionary items and giving each one a different name.

Finally, you can bypass the configuration system by creating the ServiceXAdaptor and MinioAdaptor objects by hand in your code. Pass them to ServiceXDataset through its servicex_adaptor and minio_adaptor arguments to inject custom endpoints and credentials. This is probably only useful for advanced users.

These config files hold confidential credential information, so that it isn't accidentally placed in a public repository.

If no endpoint is specified, and no config file containing a usable endpoint is found, the library defaults to the developer endpoint, http://localhost:5000, for the web-service API. No passwords are used in this case.

Usage

The following lines will return a pandas.DataFrame containing the pT of every jet in an ATLAS xAOD dataset of Z->ee Monte Carlo:

    from servicex import ServiceXDataset
    query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
    dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"
    ds = ServiceXDataset(dataset, backend_name='xaod')
    r = ds.get_data_pandas_df(query)
    print(r)

This is the terminal output from running the above script, which takes about 1-2 minutes to complete:

    python scripts/run_test.py http://localhost:5000/servicex
                JetPt
    entry
    0       38.065707
    1       31.967096
    2        7.881337
    3        6.669581
    4        5.624053
    ...           ...
    710183  42.926141
    710184  30.815709
    710185   6.348002
    710186   5.472711
    710187   5.212714

    [11355980 rows x 1 columns]

If your query is badly formed or there is another problem with the backend, an exception will be raised with information about the error.

If you'd like to submit multiple queries and have them run on the ServiceX backend in parallel, it is best to use the asyncio interface. It has an identical signature but is called get_data_pandas_df_async.
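Here is a minimal sketch of the parallel pattern. It uses asyncio.gather to start several queries at once and wait for all of them. StandInDataset is a hypothetical placeholder that lets the example run without a ServiceX backend. With a real backend you would pass a ServiceXDataset instead, whose get_data_pandas_df_async returns a pandas.DataFrame:

```python
import asyncio
from typing import List


class StandInDataset:
    """Hypothetical stand-in for ServiceXDataset so this sketch runs on its own."""

    async def get_data_pandas_df_async(self, query: str) -> str:
        await asyncio.sleep(0.1)  # pretend the transform takes a while
        return f"result for {query}"


async def run_queries(ds, queries: List[str]) -> list:
    # Start every query at once and wait for all of them to finish.
    # asyncio.gather returns the results in the same order as `queries`.
    return await asyncio.gather(*(ds.get_data_pandas_df_async(q) for q in queries))


results = asyncio.run(run_queries(StandInDataset(), ["query-1", "query-2", "query-3"]))
print(results)
```

In a Jupyter notebook an event loop is already running, so asyncio.run raises an error there. Use await run_queries(ds, queries) directly in a cell instead.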

For documentation of get_data and get_data_async see the servicex.py source file.

The backend_name tells the library where to look in the servicex.yaml configuration file to find an endpoint (URL and authentication information). See above for more information.
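The sketch below makes the lookup concrete. It shows how an entry might be chosen from the api_endpoints list once servicex.yaml has been parsed into Python dicts. Several details are hypothetical: the matching rule (name first, then type, otherwise the first entry), the function name find_endpoint, and the example URLs. The library's real selection logic may differ:

```python
from typing import Dict, List, Optional

# Developer default described in this README: a local ServiceX, no credentials.
# The "name" value is hypothetical.
DEVELOPER_DEFAULT = {"name": "default", "endpoint": "http://localhost:5000", "type": "xaod"}


def find_endpoint(api_endpoints: List[Dict[str, str]],
                  backend_name: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical rule: prefer an entry whose name matches backend_name, then
    one whose type matches; with no backend_name, take the first entry."""
    if not api_endpoints:
        return DEVELOPER_DEFAULT
    if backend_name is None:
        return api_endpoints[0]
    for key in ("name", "type"):
        for entry in api_endpoints:
            if entry.get(key) == backend_name:
                return entry
    raise ValueError(f"no endpoint named or typed {backend_name!r} in servicex.yaml")


# Two endpoints, as you might list them in servicex.yaml (hypothetical values).
endpoints = [
    {"name": "atlas-prod", "endpoint": "https://xaod.example.org", "type": "xaod"},
    {"name": "cms-uproot", "endpoint": "https://uproot.example.org", "type": "uproot"},
]
print(find_endpoint(endpoints, "uproot")["endpoint"])  # https://uproot.example.org
```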

How to specify the input data

How you specify the input data, and what data can be ingested, is ultimately defined by the configuration of the ServiceX backend you are running against. This servicex library supports the following:

The Local Data Cache

To speed things up, especially when you run the same query multiple times, the servicex package caches the query data that comes back from ServiceX. You can control where this is stored with the cache_path setting in the configuration file (see below). By default it is written to your system's temp directory, under a servicex_{USER} directory. The cache is unbounded: it will keep filling up. You can delete it at any time when you aren't processing data. The data will then be re-downloaded from ServiceX, or re-transformed by it, the next time it is requested.
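The following sketch uses only the standard library to check how large the cache has grown, or to clear it. It assumes the default location described above: the system temp directory plus servicex_ and your user name, taken here from getpass.getuser(). If you set cache_path, use that directory instead. The helper names are hypothetical:

```python
import getpass
import shutil
import tempfile
from pathlib import Path


def default_cache_dir() -> Path:
    # Assumption: mirrors the default described in this README,
    # <system temp dir>/servicex_<user>. A cache_path setting overrides it.
    return Path(tempfile.gettempdir()) / f"servicex_{getpass.getuser()}"


def cache_size_bytes(cache_dir: Path) -> int:
    # Total size of all files under the cache directory (0 if it doesn't exist).
    if not cache_dir.exists():
        return 0
    return sum(p.stat().st_size for p in cache_dir.rglob("*") if p.is_file())


def clear_cache(cache_dir: Path) -> None:
    # Remove the whole directory tree; only do this while no queries are running.
    shutil.rmtree(cache_dir, ignore_errors=True)


cache = default_cache_dir()
print(cache, cache_size_bytes(cache))
```

Call clear_cache(cache) only when nothing is running, because the library expects the cache to be in a consistent state.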

There are times when you want the system to ignore the cache while it is running. You can do this by using ignore_cache():

    from servicex import ignore_cache

    with ignore_cache():
        do_query()  # placeholder for your own code that runs ServiceX queries

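If you are using a Jupyter notebook, a with statement can't span cells. Instead, you can call ignore_cache().__enter__(). To turn the cache back on later, keep a reference to the object so that you can call __exit__() on it in a later cell:

    from servicex import ignore_cache

    ic = ignore_cache()
    ic.__enter__()

    ...  # queries run in the cells in between ignore the cache

    ic.__exit__(None, None, None)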

If you wish to disable the cache for a single dataset, use the ignore_cache parameter when you create it:

    ds = ServiceXDataset(dataset, ignore_cache=True)

Finally, you can ignore the cache for a dataset temporarily by using the same context manager pattern on the dataset object:

    ds = ServiceXDataset(dataset)
    with ds.ignore_cache():
        do_query(ds)  # Cache is ignored (do_query is a placeholder for your code)
    do_query(ds)  # Cache is not ignored

Analysis And Query Cache

The servicex library can write out a local file that maps queries to backend request IDs. You can share this file with other people or check it into a repository so that everyone references the same data in the backend. The advantage is that the backend does not need to re-run the query; the servicex library only needs to download the results again. When a user works on multiple machines or shares analysis code with an analysis team, this is a much more efficient use of resources.
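At its core, this file is a lookup table from query to request ID. The sketch below is hypothetical: the JSON layout, the SHA-256 key over dataset and query, the function names, and the request ID are all assumptions for illustration. They are not the format the servicex library actually writes:

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Dict, Optional


def query_key(dataset: str, query: str) -> str:
    # Hash the dataset and query together so the key is short and stable.
    return hashlib.sha256(f"{dataset}\n{query}".encode("utf-8")).hexdigest()


def load_map(path: Path) -> Dict[str, str]:
    return json.loads(path.read_text()) if path.exists() else {}


def remember(path: Path, dataset: str, query: str, request_id: str) -> None:
    mapping = load_map(path)
    mapping[query_key(dataset, query)] = request_id
    # Sorted keys keep the file stable, so diffs stay readable in a repository.
    path.write_text(json.dumps(mapping, indent=2, sort_keys=True))


def lookup(path: Path, dataset: str, query: str) -> Optional[str]:
    # A hit means the results can be re-downloaded without re-running the transform.
    return load_map(path).get(query_key(dataset, query))


cache_file = Path(tempfile.mkdtemp()) / "query_map.json"
remember(cache_file, "my_dataset", "(call ...)", "req-1234")
print(lookup(cache_file, "my_dataset", "(call ...)"))  # req-1234
```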

The cache search order is as follows:

Note: Eventually the backends will contain automatic cache lookup and this feature will be much less useful as it will occur automatically, on the backend.

Deleting Files from the local Data Cache

It is not recommended to alter the cache. The software expects the cache to be in a certain state, and randomly altering it can lead to unexpected behavior.

Besides telling the servicex library to ignore the cache in the above ways, you can also delete files from the local cache. The local cache directory is split up into sub-directories. Deleting files from each of the directories:

Configuration

The servicex library searches several locations for configuration information to determine which endpoint it should connect to. The config file can be called servicex.yaml, servicex.yml, or .servicex. The names are searched in that order, and every file that is present is used. The locations searched are:

  1. The current working directory.
  2. Any directory above your current working directory.
  3. The user's home directory ($HOME on Linux and Mac, and your profile directory on Windows).
  4. The config_defaults.yaml file distributed with the servicex package.
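Here is a minimal sketch of this search order, useful for checking which config files would be picked up from a given directory. The helper name candidate_config_files is hypothetical. The sketch does not reproduce how the library merges settings from several files, and it does not include the packaged config_defaults.yaml:

```python
from pathlib import Path
from typing import List, Optional

# File names in the order they are tried within each directory.
CONFIG_NAMES = ["servicex.yaml", "servicex.yml", ".servicex"]


def candidate_config_files(cwd: Optional[Path] = None,
                           home: Optional[Path] = None) -> List[Path]:
    """Hypothetical helper: list existing config files, searching the current
    directory, then each parent directory, then the home directory."""
    cwd = (cwd or Path.cwd()).resolve()
    home = (home or Path.home()).resolve()
    directories = [cwd, *cwd.parents]
    if home not in directories:  # avoid listing the home directory twice
        directories.append(home)
    found = []
    for directory in directories:
        for name in CONFIG_NAMES:
            path = directory / name
            if path.is_file():
                found.append(path)
    return found


for path in candidate_config_files():
    print(path)
```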

The file can contain an api_endpoints list, as described earlier. In addition, the following items can be set:


For non-standard use cases, the user can specify:

Features

Implemented:

Testing

This code has been tested in several environments:

Non-standard backends

During backend development, ports 9000 and 5000 are often forwarded to the local machine, exposing the Minio and ServiceX_App instances. In that case, create a configuration file with http://localhost:5000 as the endpoint. No API token is needed if the development ServiceX instance doesn't have authorization turned on.

API

Everything is based around the ServiceXDataset object. Below is the documentation for the most common parameters.

  ServiceXDataset(dataset: str,
                 backend_name: Optional[str] = None,
                 image: str = 'sslhep/servicex_func_adl_xaod_transformer:v0.4',
                 max_workers: int = 20,
                 result_destination = 'object-store',
                 servicex_adaptor: ServiceXAdaptor = None,
                 minio_adaptor: MinioAdaptor = None,
                 cache_adaptor: Optional[Cache] = None,
                 status_callback_factory: Optional[StatusUpdateFactory] = _run_default_wrapper,
                 local_log: log_adaptor = None,
                 session_generator: Callable[[], Awaitable[aiohttp.ClientSession]] = None,
                 config_adaptor: ConfigView = None):
  '''
      Create and configure a ServiceX object for a dataset.

      Arguments

          dataset                     Name of a dataset from which queries will be selected.
          backend_name                The type of backend. Used only if we need to find an
                                      end-point. If we do not have a `servicex_adaptor` then this
                                      will default to xaod, unless you have any endpoint listed
                                      in your servicex file. It will default to best match there,
                                      in that case.
          image                       Name of transformer image to use to transform the data
          max_workers                 Maximum number of transformers to run simultaneously on
                                      ServiceX.
          result_destination          Where the transformers should write the results.
                                      Defaults to object-store, but could be used to save
                                      results to a POSIX volume.
          servicex_adaptor            Object to control communication with the servicex instance
                                      at a particular ip address with certain login credentials.
                                      Default comes from the `config_adaptor`.
          minio_adaptor               Object to control communication with the minio servicex
                                      instance.
          cache_adaptor               Runs the caching for data and queries that are sent up and
                                      down.
          status_callback_factory     Factory to create a status notification callback for each
                                      query. One is created per query.
          local_log                   Log adaptor for logging.
          session_generator           If you want to control the `ClientSession` object that
                                      is used for callbacks. Otherwise a single one for all
                                      `servicex` queries is used.
          config_adaptor              Control how configuration options are read from the
                                      configuration file (servicex.yaml, servicex.yml, .servicex).

      Notes:

          -  The `status_callback_factory` argument, by default, uses the `tqdm` library to
             render progress bars in a terminal window or a graphic in a Jupyter notebook
             (with proper jupyter extensions installed). If it is specified as None, no
             updates will be rendered. A custom factory can also be specified; each callback
             it creates takes `(total_files, transformed, downloaded, skipped)` as arguments.
             The `total_files` parameter may be `None` until the system knows how many files
             need to be processed (and some files can even be completed before that is known).
  '''
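Here is a minimal sketch of a custom status callback with the (total_files, transformed, downloaded, skipped) arguments described in the notes. It handles total_files being None early on. The factory's exact arguments aren't documented in this README, so progress_factory is a hypothetical wrapper that accepts and ignores them:

```python
from typing import Callable, Optional

# Callback shape taken from the notes above.
StatusCallback = Callable[[Optional[int], int, int, int], None]


def format_progress(total_files: Optional[int], transformed: int,
                    downloaded: int, skipped: int) -> str:
    # total_files can be None until ServiceX knows how many files there are.
    total = "?" if total_files is None else str(total_files)
    return (f"transformed {transformed}/{total}, "
            f"downloaded {downloaded}/{total}, skipped {skipped}")


def print_progress(total_files: Optional[int], transformed: int,
                   downloaded: int, skipped: int) -> None:
    print(format_progress(total_files, transformed, downloaded, skipped))


def progress_factory(*args, **kwargs) -> StatusCallback:
    # Assumption: called once per query; its arguments are accepted and ignored.
    return print_progress


# Hypothetical wiring (needs a ServiceX backend):
# ds = ServiceXDataset(dataset, status_callback_factory=progress_factory)
print_progress(None, 0, 0, 0)  # transformed 0/?, downloaded 0/?, skipped 0
print_progress(10, 4, 3, 0)    # transformed 4/10, downloaded 3/10, skipped 0
```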

To get the data, use one of the get_data methods. They all have the same API, differing only in what they return.

    get_data_awkward_async(self, selection_query: str, title: Optional[str] = None) -> Dict[bytes, Union[awkward.array.jagged.JaggedArray, numpy.ndarray]]
        Fetch query data from ServiceX matching `selection_query` and return it as a
        dictionary of awkward arrays, with an entry for each column. The data is uniquely
        ordered (the same query will always return the same order). If specified, the
        optional title is passed to the backend and can be viewed on the status page.

    get_data_awkward(self, selection_query: str, title: Optional[str] = None) -> Dict[bytes, Union[awkward.array.jagged.JaggedArray, numpy.ndarray]]
        Fetch query data from ServiceX matching `selection_query` and return it as a
        dictionary of awkward arrays, with an entry for each column. The data is uniquely
        ordered (the same query will always return the same order). If specified, the
        optional title is passed to the backend and can be viewed on the status page.

Each data type comes in a pair - an async version and a synchronous version.

Streaming Results

The ServiceX backend generates results file by file. The API above returns the list of files only once the transform has completed, which can take some time for large transforms. The streaming methods avoid this wait by letting you start processing each file as soon as it is available.

The StreamInfoURL contains bucket, file, and url properties. The url property can be used to access the requested data without authentication for about 24 hours (the exact time depends on the ServiceX backend's configuration). Use the file property to understand which part of the starting dataset the data came from. Because the URL currently points to a Minio object store, the bucket property gives the name of the host bucket.

The StreamInfoData contains a file and a path property. The file is as above, and the path is a pathlib.Path object that points to the file that has been downloaded into the cache locally.

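The following example uses the streaming interface. It runs the same query as the initial example, but it prints the local path of each downloaded ROOT file as soon as that file arrives. Because async for must run inside a coroutine, the loop is wrapped in a function and started with asyncio.run. In a Jupyter notebook, which already runs an event loop, use await main() in a cell instead:

    import asyncio
    from servicex import ServiceXDataset
    query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
    dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"

    async def main():
        ds = ServiceXDataset(dataset)
        async for f in ds.get_data_rootfiles_stream(query):
            print(f.path)  # local path of the file in the cache

    asyncio.run(main())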

Notes:

Development

For any changes, please feel free to submit pull requests! We use the GitLab flow workflow: the master branch holds the latest updates that pass all tests, working towards the next version of the software. PRs for new features should be based on the most recent version of master. Each release is frozen on a dedicated release branch, e.g. v2.0.0. If a bug fix needs to be applied to an existing release, submit a PR to master mentioning the affected version(s). After the PR is merged to master, it will be applied to the relevant release branch(es) using git cherry-pick.

To do development please setup your environment with the following steps:

  1. A Python 3.7 development environment.
  2. Fork/Pull down this package, XX
  3. python -m pip install -e ".[test]"
  4. Run the tests with pytest to make sure everything passes.

Then add tests as you develop. When you are done, submit a pull request with any required changes to the documentation and the online tests will run.

To create a release branch:

    git checkout 2.0.0
    git switch -c v2.0.0
    git push -u origin v2.0.0