Closed brianventura closed 4 years ago
Hi Brian!
this works for ROOT.RDataFrames, but not for PyRDF.RDataFrames, which seems logic but not a good news to me.
Our main goal is that everything that works for ROOT.RDataFrame should also work from PyRDF, so it may be a bug.
Do you have any minimal reproducer?
Yes, here is one. The first lines until PyRDF.use("spark") are for setupping the module... It works in local but ot in spark.
import ROOT, sys
#sys.path.insert(0,'/eos/home-v/vbrian/SWAN_projects/dvcs_project/Tools/Modules/PyRDF')
#sys.path.append('/eos/home-v/vbrian/SWAN_projects/dvcs_project/Tools/Modules/PyRDF')
#print(sys.path)
import PyRDF
print
#print(PyRDF.__file__)
#sc.addPyFile("Tools/Modules/PyRDF.zip")
PyRDF.use('spark')
#spark
r = PyRDF.RDataFrame(10)
ROOT.gInterpreter.Declare('''
double hello = 3.;
''')
print(r.Define("plop", "hello").Sum("plop").GetValue())
Bests, Brian
I see, so the issue is the following (jump to the solution below if you want to skip the explanation):
ROOT.gInterpreter.Declare(...)
). However, this only runs locally and the definition itself it is not part of the computation graph that RDataFrame defines underneath. hello
variable is not known on the workers, because it is never declared in their environments (we only execute operations that are part of the RDataFrame graph - Define
, Sum
, Histo1D
... - because somehow we need to know what the user is running and then send it to the workers.)Solution
Ah! Fortunately, this time we anticipated this corner case and we added a method to the PyRDF API (PyRDF.initialize
) to run operations on the workers nodes, even if these operations are not part of the RDataFrame computation graph or just to initialize the environment on the workers nodes, for example defining your hello
variable. Bear in mind that everything passed to the PyRDF.initialize
method will be run before any other computation and only once.
Here you have two examples to solve your reproducer (I just tried both on swan.cern.ch):
import PyRDF
PyRDF.use('spark')
r = PyRDF.RDataFrame(10)
# This function will be run on every remote worker
def declare_var():
# Since the function itself will be pickle'd, we need to do a local import
# otherwise pickle complains about it.
import ROOT
ROOT.gInterpreter.Declare('''
double hello = 3.;
''')
# Let PyRDF know that the function needs to be distributed and run on the workers.
PyRDF.initialize(declare_var)
# Run your analysis
print(r.Define("plop", "hello").Sum("plop").GetValue())
# Result: 30.0
I guess yours is a very simple example, but you could also do the same as follows:
import PyRDF
PyRDF.use('spark')
r = PyRDF.RDataFrame(10)
# This function will be run on every remote worker
def declare_var():
# Since the function itself will be pickle, we need to do a local import
# otherwise pickle complains about it.
import ROOT
ROOT.hello = 3.;
''')
# Let PyRDF know that the function needs to be distributed and run on the workers.
PyRDF.initialize(declare_var)
# Run your analysis
print(r.Define("plop", "hello").Sum("plop").GetValue())
# Result: 30.0
I hope that works! Please let us know if that solves your issue.
PD: The Bleeding Edge stack in Swan contains the latest version of PyRDF (0.2.0), that's the setup I used to run the code above.
Dear Javier,
It works like a charm ! Thanks for that trick and for the wisdom of having thought of that case.
Bests, Brian
Dear Javier, I reopen this issue because it does not seem to be solved. Indeed, I tried the following which works
r = PyRDF.RDataFrame(10)
def declare_var(): import ROOT ROOT.gInterpreter.Declare(''' double mapToNorm [5][12][2] = {{{0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.}} , {{0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {2.75224619, 0.} , {0., 0.} , {0., 0.} , {0., 0.}} , {{0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.}} , {{0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.}} , {{0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.} , {0., 0.}}}; ''') PyRDF.initialize(declare_var) r = r.Define("weight_mc",'3.').Define("mc_type","1").Define("beam_charge","-1.").Define("period","8").Define('_weight2', 'mapToNorm[(int)mc_type][(int)period][std::max((int)beam_charge,0)]*2.')
print(r.Sum("_weight2").GetValue()) # print(r.AsNumpy(["weight_mc"])["weight_mc"])
However if you uncomment the very last line using AsNumpy, you can see that it fails with the following error
AttributeError: Can't pickle local object '_RDataFrameAsNumpy.
.ndarray'
What do you think ? (Sorry for the non trivial array)
Bests, Brian NB: I use LCG 96 python3
Hi Brian, This could be a bug in PyRDF, but just to be sure, can you try a couple of things?
ROOT.RDataFrame
and instead of running PyRDF.initialize
you just call declare_var()
. Does that work?On the other hand, @JavierCVilla recently left CERN so I am taking over for the moment. @vepadulano is also gone but he will hopefully come back at the beginning of next year. This means it will not be until then that we can give another bump to the development of PyRDF, since the effort I can myself dedicate to it at the moment is very limited. Thank you for your understanding and for being such a faithful user!
Hi @Zeguivert !,
As @etejedor said, I left CERN so I might not be as responsive as before and most likely I will start phasing out my support on PyRDF as soon as someone else takes over the development.
Regarding the issue, it looks like an issue with the version you are using rather than an issue in PyRDF itself. The LCG 96 release (either python 2 or 3) contains PyRDF-0.1.0 where numpy
arrays are not supported, that's why you get the error:
AttributeError: Can't pickle local object '_RDataFrameAsNumpy..ndarray'
I would suggest you to try with Bleeding Edge where the PyRDF version is 0.2.0 and it supports numpy
arrays.
PD: To check what PyRDF version is included in a given LCG Release you can also check this webpage: lcginfo.cern.ch.
Let us know if it works.
Dear Enric, Dear Javier,
First of all thanks a lot for all those answers. It turned out that the solution was the one Javier suggested. Actually I loaded my own PyRDF version with sys.path.append and all, in order to get those AsNumpy() methods available even in LCG96, to bypass the existing old PyRDF version in it. However I remember that AsNumpy() required some modifications in PyROOT itself, so that LCG96 was not ready for this, but bleeding edge is.
So I solved my issue using Bleeding edge. I should say I do not really like it because of its instabilities, but fair enough...
Moreover I am selfishly sad to hear such news from you Javier, and I would have like to meet you once ! May the force be with you then ! And thanks a lot Enric to take this in charge until next year, this is not that far from now and I think/hope I will not need any bump in PyRDF dev until then...
I just want to add that thanks to some recent talks I did in my collaboration on RDataFrames and PyRDF, some people already using Jupyter seem really eager to use SWAN and PyRDF with it (others still prefer compiled tools to solve the speed issue). Then I guess PyRDF might have a bright future, at least to my mind. Again, thanks a lot !
Best regards, and my best wishes to you, Javier ! Brian
Dear developpers,
I encounter a recent issue using this module, which is about accessing external variables in a dataframe defined in a python code (I use PyROOT in the SWAN environment).
The issue comes from the fact that I have a map or a function in python which should take a column from the dataframe and define another column accordingly. A python lambda function should make the job but python lambdas are not yet supported in PyROOT I think. Then I tried to define this map or function using the ROOT.gInterpreter.Declare() function, which gives me access to my map from the dataframe.Define() method.
However this works for ROOT.RDataFrames, but not for PyRDF.RDataFrames, which seems logic but not a good news to me. Then my question is the following, may you help me in that direction ? Either make the ROOT environment/definitions accessible from the jobs, or even better, boost the implementation for using python lambdas in dataframes ?
Bests, Brian Ventura