kubo / rust-oracle

Oracle driver for Rust

Can I transfer Arc<Vec<Row>> between 2 tokio threads? #89

Open tc19901016 opened 2 weeks ago

tc19901016 commented 2 weeks ago

My Rust program has an Oracle reader thread and a consumer thread. 1) The reader thread uses ResultSet.next to get each Row and sends an Arc<Vec<Row>> to a tokio::sync::mpsc unbounded channel. 2) The consumer thread receives the Arc<Vec<Row>> and processes it. However, the Rust compiler reports an error: `*mut oracle::binding::binding::dpiVector` cannot be shared between threads safely. Thank you.

the oracle-read code is below:

fn read_data_to_queue(&mut self, tx:UnboundedSender<(Arc<Vec<Row>>, i32, i32)>){
        let start_time = Instant::now();
        println!("start read rows");
        let mut index = 0;
        'iter_rows:loop{
            let mut row_buffer:Vec<Row> = Vec::with_capacity(ORACLE_ARRAY_SIZE as usize);
            let mut is_finish = false;
            for i in 0..ORACLE_ARRAY_SIZE{
                if let Some(r) = (*self.rows).next(){//r: Result<Row, Error>
                    row_buffer.push(r.unwrap());
                }else{
                    is_finish = true;
                    break;
                }
            }
            let temp = Arc::new(row_buffer);
            // print!("sender{};", index);
            tx.send((Arc::clone(&temp), 1, index));
            index+=1;
            if is_finish{
                break 'iter_rows;
            }
        }
        tx.send((Arc::new(Vec::new()), 0, -1));
        println!("[{:?}]read data time:{:?}",thread::current().id(), start_time.elapsed());
    }

the consume thread code is below:

'consumer:loop{
                            match rx.recv().await {
                                Some((row_buffer, is_finish, batch_no)) =>{
                                    let timestamp = Instant::now();
                                    if is_finish==0 {
                                        break 'consumer;
                                    }
                                    let (n, is_last) = parser.fetch_next_producer_thread(row_buffer).unwrap();
                                    duration_pre+=timestamp.elapsed().as_micros();
                                   #[allow(clippy::needless_range_loop)]
                                    for col in 0..dst.ncols(){
                                        let (duration_parse_for, duration_convert_for, duration_write_for) = f[col](&mut parser, &mut dst).unwrap();
                                        duration_parse+=duration_parse_for;
                                        duration_convert+=duration_convert_for;
                                        duration_write+=duration_write_for;
                                    }
                                },
                                None =>{break 'consumer;},
                            }
                        }

queue code: let (tx, mut rx): (UnboundedSender<(Arc<Vec<Row>>, i32, i32)>, UnboundedReceiver<(Arc<Vec<Row>>, i32, i32)>) = mpsc::unbounded_channel();

detail error is below:

error[E0277]: `*mut oracle::binding::binding::dpiVector` cannot be shared between threads safely
    --> connectorsdm\src\dispatcher.rs:191:48
     |
191  |       let read_task=tokio::spawn(async move {
     |  ___________________------------_^
     | |                   |
     | |                   required by a bound introduced by this call
192  | |         let mut data_reader = crate::sources::oracle::OracleTextSourceDataReader::new(conn_url.unwrap().as_str(), sql.as_str());
193  | |         data_reader.read_data_to_queue(tx);
194  | |     });
     | |_____^ `*mut oracle::binding::binding::dpiVector` cannot be shared between threads safely
     |
     = help: within `SqlValue<'static>`, the trait `Sync` is not implemented for `*mut oracle::binding::binding::dpiVector`, which is required by `{async block@connectorsdm\src\dispatcher.rs:191:48: 194:22}: Send`
note: required because it appears within the type `oracle::binding::binding::dpiDataBuffer`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\oracle-0.6.2\src\binding\binding.rs:846:11
     |
846  | pub union dpiDataBuffer {
     |           ^^^^^^^^^^^^^
note: required because it appears within the type `oracle::binding::binding::dpiData`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\oracle-0.6.2\src\binding\binding.rs:1797:12
     |
1797 | pub struct dpiData {
     |            ^^^^^^^
     = note: required because it appears within the type `&'static mut oracle::binding::binding::dpiData`
note: required because it appears within the type `oracle::sql_value::DpiData<'static>`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\oracle-0.6.2\src\sql_value.rs:128:6
     |
128  | enum DpiData<'a> {
     |      ^^^^^^^
note: required because it appears within the type `SqlValue<'static>`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\oracle-0.6.2\src\sql_value.rs:143:12
     |
143  | pub struct SqlValue<'a> {
     |            ^^^^^^^^
     = note: required for `std::ptr::Unique<SqlValue<'static>>` to implement `Sync`
note: required because it appears within the type `alloc::raw_vec::RawVec<SqlValue<'static>>`
    --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081\library\alloc\src\raw_vec.rs:69:19
note: required because it appears within the type `Vec<SqlValue<'static>>`
    --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081\library\alloc\src\vec\mod.rs:398:12
note: required because it appears within the type `oracle::Row`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\oracle-0.6.2\src\row.rs:34:12
     |
34   | pub struct Row {
     |            ^^^
     = note: required for `std::ptr::Unique<Row>` to implement `Sync`
note: required because it appears within the type `alloc::raw_vec::RawVec<Row>`
    --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081\library\alloc\src\raw_vec.rs:69:19
note: required because it appears within the type `Vec<Row>`
    --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081\library\alloc\src\vec\mod.rs:398:12
     = note: required for `Arc<Vec<Row>>` to implement `Send`
     = note: required because it appears within the type `(Arc<Vec<Row>>, i32, i32)`
     = note: required for `tokio::sync::mpsc::chan::Chan<(Arc<Vec<Row>>, i32, i32), tokio::sync::mpsc::unbounded::Semaphore>` to implement `Sync`
     = note: 1 redundant requirement hidden
     = note: required for `Arc<tokio::sync::mpsc::chan::Chan<(Arc<Vec<Row>>, i32, i32), tokio::sync::mpsc::unbounded::Semaphore>>` to implement `Send`
note: required because it appears within the type `tokio::sync::mpsc::chan::Tx<(Arc<Vec<Row>>, i32, i32), tokio::sync::mpsc::unbounded::Semaphore>`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\tokio-1.40.0\src\sync\mpsc\chan.rs:19:19
     |
19   | pub(crate) struct Tx<T, S> {
     |                   ^^
note: required because it appears within the type `UnboundedSender<(Arc<Vec<Row>>, i32, i32)>`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\tokio-1.40.0\src\sync\mpsc\unbounded.rs:11:12
     |
11   | pub struct UnboundedSender<T> {
     |            ^^^^^^^^^^^^^^^
note: required because it's used within this async block
    --> connectorsdm\src\dispatcher.rs:191:48
     |
191  |       let read_task=tokio::spawn(async move {
     |  ________________________________^
192  | |         let mut data_reader = crate::sources::oracle::OracleTextSourceDataReader::new(conn_url.unwrap().as_str(), sql.as_str());
193  | |         data_reader.read_data_to_queue(tx);
194  | |     });
     | |_____^

note: required by a bound in `tokio::spawn`
    --> C:\Users\tianchuan01.cargo\registry\src\mirrors.ustc.edu.cn-12df342d903acd47\tokio-1.40.0\src\task\spawn.rs:167:21
     |
165  |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
     |            ----- required by a bound in this function
166  |     where
167  |         F: Future + Send + 'static,
     |                     ^^^^ required by this bound in `spawn`

kubo commented 2 weeks ago

Use Arc<Mutex<Vec<Row>>> instead. If you have no need to wrap Vec with Arc, use Vec<Row> instead of Arc<Vec<Row>>.

To transfer a value across threads, its type must implement Send. Both Arc<Mutex<Vec<Row>>> and Vec<Row> implement Send, but Arc<Vec<Row>> doesn't. That's because Arc<T> implements Send only when T implements both Send and Sync, and Vec<Row> doesn't implement Sync.
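The Send/Sync rule above can be checked without an Oracle connection. The following is a minimal sketch using only the standard library: `FakeRow` is a hypothetical stand-in that, like Row, is Send but not Sync, and `std::sync::mpsc` stands in for the tokio channel. It is not rust-oracle code.

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical stand-in for oracle::Row: Send but not Sync.
// Cell<()> is Send but never Sync, and PhantomData inherits that.
struct FakeRow {
    value: i64,
    _not_sync: PhantomData<Cell<()>>,
}

// Compiles only when T is Send.
fn assert_send<T: Send>() {}

fn make_rows(n: i64) -> Vec<FakeRow> {
    (0..n)
        .map(|v| FakeRow { value: v, _not_sync: PhantomData })
        .collect()
}

// Option 1: move an owned Vec<FakeRow> through a channel.
fn sum_via_owned_vec(n: i64) -> i64 {
    let (tx, rx) = mpsc::channel::<Vec<FakeRow>>();
    let consumer = thread::spawn(move || {
        let total: i64 = rx.iter().flatten().map(|r| r.value).sum();
        total
    });
    tx.send(make_rows(n)).unwrap();
    drop(tx); // closing the channel ends the consumer's loop
    consumer.join().unwrap()
}

// Option 2: share the Vec behind Arc<Mutex<..>>.
fn sum_via_arc_mutex(n: i64) -> i64 {
    let shared = Arc::new(Mutex::new(make_rows(n)));
    let for_consumer = Arc::clone(&shared);
    let consumer = thread::spawn(move || {
        let rows = for_consumer.lock().unwrap();
        let total: i64 = rows.iter().map(|r| r.value).sum();
        total
    });
    consumer.join().unwrap()
}

fn main() {
    assert_send::<Vec<FakeRow>>();
    assert_send::<Arc<Mutex<Vec<FakeRow>>>>();
    // assert_send::<Arc<Vec<FakeRow>>>(); // rejected: Vec<FakeRow> is not Sync
    println!("{}", sum_via_owned_vec(4));
    println!("{}", sum_via_arc_mutex(4));
}
```

Uncommenting the third `assert_send` line should produce the same kind of E0277 error as in the original report, with `Cell<()>` in place of the raw pointer.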

tc19901016 commented 1 week ago

Thank you. However, I have another question. I want to convert a Vec<Row> into column-oriented Vecs, so I plan to use rayon to parse each column on a thread pool. The code looks like this: (0..column_len).into_par_iter().enumerate().map(|(i, _)| rows.iter().map(|r| r.get(i)?).collect()).collect(); But the compiler reports an error saying this code can't run in a thread pool. The error points at this line: self.data = DpiData::Var(Rc::new(DpiVar::new(handle, data)));

Can I fix this by changing that line to use Arc::new....? Thank you.

kubo commented 1 week ago

You cannot. Sync must be implemented to share references between threads, and Row doesn't implement it. To be precise, you can wrap Row in a Mutex to share references between threads, but the threaded code will be slower than simple code without threading.
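One possible workaround, sketched here with the standard library only, is to copy the values out of the rows on the reading thread and parallelize only the work on the owned copies. `FakeRow` and its `get` method are hypothetical stand-ins (the Rc loosely mirrors the one in the DpiData::Var line quoted above), scoped threads replace rayon, and whether this is actually faster than single-threaded code has not been measured.

```rust
use std::rc::Rc;
use std::thread;

// Hypothetical stand-in for a driver row. The Rc makes it neither Send nor Sync.
struct FakeRow {
    cells: Rc<Vec<String>>,
}

impl FakeRow {
    fn get(&self, i: usize) -> &str {
        &self.cells[i]
    }
}

fn sample_rows() -> Vec<FakeRow> {
    vec![
        FakeRow { cells: Rc::new(vec!["1.5".to_string(), "10".to_string()]) },
        FakeRow { cells: Rc::new(vec!["2.5".to_string(), "20".to_string()]) },
    ]
}

// Step 1 (reading thread only): copy each column into plain owned Strings.
// Vec<Vec<String>> is both Send and Sync.
fn to_columns(rows: &[FakeRow], column_len: usize) -> Vec<Vec<String>> {
    (0..column_len)
        .map(|i| rows.iter().map(|r| r.get(i).to_owned()).collect())
        .collect()
}

// Step 2 (any threads): parse each column on its own scoped thread.
// Values that fail to parse become NaN in this sketch.
fn parse_columns(columns: &[Vec<String>]) -> Vec<Vec<f64>> {
    thread::scope(|s| {
        // Spawn every worker first, then join them in column order.
        let handles: Vec<_> = columns
            .iter()
            .map(|col| {
                s.spawn(move || {
                    col.iter()
                        .map(|v| v.parse::<f64>().unwrap_or(f64::NAN))
                        .collect::<Vec<f64>>()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let rows = sample_rows();
    let columns = to_columns(&rows, 2);
    println!("{:?}", parse_columns(&columns));
}
```

The copy in step 1 is itself single-threaded, so this only pays off when the per-value work in step 2 is much heavier than the copy.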

I'm not sure what your use case is, but in general it is better to use query_as instead of query. See https://github.com/kubo/rust-oracle/blob/master/docs/query-methods.md#with-and-without-_as

Another option is https://quietboil.github.io/sibyl/. It uses Oracle Call Interface directly. On the other hand rust-oracle uses the interface via ODPI-C. I'm not sure which is faster.

tc19901016 commented 2 days ago

I tested reading millions of rows from Oracle with both JDBC and rust-oracle. Depending on the data type, some queries are faster in Rust and others are faster in JDBC. The results are below:

1) 30 columns (NUMBER(28,18)) * 5670,0000 rows

| Rust-oracle: row.next() + row.get() (sec) | Rust-oracle: just row.next() (sec) | jdbc: rs.next() + rs.getFloat() (sec) | jdbc: rs.next() (sec) |
| -- | -- | -- | -- |
| 44 | 26 | 21 | 10.9 |

2) 30 columns (NUMBER(10,0)) * 5670,0000 rows

| Rust-oracle: row.next() + row.get() (sec) | Rust-oracle: just row.next() (sec) | jdbc: rs.next() + rs.getInt() (sec) | jdbc: rs.next() (sec) |
| -- | -- | -- | -- |
| 24 | 21 | 20 | 10.4 |

3) 30 columns (Date) * 5670,0000 rows

| Rust-oracle: row.next() + row.get() (sec) | Rust-oracle: just row.next() (sec) | jdbc: rs.next() + rs.getDate() (sec) | jdbc: rs.next() (sec) |
| -- | -- | -- | -- |
| 25 | 23 | 82 | 13 |

My boss believes there must be a way for Rust to read every data type faster than JDBC, so I need to explain two things: 1) why row.next() is slower than JDBC (I expected it to be faster, but in fact it is not; I need to test more), and 2) why reading NUMBER is slower than JDBC. Other types (such as VARCHAR) may also be slower; I will test tomorrow.
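To separate the two effects, the getter cost can be estimated by subtracting the "next only" time from the "next + get" time in each table. The sketch below just does that arithmetic on the figures above; the function name is made up for illustration, and it assumes the two runs are otherwise comparable.

```rust
// Seconds attributable to the column getters alone: the full loop
// minus the same loop with the getters commented out.
fn get_overhead_secs(next_and_get: f64, next_only: f64) -> f64 {
    next_and_get - next_only
}

fn main() {
    // (column type, Rust next+get, Rust next only, JDBC next+get, JDBC next only)
    let results = [
        ("NUMBER(28,18)", 44.0, 26.0, 21.0, 10.9),
        ("NUMBER(10,0)", 24.0, 21.0, 20.0, 10.4),
        ("DATE", 25.0, 23.0, 82.0, 13.0),
    ];
    for (ty, r_all, r_next, j_all, j_next) in results {
        println!(
            "{ty}: fetch rust {:.1} s vs jdbc {:.1} s, get rust {:.1} s vs jdbc {:.1} s",
            r_next,
            j_next,
            get_overhead_secs(r_all, r_next),
            get_overhead_secs(j_all, j_next),
        );
    }
}
```

By this estimate the getters cost 18 s (Rust) vs 10.1 s (JDBC) for NUMBER(28,18), 3 s vs 9.6 s for NUMBER(10,0), and 2 s vs 69 s for Date. Rust's getters are slower only for NUMBER(28,18), while the fetch itself (next only) is roughly twice as slow in Rust in all three cases.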

All queries have this form: select T_DATE AS A0,T_DATE AS A1,T_DATE AS A2,T_DATE AS A3,T_DATE AS A4,T_DATE AS A5,T_DATE AS A6,T_DATE AS A7,T_DATE AS A8,T_DATE AS A9,T_DATE AS A10,T_DATE AS A11,T_DATE AS A12,T_DATE AS A13,T_DATE AS A14,T_DATE AS A15,T_DATE AS A16,T_DATE AS A17,T_DATE AS A18,T_DATE AS A19,T_DATE AS A20,T_DATE AS A21,T_DATE AS A22,T_DATE AS A23,T_DATE AS A24,T_DATE AS A25,T_DATE AS A26,T_DATE AS A27,T_DATE AS A28,T_DATE AS A29 from tableA

The test cases are below:

1) Rust-oracle: row.next() + row.get()

fn oracle_test_iter(sql:&str) ->std::result::Result<(), Box<dyn Error>>{
    println!("oracle_test_iter_col sql:{}", sql);
    let manager = OracleConnectionManager::new("xxx", "xxxx", "1.1.1.1:1521/xxxx");
    let pool = r2d2::Pool::builder().max_size(1).build(manager).unwrap();
    let conn = pool.get().unwrap();
    for _ in 0..4{
        let start_time = Local::now();
        // let col_len = conn.query(sql, &[]).unwrap().column_info().len();
        let mut stmt = conn.statement(sql).prefetch_rows(1024).fetch_array_size(1024).build().unwrap();
        let rows = stmt.query(&[]).unwrap();
        let col_len = rows.column_info().len();

        let mut count=0;
        for row in rows{
            let r = row?;
            for i in 0..col_len {
                let x = r.get::<_,String>(i); // change the target type to match the column: get::<_, Timestamp>, get::<_, i64>, get::<_, f64>
            }
        }
        let duration = Local::now()-start_time;
        println!("duration:{}", duration);
    }
    Ok(())
}

2) Rust-oracle:row.next()

fn oracle_test_only_read(sql:&str) ->std::result::Result<(), Box<dyn Error>>{
    println!("oracle_test_only_read sql:{}", sql);
    let manager = OracleConnectionManager::new("xxx", "xxxx", "1.1.1.1:1521/xxxx");
    let pool = r2d2::Pool::builder().max_size(1).build(manager).unwrap();
    let conn = pool.get().unwrap();
    for _ in 0..10{
        let start_time = Local::now();
        // let col_len = conn.query(sql, &[]).unwrap().column_info().len();
        let mut stmt = conn.statement(sql).prefetch_rows(1024).fetch_array_size(1024).build().unwrap();
        let rows = stmt.query(&[]).unwrap();
        let col_len = rows.column_info().len();

        let mut count=0;
        for row in rows{
            // let r = row?;
            // for i in 0..col_len {
            //     let x = r.get::<_,String>(i);
            // }
            // count+=1;
        }
        let duration = Local::now()-start_time;
        println!("duration:{}", duration);
    }
    println!();
    Ok(())
}

3) jdbc: rs.next()+rs.getFloat()

    private static void test_iter(String sql, int testNum, boolean onlineFlag) throws Exception{
        Connection conn = null;
        if(onlineFlag){
            conn = DriverManager.getConnection("jdbc:oracle:thin:@1.1.1.1:1521/xxx", "xxx", "xxx");
        }else{
            conn = DriverManager.getConnection("jdbc:oracle:thin:@1.1.1.1:1521/xxx", "xxx", "xxx");
        }
        System.out.println("test_iter"+sql);
        List<List<Object>> list = new ArrayList<>();
        for(int k=0;k<testNum;k++){
            long timestamp = System.currentTimeMillis();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setFetchSize(1024);
                try (ResultSet rs = ps.executeQuery()) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int columnCount = metaData.getColumnCount();
                    while (rs.next()) {
                        for (int i = 0; i < columnCount; i++) {
                            float columnValue = rs.getFloat(i+1);//rs.getInt;rs.getDate
                        }
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            long duration = System.currentTimeMillis() - timestamp;
            System.out.println("Finished:"+duration);
        }
    }

4) jdbc: rs.next()

private static void test_only_read(String sql, int testNum, boolean onlineFlag) throws Exception{
        Connection conn = null;
        if(onlineFlag){
            conn = DriverManager.getConnection("jdbc:oracle:thin:@1.1.1.1:1521/xxx", "xxx", "xxx");
        }else{
            conn = DriverManager.getConnection("jdbc:oracle:thin:@1.1.1.1:1521/xxx", "xxx", "xxx");
        }
        System.out.println("test_only_read"+sql);
        List<List<Object>> list = new ArrayList<>();
        for(int k=0;k<testNum;k++){
            long timestamp = System.currentTimeMillis();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setFetchSize(1024);
                try (ResultSet rs = ps.executeQuery()) {
                    ResultSetMetaData metaData = rs.getMetaData();
                    int columnCount = metaData.getColumnCount();
                    while (rs.next()) {
//                        for (int i = 0; i < columnCount; i++) {
//                            String columnValue = rs.getString(i+1);
//                        }
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            long duration = System.currentTimeMillis() - timestamp;
            System.out.println("Finished:"+duration);
        }
    }

kubo commented 1 day ago

In the flame graph of version 0.6.1-dev in https://github.com/kubo/rust-oracle/issues/44#issuecomment-2185927331, upirtrc, an internal function of the Oracle client library, takes 47.77% of the samples. Could you make a flame graph of your code? I guess that more than half of the time is spent inside the Oracle library in the "Rust-oracle: Just row.next()" case.

As for the difference in seconds between NUMBER(28,18) and NUMBER(10,0), I guess that is caused by here. When the column definition is NUMBER(10,0), the column values are converted from NUMBER to int64 inside the Oracle Call Interface (OCI), and row.get::<_, i64> just copies the int64 value. When the definition is NUMBER(28,18), on the other hand, the values are passed through the OCI, formatted as strings in ODPI-C and then converted to the requested type, such as f64, inside rust-oracle. In the current implementation, the string values owned by ODPI-C are converted to f64 via String. I may improve it by using a &str that refers to the buffer in ODPI-C instead of a temporary String that owns data copied from the buffer.
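The difference between the two conversion paths can be sketched in isolation. This is an illustration only, not rust-oracle's actual code: the byte slice stands in for a text buffer owned by ODPI-C, and both function names are made up.

```rust
use std::str;

// Path described as the current one: copy the bytes into a temporary
// String (one heap allocation per value), then parse.
fn parse_via_string(buf: &[u8]) -> Option<f64> {
    let owned: String = String::from_utf8(buf.to_vec()).ok()?;
    owned.parse::<f64>().ok()
}

// Possible improvement: borrow the buffer as &str and parse in place,
// with no allocation or copy.
fn parse_via_str(buf: &[u8]) -> Option<f64> {
    str::from_utf8(buf).ok()?.parse::<f64>().ok()
}

fn main() {
    // Stand-in for a NUMBER(28,18) value formatted as text.
    let buf: &[u8] = b"1234.500000000000000000";
    println!("{:?} {:?}", parse_via_string(buf), parse_via_str(buf));
}
```

Both functions return the same value. The borrowed version only avoids the allocation and copy, so any gain depends on how much of the get time that copy accounts for.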

cjbj commented 19 hours ago

@tc19901016 Did you try slightly bigger arraysize values? For such a large number of rows I would have left prefetch size at its default.

@kubo ping us if you need help, or to nudge us about requests like https://github.com/oracle/odpi/issues/172

tc19901016 commented 12 hours ago

@kubo You are right. 1) In the Rust read-only test (test case 2), dpiStmt_fetchRows accounts for 56% of the time, and RowValue::get and drop account for the rest. I think that is why the Rust read-only test is slower than JDBC (maybe JDBC only runs something like stmt.next()). Flame graph: flamegraph_i64_onlyread

2) In the Rust itercol test (test case 1), NUMBER(28,18) spends more time in get_string_unchecked than NUMBER(10). f64 flame graph: flamegraph_f64_itercol. i64 flame graph: flamegraph_i64_itercol. Although I have created the flame graphs, I don't yet know how to increase the reading speed. I will try to modify this project; if I get some results, I will write a comment.