Thrameos opened 3 years ago
I reran the smaller-size benchmarks from the blog post with updated versions of all artifacts, and the brief timings are as follows.
Old values:
%timeit select_jaydebeapi(query.format(nrows=10000))
7.11 s ± 58.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=10000))
165 ms ± 5.86 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jaydebeapi(query.format(nrows=100000))
1min 9s ± 1.07 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=100000))
538 ms ± 29.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jaydebeapi(query.format(nrows=1000000))
11min 31s ± 4.76 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=1000000))
5.05 s ± 596 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
New values:
%timeit select_jaydebeapi(query.format(nrows=10000))
2.6 s ± 312 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=10000))
227 ms ± 37.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jaydebeapi(query.format(nrows=100000))
20.8 s ± 212 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=100000))
633 ms ± 140 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jaydebeapi(query.format(nrows=1000000))
3min 26s ± 5.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_pyarrow_jvm(query.format(nrows=1000000))
4.24 s ± 153 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
I will write a 2020 version of the blog post with all the new caveats I ran into, but the main difference is the updated performance values. Also, I'm happy to provide profiling information or similar to show where the performance bottlenecks are.
Also I have a feeling that the Arrow version has a bug, so please only compare the jaydebeapi timings.
> Also I have a feeling that the Arrow version has a bug, so please only compare the jaydebeapi timings.
Found it and updated the above comment.
Sounds like a fairly good gain in performance thus far. We should at some point try the direct transfer method in JPype (as opposed to JayDeBeAPI), which may make additional speed differences. I am not associated with JayDeBeAPI, but I did write the jpype.dbapi2 package. Both are Python packages, meaning the individual calls to fetch are native Python code (each of which hits a transaction cost), though I am curious how much overhead Python is adding, as they have different implementations.
So the main task that you are doing is trying to perform a fetchall to a pandas DataFrame, right? Is it acceptable to spool up the entire transaction on the Java side and then do a batch transfer using JNI? Py4J, which Arrow uses, does socket communication rather than direct memory transfer (as far as I am aware). The bandwidth limits on socket-based transfer are lower than on JNI, so the peak transfer rate you can achieve through JPype should be higher than Arrow's, assuming there were a method on the Java side to handle the pull and data reorganization. But of course we are currently pulling data row-wise and unmarshalling by field, which is a much slower path. If we spool up the data and pull columns en masse to the DataFrame, then those bottlenecks would be removed and you would be at the marshalling limits of the pandas transfer.
I did this same exercise with arrays a while back when I added direct transfer from Java to NumPy, which gave a speed-up of about a factor of 100.
> So the main task that you are doing is trying to perform a fetchall to a pandas DataFrame, right? Is it acceptable to spool up the entire transaction on the Java side and then do a batch transfer using JNI?
I'm not transferring my actual payload data. The magic of Arrow in this case is that the Java side already stores the payload data in off-heap memory in a layout that is 100% compatible with what the C++/Python side expects. So there is no transfer of these bytes through any interface; see https://github.com/apache/arrow/blob/db94f24f15ad783efe676fef8883b7cab9a82f88/python/pyarrow/jvm.py#L66-L69. As we hand the memory address from Java to Python and then to C++, we never actually create any Python objects for the payload. This is where the massive savings come from.
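As a rough sketch of that hand-off (assuming a populated org.apache.arrow.vector.VectorSchemaRoot named root obtained via JPype, and that pyarrow.jvm.record_batch behaves as described in the post):

import pyarrow.jvm
# root is a VectorSchemaRoot living in the JVM, already filled from the JDBC ResultSet.
batch = pyarrow.jvm.record_batch(root)  # wraps the Java-owned off-heap buffers; no bytes are copied
df = batch.to_pandas()                  # the only real work is the Arrow -> pandas conversion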
> Py4J, which Arrow uses, does socket communication rather than direct memory transfer (as far as I am aware).
This is how (Py)Spark is doing it. The Arrow project itself doesn't use Py4J anywhere in its codebase.
Also, I wasn't aware of jpype.dbapi2; I'll include that in my benchmarks.
Okay, I don't have much visibility into Arrow or PySpark. It sounds like Arrow is using a shared-memory approach, which should be about the same as the peak performance we can expect through the JNI methods. Though perhaps we can try to improve the transfer to be more similar. I can certainly add transfer methods like array and buffer, in which a support class in the org.jpype internal jar provides hooks for the Python module to call. That is how things like memoryview(JArray) do magic behind the scenes, by spooling and marshalling on Java and then initiating a bulk transfer using the Python buffer protocol. This of course skips all the Python object creation. I have been trying to make things "Python native" by adding support routines like these that hide behind the scenes, so the user never needs to invoke a specific library to move bytes.
(The unfortunate side effect being that nobody sees anything changing in the JPype API, so it isn't really clear that there was some huge speed-up unless you read deep in the implementation notes. I do try to show it in the CHANGELOG, but it easily gets overlooked.)
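For reference, a minimal sketch of the memoryview(JArray) path mentioned above (assuming a recent JPype where primitive arrays expose the buffer protocol):

import jpype
import numpy as np

jpype.startJVM()
jarr = jpype.JArray(jpype.JDouble)(1_000_000)  # a primitive double[] owned by the JVM
# ... assume Java code fills jarr ...
view = memoryview(jarr)   # one bulk marshal on the Java side, no per-element Python objects
arr = np.asarray(view)    # NumPy reads the doubles through the buffer protocol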
The JayDeBeAPI author was AWOL for a period of about a year (likely busy with other things) and didn't address issues or PRs. Unfortunately, as he wasn't addressing the changes between 0.6.x and 0.7 and later, his users' "accepted solution" was to pin JPype to 0.6.3, which was horribly buggy and resulted in endless bug reports on ancient versions. As the license for JayDeBeAPI was incompatible with the JPype project, I just rewrote the whole thing from scratch. Not that I have anything against JayDeBeAPI in general, but I really couldn't accept pinning to a broken version.
I needed to comment out self._jcx.setAutoCommit(False), as Apache Drill errors with "not supported" on it, but the timings are roughly 2x better than jaydebeapi:
%timeit select_jpype_dbapi2(query.format(nrows=10000))
1.23 s ± 44.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jpype_dbapi2(query.format(nrows=100000))
9.16 s ± 372 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit select_jpype_dbapi2(query.format(nrows=1000000))
1min 28s ± 2.84 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
For my Arrow/pandas use-case, there is nothing that needs to be improved on the jpype side from my perspective. I'll try to write a more detailed explanation of what is happening there (and why I think this is the best approach for that) tomorrow.
Okay, I guess I should add a test for jpype.dbapi2 against Drill to catch the problem. There must be something in the DB API that tells me whether autocommit is available so that error can be avoided. Any suggestions on making it compatible, or other potential improvements in jpype.dbapi2, would be appreciated. I am not much of a database user (though I do a lot of ML and some big-data work), so I just took my best guess at a usable API.
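One hedged sketch of how such a guard could look from the Python side, using only standard java.sql metadata calls (the Drill jar path and JDBC URL here are just placeholders):

import jpype
import jpype.imports

jpype.startJVM(classpath=["lib/drill-jdbc-all.jar"])  # placeholder driver jar
from java.sql import DriverManager

cx = DriverManager.getConnection("jdbc:drill:drillbit=localhost")  # placeholder URL
if cx.getMetaData().supportsTransactions():
    cx.setAutoCommit(False)
else:
    # Drill reports "not supported" for this call, so skip it entirely
    pass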
Another factor of 2 off the speed is pretty impressive given we are still doing a full Python-object row-wise transfer. If you can point me to the entry point that is being used for pandas transfers, we can try the spool and direct transfer as a prototype for future inclusion in JPype. I hope it is something that can be reached without creating dependencies. NumPy was an absolute nightmare as they built their own method dispatch system; you couldn't call it dynamically without making dependencies. I broke that by using the Python buffer API. Hopefully pandas has a similar API that I can link to without requiring pandas to be available.
I believe I could give an optimized transfer path using the following syntax, if it is acceptable:
columns = connection.fetchMany(1000, asColumns=True)  # Spool up a large query on the JPype side into a set of arrays.
df = pandas.DataFrame(numpy.column_stack(columns))    # Transfer the columns to pandas without going through Python objects. (There may be a better way, but it is the only one I found so far.)
It would require one support method in org.jpype to serve as the spooler that pulls a block.
The reason it would have to come across as columns is so that we have homogeneous memory types. It is much easier and faster to pull
[ java.lang.String[], JInt[], JDouble[], JInt[], JInt[] ]
than it is to pull
[ [java.lang.String, java.lang.Integer, java.lang.Double, java.lang.Integer, java.lang.Integer], [java.lang.String, java.lang.Integer, java.lang.Double, java.lang.Integer, java.lang.Integer], ... ]
The downside is that data entries would still be Java objects rather than Python natives.
I did a few tests to see the speed limit (not practical, purely bandwidth-limited) of going from a database through JPype to a pandas DataFrame using the array transfer protocol. This won't work for nullable columns and would need a lot of rework to be practical, but it should serve as a benchmark for what the limits are.
First we need a method to convert a ResultSet into a columnwise form...
public class Bulk
{
  public Object fetchBulk(java.sql.ResultSet rs, int count) throws java.sql.SQLException
  {
    java.sql.ResultSetMetaData meta = rs.getMetaData();
    int n = meta.getColumnCount();

    // Allocate one primitive array per column (doubles only, for this benchmark).
    double[][] columns = new double[n][];
    for (int i = 0; i < columns.length; ++i)
    {
      columns[i] = new double[count];
    }

    // Fill the columns row by row; && short-circuits so we don't consume an extra row.
    int j = 0;
    while (j < count && rs.next())
    {
      for (int k = 0; k < n; ++k)
        columns[k][j] = rs.getDouble(k + 1);
      j += 1;
    }

    // Trim if the result set held fewer rows than requested.
    if (j < count)
    {
      for (int k = 0; k < n; ++k)
      {
        columns[k] = java.util.Arrays.copyOfRange(columns[k], 0, j);
      }
    }
    return columns;
  }
}
Again, this isn't really a practical speed test because we would need to get the column types in advance and build a per-type row fetch, which would slow down Java queries, but it seems like a reasonable limiting case.
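As an aside, a hedged sketch of how the column types could be inspected up front from Python before choosing per-column array types (standard JDBC metadata through JPype; only a few types are mapped here, everything else falls back to object):

from java.sql import Types  # assumes the JVM is already running and jpype.imports is loaded

def column_dtypes(rs):
    # Map a handful of JDBC types to NumPy dtype strings for the columnwise fetch.
    meta = rs.getMetaData()
    mapping = {Types.DOUBLE: "f8", Types.REAL: "f4", Types.INTEGER: "i4", Types.BIGINT: "i8"}
    return [mapping.get(meta.getColumnType(i + 1), "O") for i in range(meta.getColumnCount())]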
Next we need a test bench to measure transfer speed.
import jpype
import jpype.imports
import jpype.dbapi2 as dbapi2
import timeit
import numpy as np
import pandas

jpype.startJVM(classpath=['.', "lib/h2-1.4.200.jar"])

# Push some data into a table
def generator():
    for i in range(50000):
        yield [float(np.random.uniform(0, 1)), float(np.random.uniform(0, 1))]

print("Populate table")
cx = dbapi2.connect("jdbc:h2:mem:testdb")
with cx.cursor() as cur1:
    cur1.execute("create table booze (x REAL, y REAL)")
    cur1.executemany("insert into booze values (?,?)", generator())

# Pull row-wise as Python objects
def pull1():
    with cx.cursor() as cur1:
        cur1.execute("select * from booze")
        booze = cur1.fetchall()

# Pull row-wise and construct a DataFrame
def pull2():
    with cx.cursor() as cur1:
        cur1.execute("select * from booze")
        booze = pandas.DataFrame(cur1.fetchall())

bulk = jpype.JClass('Bulk')()

# Pull column-wise using the Java-side spooler
def pull3():
    with cx.cursor() as cur1:
        cur1.execute("select * from booze")
        columns = bulk.fetchBulk(cur1._resultSet, 50000)

# Pull column-wise and construct a DataFrame
def pull4():
    with cx.cursor() as cur1:
        cur1.execute("select * from booze")
        columns = bulk.fetchBulk(cur1._resultSet, 50000)
        df = pandas.DataFrame(np.column_stack(columns))

# HotSpot makes benchmarking tough, so we often need to prime before testing so that
# we don't get fooled by the JIT compiler
print("Priming")
timeit.timeit('pull1()', number=10, globals=globals())

print("Testing")
print("Fetch rowwise:", timeit.timeit('pull1()', number=10, globals=globals()) / 10)
print("Fetch to pandas rowwise:", timeit.timeit('pull2()', number=10, globals=globals()) / 10)
# Repeat with a long run to improve accuracy
print("Fetch columnwise:", timeit.timeit('pull3()', number=1000, globals=globals()) / 1000)
print("Fetch to pandas columnwise:", timeit.timeit('pull4()', number=1000, globals=globals()) / 1000)
Running this on my slow laptop gives...
Populate table
Priming
Testing
Fetch rowwise: 0.7174200299999939
Fetch to pandas rowwise: 0.8330038899999636
Fetch columnwise: 0.0031541166999995767
Fetch to pandas columnwise: 0.0037342227999979513
Assuming this ratio holds, we should be able to chop that 1 min 28 s down to about 0.4 s, which should be on the order of the Arrow benchmark, and roughly on the order of 1700 times faster than the original benchmark.
Not sure if this helps, but it does tell us about the margins we have to work with in terms of fast JDBC access.
Oh, one other very minor issue with the blog: it is usually better to use module imports rather than JPackage. JPackage originates from the start of the JPype project, and each level of the accessor triggers another class lookup. Prior to my last round of caching, that meant that if you were navigating through 5 levels of tld it would be 5 times as slow as looking up by name. Fortunately, I did some work to unify JPackage with imports, so it now uses similar caching and shares a pretty fast path. Even then there are faster methods.
import jpype
import jpype.imports
jpype.startJVM()
import java
from java.lang import String
import timeit

def s1():
    return jpype.JClass("java.lang.String")

def s2():
    return jpype.JPackage("java").lang.String

def s3():
    return java.lang.String

def s4():
    return String

print("Import once using 'from x import y'", timeit.timeit("s4()", number=100000, globals=globals()))
print("Use an import from a tld ", timeit.timeit("s3()", number=100000, globals=globals()))
print("Use JPackage ", timeit.timeit("s2()", number=100000, globals=globals()))
print("Use JClass ", timeit.timeit("s1()", number=100000, globals=globals()))
Gives...
Import once using 'from x import y' 0.008551500002795365
Use an import from a tld 0.026284299994586036
Use JPackage 0.04254470000159927
Use JClass 0.38298289999511326
Obviously if you are only calling things once it really doesn't matter, but people often copy patterns, so every so often someone ends up writing [ JClass("com.foo.Convert").convert(i) for i in very_long_list ], which is absolutely horrible. Unfortunately JPype has a huge number of "bad" examples that I will never fully track down.
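The fix for that pattern is simply to hoist the lookup out of the loop (com.foo.Convert and very_long_list are the hypothetical names from above):

# Resolve the class once, then reuse the binding inside the comprehension.
Convert = jpype.JClass("com.foo.Convert")
converted = [Convert.convert(i) for i in very_long_list]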
> NumPy was an absolute nightmare as they built their own method dispatch system; you couldn't call it dynamically without making dependencies. I broke that by using the Python buffer API. Hopefully pandas has a similar API that I can link to without requiring pandas to be available.
pandas is worse than NumPy; there is no real API. That's also a reason to use pyarrow to construct things on the Python side, as it has a nicer API (basically similar to NumPy) but a really fast conversion routine to pandas, faster than actually constructing a pandas.DataFrame from a dict of NumPy arrays.
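For illustration, a small sketch of the two construction paths being compared (whether the Arrow route wins will depend on the data and the versions involved):

import numpy as np
import pandas as pd
import pyarrow as pa

cols = {"x": np.random.uniform(size=1_000_000), "y": np.random.uniform(size=1_000_000)}
df_direct = pd.DataFrame(cols)          # build the DataFrame straight from the NumPy arrays
df_arrow = pa.table(cols).to_pandas()   # build Arrow arrays first, then convert to pandas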
> If you can point me to the entry point that is being used for pandas transfers, we can try the spool and direct transfer as a prototype for future inclusion in JPype.
Not sure I pointed that out correctly, but we actually don't copy any memory; we create the memory buffers used for pyarrow / pandas already in Java using Netty and just hand over the memory address.
I probably won't have much time today, but I'll apply the above suggestions to my code and report back over the next days.
Yeah, trying to send a dict to pandas would certainly be a huge time suck. I had better luck with column_stack to NumPy and then passing that to pandas. There may be other ways to construct it without calling column_stack, but I wasn't finding many column-oriented APIs for heterogeneous memory structures.
With regard to copying: there always has to be a copy someplace when working with Java arrays. Java does not permit pinned arrays, so something has to move the memory from Java space into C memory space. JPype does this in a few ways. It either exposes the Java array for a short period of time (using the limited critical section), which can be copied once to get to C space; it uses the Get/Release pattern, which copies the memory twice (once for Java to copy the array to the JNI marshalling space and then again to copy the JNI transfer space to C space); or it uses a view (Python can access elementwise, but each access produces a new Python object). If Java supported pinning (or, as in the case of java.nio.Buffer, where unsafe memory can be allocated), then we could directly give memory to NumPy to use.
I can make faster transfers from Java to Python/NumPy if we start the transaction in the other direction. That is, if we first create the memory space in C for the transfer in NumPy, then wrap it as a direct java.nio.Buffer and pass it to Java, then Java can write directly into unsafe space. This eliminates the transfers entirely, but it does incur the overhead of using the buffer API. I haven't done much speed testing for this concept, but I suppose it would be possible to try.
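A hedged sketch of that reverse direction, assuming jpype.nio.convertToDirectBuffer wraps a writable Python buffer as a direct java.nio.ByteBuffer sharing the same memory:

import numpy as np
import jpype
import jpype.imports
import jpype.nio

jpype.startJVM()
from java.nio import ByteOrder

arr = np.zeros(1_000_000, dtype=np.float64)   # memory allocated on the Python/C side
bb = jpype.nio.convertToDirectBuffer(arr)     # direct ByteBuffer over the same memory, no copy
db = bb.order(ByteOrder.nativeOrder()).asDoubleBuffer()
# Java code handed db can now db.put(i, value) straight into arr's memory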
I am also working on a reverse bridge which will allow Java to directly call CPython so we can skip the middle man when doing things like transferring a lot of String objects.
If you can define what you are looking for in terms of an API, then I can certainly make an attempt at producing an optimized path. JPype is primarily focused on scientific and engineering coding (especially since I am employed as a signal processing engineer working on nuclear data and radiation processing), so I am always looking for ways to improve transfer speed and productivity.
I evaluated the cost of using Java arrays, which need to be copied, versus direct byte buffer transfer, in which the memory is created on the Python side and then filled out directly by Java as nio direct buffers. The key issue here is that Java writing the buffers directly requires a lot of virtual calls, relative to just writing the array and copying to Python space once using the high-speed buffer protocol. Because the call path for Java to write directly is rather slow, copying the memory was more effective.
Fetch to pandas rowwise: 1.0152149100002135
Fetch to pandas columnwise array: 0.004775572299986379
Fetch to pandas columnwise buffer: 0.005558661199989728
Fetch nothing: 0.003531180299993139
Arrays add about 35% overhead relative to just retrieving the data from SQL, compared to 55% overhead when writing directly with buffers. Given that it is only 35% overhead, there really isn't much room for improvement. I could see it being slightly faster with a dedicated JNI call, but I suspect the jvalue overhead on the JNI call would eat any potential gain.
Bottom line is that I can do fairly low-overhead transfers, though it will take a bit of work to achieve in general, especially if you want to pull columns with strings, nulls, or other object types. Pure primitive transfer is pretty quick.
Still not sure how this compares to using Arrow, though.
Edit:
I updated the code to use asDoubleBuffer, which helped a small amount. (Also, the machine was less loaded, so the speeds improved.)
After switching to a direct DoubleBuffer the overhead dropped, as it saves two virtual calls. Still, array and direct memory transfers are nearly equivalent.
Fetch to pandas rowwise: 0.34797074999951294
Fetch to pandas columnwise: 0.0019635969999944793
Fetch to pandas buffer: 0.0018464208999939729
Fetch nothing: 0.0013564935000031256
I looked deeper at the issue of NumPy and structures. It appears that column_stack only works on homogeneous data types, so it is clearly not useful except in limited cases. However, structured arrays created by numpy.zeros(length, dtype) have memoryviews that can be written directly without any issue using Java buffer operations. Other than the referencing cost for strings and other similar objects, all primitives can be written directly without much overhead.
So I think that I can set up a demo with a fast transfer on fetchall. It will, however, have a NumPy requirement on the API, as there is nothing in Python which represents a list of memory columns that I am aware of.
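A minimal sketch of what that NumPy-backed record store could look like (the field names and types here are made up for illustration):

import numpy as np

# One packed record per row; the fields would mirror the SQL column types.
dtype = np.dtype([("x", np.float64), ("y", np.int32), ("z", np.int32)])
rows = np.zeros(50_000, dtype=dtype)
view = memoryview(rows)   # writable and C-contiguous, so Java buffer operations could fill it in place
print(view.nbytes, rows.dtype.itemsize)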
Thanks for the hint about import jpype.imports. That actually speeds up my Arrow path a bit, as the JPackage usage was time consuming.
Back to the copying: No, there is no copy happening between Java and Python/C++. The following things happen instead:
- A VectorSchemaRoot is filled by the CompositeJdbcConsumer, which delegates the ResultSet -> Arrow conversion to individual per-column consumers like the NullableBitConsumer.

As I said, if you would like an optimized path for fetchMany, I think I can achieve something about as fast as Arrow (possibly faster, depending on whether I can get a direct struct write). Or if you find something slow in JPype that needs a bypass, just put in an issue.
I made an updated blog post here: https://github.com/xhochy/xhochy.github.com/pull/7
For now, everything is fine for me. The jpype.dbapi2 interface is a nice addition for small use cases, but I'm really happy with the performance of pyarrow and, as a maintainer of pyarrow itself, I also prefer to use it. There is currently no interface to insert data into a JDBC sink via pyarrow.jvm, but I'll add that in the future, too.
Thanks for the heads up. I will leave adding a block transfer API for later when I have more free time. I think that rather than coding an API specifically for JDBC, I should create block transfers of structured data. Just like bulk primitive transfer, this accelerated API would make structured data easier to move. But that will depend on the completion of the reverse bridge, when I can call Python from within Java directly. I can then use it for bulk inserts and selects.
If you do find a bottleneck for Arrow in JPype, just put in an issue ticket. I am working on a number of enhancements and am always looking for additional areas of improvement. The most likely improvement that will help you would be the ability to preselect methods from a dispatch. Currently JPype resolves the method overload each time it encounters a method call. This has caching to accelerate repeated calls to the same overload, but if Java has many overloads with the same name and the type of the arguments being passed changes between uses, then the method resolution process has to be done each time. I am planning to refactor it so that one can use reflection to get a method overload which accepts only the specified arguments, which will allow speed-sensitive applications to bypass the resolution process.
@xhochy With regard to https://uwekorn.com/2019/11/17/fast-jdbc-access-in-python-using-pyarrow-jvm.html, have you made any attempts to measure the speed since the JPype conversion to the CPython API?
The blog post was dated November 2019, and since then we have had a factor of 3 speed-up in the return path, a factor of 4 on call dispatch, and another factor of 20 to 400 on array transfers depending on the type. I am sure that marshalling arguments over one at a time is still much slower than a dedicated batch transfer, but I would hope that we are substantially faster than 2 hours at this point.