uccross / skyhookdm-pythonclient

Python client for SkyhookDM, developed for the IRIS-HEP project.

Set up the SkyhookDM driver (assuming you have the Skyhook CloudLab experiment running):

Note: Cloning this repo is not necessary; running the scripts is enough. The scripts use pip to install skyhookdmdriver, and also install the other dependencies that cannot be installed with pip.

=========================== SkyhookDM Python Client Architecture ===========================

The client is containerized:

Run in Docker

Assuming we have the following folder structure:

myproject/
└── example.py

We execute the example by doing the following:

cd myproject/

docker run --rm -ti -v $PWD:/ws -w /ws uccross/skyhookdm-py /ws/example.py

The above mounts the myproject/ folder in a /ws (workspace) folder inside the container. It then invokes the example.py file that we need to write ourselves. Take a look at the examples/ folder for examples of how to write applications that use the Skyhook python client library.
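The same invocation can be scripted. The following is a minimal sketch, assuming Docker is installed and using the image name from the command above. The helper name build_docker_command is hypothetical and is not part of the Skyhook client library.

```python
import os
import shlex


def build_docker_command(project_dir, script, image="uccross/skyhookdm-py"):
    """Return the argv list for the `docker run` invocation shown above.

    project_dir is mounted at /ws and used as the working directory;
    script is a file name relative to project_dir.
    (Hypothetical helper, for illustration only.)
    """
    project_dir = os.path.abspath(project_dir)
    return [
        "docker", "run", "--rm", "-ti",
        "-v", f"{project_dir}:/ws",
        "-w", "/ws",
        image,
        f"/ws/{script}",
    ]


if __name__ == "__main__":
    cmd = build_docker_command(".", "example.py")
    # Print the command rather than running it; pass `cmd` to
    # subprocess.run(cmd, check=True) to actually start the container.
    print(shlex.join(cmd))
```

Building the command as a list avoids shell-quoting problems when the project path contains spaces.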

Run in Kubernetes

The following Pod definition runs a script inside the client container:

apiVersion: v1
kind: Pod
metadata:
  name: mypod
spec:
  containers:
  - name: skyhook-client-container
    image: uccross/skyhookdm-py
    args: ['/path/to/script.py']

In the above, /path/to/script.py must be replaced with the path of a script that already exists inside the referenced image. The simplest way to achieve this is to build your own image on top of the one we provide and add your scripts to it. For example:

FROM uccross/skyhookdm-py

COPY myscript.py /

Then reference the copied file (/myscript.py in the image built above) in the Pod definition instead of /path/to/script.py.
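If several scripts need their own Pod, the definition can be generated instead of edited by hand. This is a minimal sketch using only the Python standard library; make_pod_manifest and the image name myrepo/my-skyhook-app are hypothetical placeholders, and the fields are copied from the Pod definition above.

```python
import json


def make_pod_manifest(script_path, name="mypod", image="uccross/skyhookdm-py"):
    """Build the Pod definition shown above as a plain dict.

    script_path must exist inside `image`.
    (Hypothetical helper, for illustration only.)
    """
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "containers": [
                {
                    "name": "skyhook-client-container",
                    "image": image,
                    "args": [script_path],
                }
            ]
        },
    }


if __name__ == "__main__":
    # "myrepo/my-skyhook-app" stands for an image built from the Dockerfile above.
    manifest = make_pod_manifest("/myscript.py", image="myrepo/my-skyhook-app")
    # kubectl accepts JSON manifests as well as YAML.
    print(json.dumps(manifest, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl apply -f in the same way as a YAML manifest.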

========================= More Details about SkyhookDM Python APIs =========================

skyhook_common:

This module defines the classes, such as Dataset, File and RootNode, that are shared between the SkyhookDM and Skyhook_driver modules, so both sides must understand them.

Classes:

skyhook_driver:

Skyhook_driver handles the heavy workloads, such as writing a dataset to the Ceph cluster and partitioning ROOT files into smaller Arrow tables. It can also handle some complicated queries. Skyhook_driver manages multiple workers, schedules tasks and balances the workload between workers. Workers can be added and removed based on the volume of the workload.

skyhook:

For now this module contains only one class, SkyhookDM. This module and Skyhook_common should be installed on the client. SkyhookDM connects to Skyhook_driver and submits tasks to it. Some lightweight queries are handled by SkyhookDM itself.

Classes:

Usage Examples:

Example to write a dataset to skyhook

==========================

# import the lib

from skyhookdmclient import SkyhookDM

# create a new SkyhookDM object

sk = SkyhookDM()

# connect to Skyhook_driver and the Ceph data pool; replace ip_address and ceph_pool_name with your own values

sk.connect('ip_address','ceph_pool_name')

# give the Root file urls

urls = ['http://uaf-1.t2.ucsd.edu/jeff_data/files/0C1BA5F0-9253-F24C-BBBA-E78510BC4D8E.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/0E7B346F-89B6-7D4D-8AE0-B1014D09A1BB.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/0FAE126E-B6B9-134D-B732-53EE0A156903.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/2B00A7C5-908D-3946-9D72-0CB13979BBEF.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/8FE19B0A-496C-9740-8600-E8EA265D4859.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/24C5C749-BC61-1746-8CEE-69D54B33A1D3.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/A8B4FDAE-DEEB-D24E-8DD2-91EB6E2D9CF7.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/B91E7F75-43E3-C246-B0F8-5C8D2216E299.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/C6E7959A-DB51-BC4C-BCDC-68CD694B919E.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/E66BCF3F-B328-B244-AC53-788C07E02454.root',
        'http://uaf-1.t2.ucsd.edu/jeff_data/files/F47D0822-3436-004E-926A-0A294C09AB0D.root',
        ]

# write the dataset to the Ceph cluster under the name 'demodst'

sk.writeDataset(urls,'demodst')
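Because all the files in the example above share one base URL, the list can also be built from the file identifiers. This is only a convenience sketch; root_file_urls is a hypothetical helper and not part of the client API, and just two of the identifiers are repeated here.

```python
# Base URL taken from the example above.
BASE_URL = "http://uaf-1.t2.ucsd.edu/jeff_data/files"


def root_file_urls(file_ids, base_url=BASE_URL):
    """Return one '<base_url>/<file_id>.root' URL per file identifier.

    (Hypothetical helper, for illustration only.)
    """
    base = base_url.rstrip("/")
    return [f"{base}/{file_id}.root" for file_id in file_ids]


urls = root_file_urls([
    "0C1BA5F0-9253-F24C-BBBA-E78510BC4D8E",
    "0E7B346F-89B6-7D4D-8AE0-B1014D09A1BB",
])

# `urls` can then be passed to sk.writeDataset(urls, 'demodst') as in the example above.
```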

Example to run queries on a dataset

==========================

# import the lib

from skyhookdmclient import SkyhookDM

# create a new SkyhookDM object

sk = SkyhookDM()

# connect to Skyhook_driver and the Ceph data pool; replace ip_address and ceph_pool_name with your own values

sk.connect('ip_address','ceph_pool_name')

# get the dataset named 'aod'

dst = sk.getDataset('aod')

# get the first file in the dataset

f = dst.getFiles()[0]

# run the query on the first file, queries only one branch

table = sk.runQuery(f,'select event_id>5, project Events;75.Muon_phi')

# run the query on the dataset, queries multiple branches.

tables = sk.runQuery(dst,'select event_id>5, project Events;75.Muon_eta,Events;75.Muon_phi,Events;75.Muon_mass')
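The two query strings above follow the same pattern: a select clause with a predicate, then a project clause with a comma-separated list of branches. The sketch below assembles such a string. The format is inferred only from these two examples, not from a documented grammar, and build_query is a hypothetical helper that is not part of the client API.

```python
def build_query(predicate, branches, tree="Events;75"):
    """Assemble a query string in the form used by the examples above.

    predicate: selection expression, e.g. 'event_id>5'
    branches:  branch names without the tree prefix, e.g. ['Muon_phi']
    tree:      tree name prefixed to every branch (assumed from the examples)
    """
    if not branches:
        raise ValueError("at least one branch is required")
    projection = ",".join(f"{tree}.{branch}" for branch in branches)
    return f"select {predicate}, project {projection}"


single = build_query("event_id>5", ["Muon_phi"])
multi = build_query("event_id>5", ["Muon_eta", "Muon_phi", "Muon_mass"])

# The results can be passed to sk.runQuery(f, single) or sk.runQuery(dst, multi).
```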

Example to write an Arrow table to a dataset

==========================

# import the lib

from skyhookdmclient import SkyhookDM
import pyarrow as pa
import pandas as pd

# create a new SkyhookDM object

sk = SkyhookDM()

# connect to Skyhook_driver and the Ceph data pool; replace ip_address and ceph_pool_name with your own values

sk.connect('ip_address','ceph_pool_name')

# JSON document that defines the dataset schema.
my_json = '''{
    "did": "222:wfe.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD",
    "selection": "(call ResultTTree (call Select (call Select (call EventDataset (list 'localds:bogus')) (lambda (list e) (list (call (attr e 'Electrons') 'Electrons') (call (attr e 'Muons') 'Muons')))) (lambda (list e) (list (call (attr (subscript e 0) 'Select') (lambda (list ele) (call (attr ele 'e')))) (call (attr (subscript e 0) 'Select') (lambda (list ele) (call (attr ele 'pt')))) (call (attr (subscript e 0) 'Select') (lambda (list ele) (call (attr ele 'phi')))) (call (attr (subscript e 0) 'Select') (lambda (list ele) (call (attr ele 'eta')))) (call (attr (subscript e 1) 'Select') (lambda (list mu) (call (attr mu 'e')))) (call (attr (subscript e 1) 'Select') (lambda (list mu) (call (attr mu 'pt')))) (call (attr (subscript e 1) 'Select') (lambda (list mu) (call (attr mu 'phi')))) (call (attr (subscript e 1) 'Select') (lambda (list mu) (call (attr mu 'eta'))))))) (list 'e_E' 'e_pt' 'e_phi' 'e_eta' 'mu_E' 'mu_pt' 'mu_phi' 'mu_eta') 'forkme' 'dude.root')",
    "image": "sslhep/servicex",
    "result-destination": "kafka",
    "kafka": {
        "broker": "kafka-inc:0000"
    },
    "chunk-size": 1000,
    "workers": 17
}'''

# here we use the "did" field as the dataset name.
dataset_name = "222:wfe.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD"

# add the json data to SkyhookDM. The name of the object will be defined by the 'name' or 'did' (for 'servicex') value in the json.
sk.addDatasetSchema(my_json)

# Create an example pandas DataFrame (its columns match the column list at the end of the JSON selection statement above).
df = pd.DataFrame( {"e_E": [1, 2, 3], "e_pt": [4, 5, 6], "e_phi": [7, 8, 9], "e_eta": [10, 11, 12], "mu_E": [13, 14, 15], "mu_pt": [16, 17, 18], "mu_phi": [19, 20, 21],  "mu_eta": [22, 23, 24]} )

# Convert the pandas DataFrame to an Arrow table.
table = pa.Table.from_pandas(df)

# Write the Arrow table to SkyhookDM.
objs = sk.writeArrowTable(table, dataset_name)

# Print the object names written to SkyhookDM.
print(objs)
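In the example above the dataset name is typed a second time after the JSON. It can instead be read from the schema itself, which keeps the two from drifting apart. This is a minimal sketch using only the standard library; dataset_name_from_schema is a hypothetical helper, and checking 'did' before 'name' is an assumption, since the example only states that one of the two fields defines the name.

```python
import json


def dataset_name_from_schema(schema_json):
    """Return the dataset name stored in a schema JSON string.

    Looks for 'did' first and then 'name'; this order is an assumption.
    (Hypothetical helper, for illustration only.)
    """
    schema = json.loads(schema_json)
    for key in ("did", "name"):
        if key in schema:
            return schema[key]
    raise KeyError("schema JSON has neither a 'did' nor a 'name' field")


# Shortened stand-in for the my_json document in the example above.
example_json = '{"did": "222:wfe.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD", "workers": 17}'
dataset_name = dataset_name_from_schema(example_json)
```

With the full example, dataset_name_from_schema(my_json) would replace the hard-coded dataset_name assignment.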