sfu-db / connector-x

Fastest library to load data from DB to DataFrames in Rust and Python
https://sfu-db.github.io/connector-x
MIT License
2.02k stars 163 forks source link

support python 3.12 #615

Closed wangxiaoying closed 6 months ago

wangxiaoying commented 7 months ago

527

wangxiaoying commented 7 months ago

Still has concurrency issues, might related to https://github.com/PyO3/pyo3/issues/4105

wangxiaoying commented 7 months ago

Backtrace of just aaa-d

Process 26129 launched: '/Users/momo/prog/connector-x/connectorx-python/target/debug/examples/test_aaa' (arm64)
[2024-04-22T18:32:32Z DEBUG connectorx_python::pandas] Protocol: binary
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Run dispatcher
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Prepare
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Fetching metadata
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Try get row rounts for entire result
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Manually count rows of each partitioned query and sum up
[2024-04-22T18:32:32Z DEBUG connectorx::sql] Transformed count query: SELECT count(*) FROM (SELECT test_int, test_str FROM test_table WHERE test_int = 0) AS CXTMPTAB_COUNT
[2024-04-22T18:32:32Z DEBUG connectorx::sql] Transformed count query: SELECT count(*) FROM (SELECT test_int, test_str FROM test_table WHERE test_int = 1) AS CXTMPTAB_COUNT
[2024-04-22T18:32:32Z DEBUG connectorx::dispatcher] Allocate destination memory: 2x2
[2024-04-22T18:32:33Z DEBUG connectorx::dispatcher] Create destination partition
[2024-04-22T18:32:33Z DEBUG connectorx::dispatcher] Start writing
[2024-04-22T18:32:33Z DEBUG connectorx::dispatcher] Finalize partition 1
[2024-04-22T18:32:33Z DEBUG connectorx::dispatcher] Finalize partition 0
Process 26129 stopped
* thread #7, stop reason = EXC_BAD_ACCESS (code=1, address=0x10)
    frame #0: 0x0000000106d5e0d4 libpython3.12.dylib`_PyObject_Malloc + 40
libpython3.12.dylib`:
->  0x106d5e0d4 <+40>: ldr    x20, [x8, #0x10]
    0x106d5e0d8 <+44>: adrp   x21, 998
    0x106d5e0dc <+48>: add    x21, x21, #0x450          ; _PyRuntime
    0x106d5e0e0 <+52>: ldr    x8, [x21, #0x30]
Target 0: (test_aaa) stopped.
(lldb) bt
* thread #7, stop reason = EXC_BAD_ACCESS (code=1, address=0x10)
  * frame #0: 0x0000000106d5e0d4 libpython3.12.dylib`_PyObject_Malloc + 40
    frame #1: 0x0000000106d8bde8 libpython3.12.dylib`PyUnicode_New + 180
    frame #2: 0x000000010011d4cc test_aaa`connectorx_python::pandas::pystring::StringInfo::pystring::hd9da27422fd72f8e(self=0x0000000170a3d088, (null)=Python @ 0x0000000170a3ce07) at pystring.rs:62:21
    frame #3: 0x000000010006e2c4 test_aaa`connectorx_python::pandas::pandas_columns::string::StringColumn::flush::h484e25ccc8c4d8d5(self=0x00006000026002a0, force=true) at string.rs:278:55
    frame #4: 0x000000010006cd50 test_aaa`_$LT$connectorx_python..pandas..pandas_columns..string..StringColumn$u20$as$u20$connectorx_python..pandas..pandas_columns..PandasColumnObject$GT$::finalize::h9ded97de65162e5d(self=0x00006000026002a0) at string.rs:77:9
    frame #5: 0x000000010016c3bc test_aaa`_$LT$connectorx_python..pandas..destination..PandasPartitionDestination$u20$as$u20$connectorx..destinations..DestinationPartition$GT$::finalize::hbc50e938cc80cced(self=0x0000000170a3db90) at destination.rs:374:13
    frame #6: 0x00000001001c4568 test_aaa`connectorx::dispatcher::Dispatcher$LT$S$C$D$C$TP$GT$::run::_$u7b$$u7b$closure$u7d$$u7d$::h92302c3ed1765030((null)=(usize, (connectorx_python::pandas::destination::PandasPartitionDestination, connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>)) @ 0x0000000170a3f180) at dispatcher.rs:191:17
    frame #7: 0x00000001001dcc20 test_aaa`core::ops::function::impls::_$LT$impl$u20$core..ops..function..FnMut$LT$A$GT$$u20$for$u20$$RF$F$GT$::call_mut::hb3c093f30e5c2873(self=0x0000000170a3ff40, args=((usize, (connectorx_python::pandas::destination::PandasPartitionDestination, connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>))) @ 0x0000000170a3f180) at function.rs:272:13
    frame #8: 0x00000001001dce94 test_aaa`core::ops::function::impls::_$LT$impl$u20$core..ops..function..FnOnce$LT$A$GT$$u20$for$u20$$RF$mut$u20$F$GT$::call_once::h1035ff33ba66cffc(self=0x0000000170a3ff40, args=((usize, (connectorx_python::pandas::destination::PandasPartitionDestination, connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>))) @ 0x0000000170a3f400) at function.rs:305:13
    frame #9: 0x00000001000cdb68 test_aaa`_$LT$core..iter..adapters..map..Map$LT$I$C$F$GT$$u20$as$u20$core..iter..traits..iterator..Iterator$GT$::next::hfece4d22faebc817 at option.rs:1072:29
    frame #10: 0x00000001000cdb64 test_aaa`_$LT$core..iter..adapters..map..Map$LT$I$C$F$GT$$u20$as$u20$core..iter..traits..iterator..Iterator$GT$::next::hfece4d22faebc817(self=0x0000000170a3fee0) at map.rs:108:26
    frame #11: 0x00000001001240f4 test_aaa`rayon::iter::plumbing::Folder::consume_iter::h7fde375b754338ab(self=TryReduceFolder<fn((), ()) -> core::result::Result<(), connectorx_python::errors::ConnectorXPythonError>, core::result::Result<(), connectorx_python::errors::ConnectorXPythonError>> @ 0x0000000170a40458, iter=<unavailable>) at mod.rs:177:21
    frame #12: 0x00000001001b7b88 test_aaa`_$LT$rayon..iter..map..MapFolder$LT$C$C$F$GT$$u20$as$u20$rayon..iter..plumbing..Folder$LT$T$GT$$GT$::consume_iter::hf29ab90e61324570(self=MapFolder<rayon::iter::try_reduce::TryReduceFolder<fn((), ()) -> core::result::Result<(), connectorx_python::errors::ConnectorXPythonError>, core::result::Result<(), connectorx_python::errors::ConnectorXPythonError>>, connectorx::dispatcher::{impl#0}::run::{closure_env#0}<connectorx::sources::postgres::PostgresSource<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>, connectorx_python::pandas::destination::PandasDestination, connectorx_python::pandas::transports::postgres::PostgresPandasTransport<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>>> @ 0x0000000170a40668, iter=Zip<core::ops::range::Range<usize>, core::iter::adapters::zip::Zip<rayon::vec::SliceDrain<connectorx_python::pandas::destination::PandasPartitionDestination>, rayon::vec::SliceDrain<connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>>>> @ 0x0000000170a40768) at map.rs:248:21
    frame #13: 0x000000010006a3e4 test_aaa`rayon::iter::plumbing::Producer::fold_with::hb600ee2ab34bf77a(self=EnumerateProducer<rayon::iter::zip::ZipProducer<rayon::vec::DrainProducer<connectorx_python::pandas::destination::PandasPartitionDestination>, rayon::vec::DrainProducer<connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>>>> @ 0x0000000170a412e8, folder=<unavailable>) at mod.rs:109:9
    frame #14: 0x000000010005b1ec test_aaa`rayon::iter::plumbing::bridge_producer_consumer::helper::hb4ab6c313999feb9(len=1, migrated=true, splitter=LengthSplitter @ 0x0000000170a408d0, producer=EnumerateProducer<rayon::iter::zip::ZipProducer<rayon::vec::DrainProducer<connectorx_python::pandas::destination::PandasPartitionDestination>, rayon::vec::DrainProducer<connectorx::sources::postgres::PostgresSourcePartition<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>>>> @ 0x0000000170a414b8, consumer=MapConsumer<rayon::iter::try_reduce::TryReduceConsumer<fn((), ()) -> core::result::Result<(), connectorx_python::errors::ConnectorXPythonError>, fn()>, connectorx::dispatcher::{impl#0}::run::{closure_env#0}<connectorx::sources::postgres::PostgresSource<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>, connectorx_python::pandas::destination::PandasDestination, connectorx_python::pandas::transports::postgres::PostgresPandasTransport<connectorx::sources::postgres::BinaryProtocol, postgres_openssl::MakeTlsConnector>>> @ 0x0000000170a414e0) at mod.rs:437:13
    frame #15: 0x0000000100062824 test_aaa`rayon::iter::plumbing::bridge_producer_consumer::helper::_$u7b$$u7b$closure$u7d$$u7d$::hd0d7c9df8cd4067c(context=FnContext @ 0x0000000170a414b7) at mod.rs:426:21
    frame #16: 0x0000000100208a34 test_aaa`rayon_core::join::join_context::call_b::_$u7b$$u7b$closure$u7d$$u7d$::h267cafdb6aeacdef(migrated=true) at mod.rs:129:25
    frame #17: 0x00000001000e3260 test_aaa`rayon_core::job::JobResult$LT$T$GT$::call::_$u7b$$u7b$closure$u7d$$u7d$::h7a5cd7a66a225993 at job.rs:218:41
    frame #18: 0x00000001000915f4 test_aaa`_$LT$core..panic..unwind_safe..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::h832bd53effb00d90(self=<unavailable>, (null)=<unavailable>) at unwind_safe.rs:272:9
    frame #19: 0x000000010008b874 test_aaa`std::panicking::try::do_call::hd071d3ba0aacda74(data="\U00000018:\xdfo\U00000001") at panicking.rs:554:40
    frame #20: 0x0000000100090698 test_aaa`__rust_try + 32
    frame #21: 0x000000010008411c test_aaa`std::panicking::try::h671f142de9e1f1f1(f=<unavailable>) at panicking.rs:518:19
    frame #22: 0x000000010012a204 test_aaa`std::panic::catch_unwind::ha88b1fceba2ffaef(f=<unavailable>) at panic.rs:142:14
    frame #23: 0x00000001000417dc test_aaa`rayon_core::unwind::halt_unwinding::h1c401a0b523216d4(func=<unavailable>) at unwind.rs:17:5
    frame #24: 0x00000001000e1070 test_aaa`rayon_core::job::JobResult$LT$T$GT$::call::hbb067960e17b9bd4(func=<unavailable>) at job.rs:218:15
    frame #25: 0x00000001000fa564 test_aaa`_$LT$rayon_core..job..StackJob$LT$L$C$F$C$R$GT$$u20$as$u20$rayon_core..job..Job$GT$::execute::hfff7ab2fb1a16e4d(this=0x0000000170627de0) at job.rs:120:32
    frame #26: 0x0000000101d79f68 test_aaa`rayon_core::job::JobRef::execute::h0ade5023cc916a25(self=JobRef @ 0x0000000170a42280) at job.rs:64:9
    frame #27: 0x0000000101d76d9c test_aaa`rayon_core::registry::WorkerThread::execute::hdec86ef5278334de(self=0x0000000170a42580, job=JobRef @ 0x0000000170a422b0) at registry.rs:860:9
    frame #28: 0x0000000101ea1bd8 test_aaa`rayon_core::registry::WorkerThread::wait_until_cold::hcffe60d60917825c(self=0x0000000170a42580, latch=0x000000010671a060) at registry.rs:794:21
    frame #29: 0x0000000101d76b3c test_aaa`rayon_core::registry::WorkerThread::wait_until::h6651653c82223b01(self=0x0000000170a42580, latch=0x000000010671a060) at registry.rs:769:13
    frame #30: 0x0000000101d76bd8 test_aaa`rayon_core::registry::WorkerThread::wait_until_out_of_work::h0f200ab979cfde54(self=0x0000000170a42580) at registry.rs:818:9
    frame #31: 0x0000000101d77174 test_aaa`rayon_core::registry::main_loop::ha9ae4adbd1aeca77(thread=<unavailable>) at registry.rs:923:5
    frame #32: 0x0000000101d749b8 test_aaa`rayon_core::registry::ThreadBuilder::run::h6ef31d25d4d6cfdf(self=<unavailable>) at registry.rs:53:18
    frame #33: 0x0000000101d74d24 test_aaa`_$LT$rayon_core..registry..DefaultSpawn$u20$as$u20$rayon_core..registry..ThreadSpawn$GT$::spawn::_$u7b$$u7b$closure$u7d$$u7d$::h9cb4f23fae79d623 at registry.rs:98:20
    frame #34: 0x0000000101d89c7c test_aaa`std::sys_common::backtrace::__rust_begin_short_backtrace::hfd69269107c146a7(f=<unavailable>) at backtrace.rs:155:18
    frame #35: 0x0000000101d7fb14 test_aaa`std::thread::Builder::spawn_unchecked_::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h3fd435a305f8dbc9 at mod.rs:529:17
    frame #36: 0x0000000101d7dc7c test_aaa`_$LT$core..panic..unwind_safe..AssertUnwindSafe$LT$F$GT$$u20$as$u20$core..ops..function..FnOnce$LT$$LP$$RP$$GT$$GT$::call_once::h2c9b5c65e976868c(self=<unavailable>, (null)=<unavailable>) at unwind_safe.rs:272:9
    frame #37: 0x0000000101d86da0 test_aaa`std::panicking::try::do_call::h5efc36c67a56ca7c(data="") at panicking.rs:554:40
    frame #38: 0x0000000101d8902c test_aaa`__rust_try + 32
    frame #39: 0x0000000101d86b9c test_aaa`std::panicking::try::h3b339c997fe35df5(f=<unavailable>) at panicking.rs:518:19
    frame #40: 0x0000000101d7f98c test_aaa`std::thread::Builder::spawn_unchecked_::_$u7b$$u7b$closure$u7d$$u7d$::h72cc928f9ab6250e at panic.rs:142:14
    frame #41: 0x0000000101d7f988 test_aaa`std::thread::Builder::spawn_unchecked_::_$u7b$$u7b$closure$u7d$$u7d$::h72cc928f9ab6250e at mod.rs:528:30
    frame #42: 0x0000000101d8227c test_aaa`core::ops::function::FnOnce::call_once$u7b$$u7b$vtable.shim$u7d$$u7d$::h1eba6d227d13bd09((null)=0x0000600002c00680, (null)=<unavailable>) at function.rs:250:5
    frame #43: 0x0000000101e4f2f4 test_aaa`std::sys::pal::unix::thread::Thread::new::thread_start::h207e3c76161182d1 [inlined] _$LT$alloc..boxed..Box$LT$F$C$A$GT$$u20$as$u20$core..ops..function..FnOnce$LT$Args$GT$$GT$::call_once::he00394a3411d39eb at boxed.rs:2015:9 [opt]
    frame #44: 0x0000000101e4f2e8 test_aaa`std::sys::pal::unix::thread::Thread::new::thread_start::h207e3c76161182d1 [inlined] _$LT$alloc..boxed..Box$LT$F$C$A$GT$$u20$as$u20$core..ops..function..FnOnce$LT$Args$GT$$GT$::call_once::hfcfcdc85194fef1b at boxed.rs:2015:9 [opt]
    frame #45: 0x0000000101e4f2e4 test_aaa`std::sys::pal::unix::thread::Thread::new::thread_start::h207e3c76161182d1 at thread.rs:108:17 [opt]
    frame #46: 0x00000001a6eba06c libsystem_pthread.dylib`_pthread_start + 148
wangxiaoying commented 6 months ago

Seems like we have to release the GIL token Python<'py> before spawning thread (in rayon) and acquire the token before allocating the string object. In this mini example: https://github.com/wangxiaoying/pyo3_test/blob/main/test.py , only test_pystring_t4 would success.