Open ajpotts opened 7 months ago
@ajpotts: Here are some questions designed to help me understand the request better:
var myString = readStringFromConsole();
apply(arry1, myString);
Realistically this probably cannot be done entirely within Chapel. I'm wondering if there is a streaming process that can integrate with Chapel. For example, in Spark, an RDD can be piped through a bash script, which can call any program that is installed on all the nodes (such as a compiled C++ program).
Thanks for thinking about this.
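For reference, the Spark mechanism described above is RDD.pipe, which streams each partition's elements through an external program's stdin and reads its stdout back as a new RDD; a minimal PySpark illustration (plain Spark, not Arkouda or Chapel):

# Minimal PySpark sketch of the "pipe an RDD through an external program"
# pattern referenced above; the executable just has to exist on every node.
from pyspark import SparkContext

sc = SparkContext("local", "pipe-example")
rdd = sc.parallelize(["1", "2", "3", "4"])
# Each element is written to the program's stdin (one per line); each stdout
# line becomes an element of the resulting RDD.
piped = rdd.pipe("sed s/^/item-/")   # could be any installed binary, e.g. a C++ tool
print(piped.collect())               # ['item-1', 'item-2', 'item-3', 'item-4']
sc.stop()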
@ajpotts: Given the answers to 1, 3, and 4, I agree with your answer to 2. :)
With respect to this:
since chapel is strongly typed and python is not.
my thought would be to require the apply() routine to take a type argument that would say what the result type of the lambda would be. For example:
apply(arry1, "lambda x: x+1");
Since Arkouda already requires ZMQ, would it make sense for each Arkouda server process to spawn off a Python interpreter, set up a ZMQ channel (or a pair of them?) to communicate with it, and use it/them to send the function string, the number of array elements, and then the array elements themselves, receiving the results back?
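As a rough sketch of the Python side of that idea, a spawned worker could sit in a REP loop, receive the lambda source and the data, and send the results back. Everything below (the endpoint, message order, JSON encoding, and the eval of the function string) is an illustrative assumption, not an existing Arkouda protocol:

# Hypothetical Python worker that a server process could spawn and talk to over ZMQ.
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5555")        # illustrative endpoint

func_src = sock.recv_string()            # e.g. "lambda x: x + 1"
sock.send_string("ok")
n = int(sock.recv_string())              # number of array elements
sock.send_string("ok")
values = sock.recv_json()                # the n elements themselves
assert len(values) == n
fn = eval(func_src)                      # trusts the server-supplied string
sock.send_json([fn(v) for v in values])  # transformed elements back to the server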
This is an interesting question and reminds me of a few other efforts that have gone on in Arkouda over the years.
The first one this calls to mind for me is what we called a "lisp interpreter", which allowed a user to write Python code; the Python client would turn that code into a lisp expression, which would then be sent to the Arkouda server to be parsed and executed on the server.
In practice, this meant users could write Python functions with the @arkouda_func decorator and have them executed as a single server command. Here are a few of the test cases that we got working:
@arkouda_func
def my_axpy(a : ak.float64, x : ak.pdarray, y : ak.pdarray) -> ak.pdarray:
return a * x + y
@arkouda_func
def my_filter(v : ak.int64, x : ak.pdarray, y : ak.pdarray) -> ak.pdarray:
return ((y+1) if (not (x < v)) else (y-1))
This sounds to me like a very similar result to what you are hoping for, but it didn't work for arbitrary Python code, and the server would need to be updated for every supported operation. Depending on the scope of what is required here, this may achieve what you are hoping for (it at least works for the x+1 example above).
The second effort that comes to mind is the array transfer work, which does something similar to what Brad mentioned above, except that the ZMQ sockets communicated between Chapel servers rather than between Chapel servers and Python instances. I think the idea would remain the same, though, and much of that code could be reused for the approach Brad mentioned. This code is available in the TransferMsg module in Arkouda, and I'm happy to answer questions about it if it seems like an approach you'd like to look into for Arkouda.
Working with the C Python API, I can get this working with COMM=none and serial code today. Here is a small snippet (in total there are about 250 lines of Chapel code to make this work):
// 'compileLambda' and the callable it returns come from the ~250 lines of
// helper code (wrapping the C Python API) mentioned above, not shown here.
proc apply(type t, arr, l) {
  var lambdaFunc = compileLambda(l);
  var res: [arr.domain] t;
  for i in arr.domain {
    // evaluate the Python lambda on each element, requesting result type 't'
    res(i) = lambdaFunc(t, arr(i));
  }
  return res;
}
var data: [1..10] int = 1..10;
writeln(" data: ", data);
var res = apply(int, data, "lambda *args: args[0] + 1 if args[0] % 2 != 0 else args[0]");
writeln(" res: ", res);
var res2 = apply(string, data, "lambda *args: ':' + str(args[0]) + ':'");
writeln(" res: ", res2);
There are some technical limitations; for example, the lambda currently has to take its arguments via *args.
I've cleaned up some of my code, put everything in a module, and resolved some of the limitations:
import Python;

config const n = 100;
config const func = 'lambda x,: x * x';

proc main() {
  use BlockDist;
  var a = blockDist.createArray({1..#n}, int);
  a = 1..#n;
  coforall l in a.targetLocales() {
    on l {
      // each locale has its own interpreter
      const interp = new Python.Interpreter();
      const lambdaFunc = new Python.Function(interp, func);
      const d = a.localSubdomain();
      // release this thread's hold on the interpreter state so the tasks in
      // the loop below can take and release the GIL; restored afterwards
      var threadState = Python.CPythonInterface.PyEval_SaveThread();
      forall x in a[d] with (var gil = new Python.GIL()) {
        x = lambdaFunc(x.type, x);
      }
      Python.CPythonInterface.PyEval_RestoreThread(threadState);
    }
  }
  writeln(a);
}
This snippet works for distributed and parallel computation. The main caveat is that each task still has to acquire the gil. This requirement might be able to be removed if the GIL is disabled (an optional Python 3.13 feature), but I have not had much luck with that yet. Also note that in this snippet *args has been replaced with "x,", which is still a tuple but looks nicer. It may be possible to remove this limitation, but it doesn't feel that imperative to me.
@jabraham17: Thanks for looking into this and for your prototype! A couple of questions:
By creating a "one per locale" distributed array of GILs, interpreters, etc., could this coforall + on + forall be rewritten as a forall over a distributed array?
Possibly? Part of the issue here is that in order to have any kind of threading, PyEval_SaveThread must be called on the locale and then PyEval_RestoreThread afterwards; this is separate from the GIL.
This is based on my admittedly limited understanding of threading in Python through the C interface, going off the docs.
Is your code available on a branch or PR somewhere?
Not yet, but it will be later today
Part of the issue here is that in order to have any kind of threading, PyEval_SaveThread must be called on the locale and then PyEval_RestoreThread afterwards; this is separate from the GIL.
Ah, just once per locale, not once per thread? I can see how that would be a challenge with task intents as they stand… Would calling it once per thread rather than process lead to correctness issues, or just extra overhead? I suppose we could create a "one per locale" record or class that monitored whether anyone had done this yet, using an atomic as a lock (?).
It's not just "once per locale", it's "once per locale per GIL acquisition".
So ideally you could call PyEval_SaveThread in the initializer for the interpreter, but then if you don't also acquire the GIL, the code either segfaults or deadlocks.
There is a lot of room here for tweaking this to get the behavior that we want. With the code I added here, it is no faster than serial code because we still have to acquire the GIL, so it essentially becomes serial. This problem goes away if you have Python 3.13+ built with the optional GIL-less interpreter and you run with PYTHON_GIL=0.
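As a side note for anyone trying the GIL-less route, CPython 3.13 lets you check both whether the build is free-threaded and whether the GIL is actually disabled at runtime (a small standalone Python check, independent of the Chapel code above):

# Run under the interpreter you intend to embed, e.g. with PYTHON_GIL=0 set.
import sys, sysconfig

# 1 only for builds configured with --disable-gil (free-threaded builds)
print("free-threaded build:", bool(sysconfig.get_config_var("Py_GIL_DISABLED")))
# sys._is_gil_enabled() was added in 3.13; False means the GIL is really off
print("GIL enabled at runtime:", sys._is_gil_enabled())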
Something mentioned today by @mppf which may be a possibility is to have an interpreter per thread. This is totally possible with the sub-interpreter API, but it has some extra fiddliness in how it interacts with the GIL. Still, it could be a way to get better multi-threaded performance regardless of the GIL.
The base code is now available in https://github.com/chapel-lang/chapel/pull/26156. I need to do more work on this PR to get it into mergeable shape (namely docs and testing), but the bulk of the code is there.
Summary of Feature
Please add an apply method that can transform a distributed array by applying a Python lambda function.
Description:
Is this a blocking issue with no known work-arounds? Yes. Arkouda has been asked to develop a Series.apply method which would require this functionality. I realize this will be difficult (and potentially not feasible / will have to be limited in scope) since chapel is strongly typed and python is not.
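For reference, the pandas behavior that a Series.apply method would be expected to mirror is elementwise application of a Python callable (plain pandas shown here, not Arkouda):

# Plain pandas, for comparison only: apply a Python lambda elementwise.
import pandas as pd

s = pd.Series([1, 2, 3, 4])
print(s.apply(lambda x: x + 1).tolist())   # [2, 3, 4, 5]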
Code Sample
Should return
makeDistDom is a helper function in arkouda which returns a block-distributed domain.