awestlake87 / pyo3-asyncio


Use synchronous and asynchronous methods with self-reference pyo3 #93

Closed fvaleye closed 1 year ago

fvaleye commented 1 year ago

🔈 Ask for help

Hello 👋,

Thanks for this great library for making asynchronous calls with asyncio.

I am raising this issue to get your point of view on the best way to mix synchronous and asynchronous calls with pyo3 and pyo3-asyncio when the methods borrow `self`.

We want to use pyo3-asyncio here to give RawDeltaTable asynchronous methods.

I didn't manage to add asynchronous methods to the same RawDeltaTable structure. The following example introduces an asynchronous method, load_version_async() (it doesn't work):

#[pyclass]
struct RawDeltaTable {
    _table: deltalake::DeltaTable,
    // storing the config additionally on the table helps us make pickling work.
    _config: FsConfig,
}

#[inline]
fn rt() -> PyResult<tokio::runtime::Runtime> {
    tokio::runtime::Runtime::new().map_err(PyDeltaTableError::from_tokio)
}

#[pymethods]
impl RawDeltaTable {

    pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> {
        rt()?
            .block_on(self._table.load_version(version))
            .map_err(PyDeltaTableError::from_raw)
    }

    pub fn load_version_async<'a>(
        &mut self,
        version: deltalake::DeltaDataTypeVersion,
        py: Python<'a>,
    ) -> PyResult<&'a PyAny> {
        let this = Mutex::new(Arc::new(&self._table));
        pyo3_asyncio::tokio::future_into_py(py, async move {
            this.lock()
                .await
                .load_version(version)
                .await
                .map_err(PyDeltaTableError::from_raw);
            Ok(())
        })
    }
}
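Apart from the borrow problem, the first attempt also swallows load errors: `.map_err(PyDeltaTableError::from_raw);` ends in a semicolon, so the mapped `Result` is dropped (rustc flags this with an `unused_must_use` warning) and the future always resolves to `Ok(())`. The refactored version later in this thread uses `?` instead. A minimal std-only sketch of the difference, assuming a hypothetical `load_version` that stands in for `DeltaTable::load_version` and `String` in place of `PyDeltaTableError`:

```rust
// Hypothetical stand-in for DeltaTable::load_version: fails for versions newer than `latest`.
fn load_version(latest: u64, version: u64) -> Result<u64, String> {
    if version <= latest {
        Ok(version)
    } else {
        Err(format!("version {version} not found (latest is {latest})"))
    }
}

// Mirrors the first attempt: the mapped Result is discarded
// (rustc would warn `unused_must_use`), so callers always see Ok(()).
#[allow(unused_must_use)]
fn swallowed(version: u64) -> Result<(), String> {
    load_version(3, version).map_err(|e| format!("raw: {e}"));
    Ok(())
}

// Mirrors the refactored version: `?` returns the mapped error to the caller.
fn propagated(version: u64) -> Result<(), String> {
    load_version(3, version).map_err(|e| format!("raw: {e}"))?;
    Ok(())
}
```

With these definitions, `swallowed(10)` returns `Ok(())` even though version 10 does not exist, while `propagated(10)` returns the error.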

I only managed to make it work by creating a new RawDeltaTableAsync structure here, which relies on RawDeltaTable for the synchronous calls and overrides them with asynchronous versions built on Arc and Mutex.

Is this the right way to go, or is there a way to have both sync and async methods on the same structure?

Thank you for your help!

fvaleye commented 1 year ago

After digging, I decided to refactor the RawDeltaTable structure to make the pyo3-asyncio integration clearer:

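use std::sync::Arc;
// tokio's Mutex, not std's: lock() returns a future (hence `.lock().await`),
// and its guard can be held across `.await` points.
use tokio::sync::Mutex;

#[pyclass]
struct RawDeltaTable {
    _table: Arc<Mutex<deltalake::DeltaTable>>,
    // storing the config additionally on the table helps us make pickling work.
    _config: FsConfig,
    // one runtime per table, reused by the synchronous methods via block_on.
    _rt: tokio::runtime::Runtime,
}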

With asynchronous calls:

    pub fn load_version_async<'a>(
        &mut self,
        version: deltalake::DeltaDataTypeVersion,
        py: Python<'a>,
    ) -> PyResult<&'a PyAny> {
        // future_into_py needs a Send + 'static future, so move an owned
        // Arc clone into it instead of borrowing `self`.
        let this = Arc::clone(&self._table);
        pyo3_asyncio::tokio::future_into_py(py, async move {
            this.lock()
                .await
                .load_version(version)
                .await
                .map_err(PyDeltaTableError::from_raw)?;
            Ok(())
        })
    }

With synchronous calls:

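    pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> {
        // Take the same lock the async path uses, then drive the load to completion;
        // the guard is held until the function returns.
        let mut table = self._rt.block_on(self._table.lock());
        self._rt
            .block_on(table.load_version(version))
            .map_err(PyDeltaTableError::from_raw)
    }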
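The refactor works because both methods go through the same `Arc<Mutex<...>>`: the sync method locks it on the calling thread, while the async method moves an `Arc::clone` into the future, which `pyo3_asyncio::tokio::future_into_py` requires to be `Send + 'static`. A borrowed `&self._table` cannot satisfy `'static`, which is most likely why the first attempt was rejected. Below is a std-only sketch of that shape under stated assumptions: `std::thread::spawn` (which has the same `Send + 'static` bound) stands in for `future_into_py`, `std::sync::Mutex` stands in for `tokio::sync::Mutex`, and the hypothetical `Table`/`RawTable` types stand in for `deltalake::DeltaTable`/`RawDeltaTable`.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for deltalake::DeltaTable.
#[derive(Debug, Default)]
struct Table {
    version: u64,
}

impl Table {
    fn load_version(&mut self, version: u64) -> Result<(), String> {
        self.version = version;
        Ok(())
    }
}

// Hypothetical stand-in for the refactored RawDeltaTable: one shared, lockable table.
struct RawTable {
    table: Arc<Mutex<Table>>,
}

impl RawTable {
    fn new() -> Self {
        RawTable { table: Arc::new(Mutex::new(Table::default())) }
    }

    // Sync path: lock on the calling thread and run to completion.
    fn load_version(&self, version: u64) -> Result<(), String> {
        let mut table = self.table.lock().map_err(|e| e.to_string())?;
        table.load_version(version)
    }

    // Background path: the closure must be Send + 'static, so it owns an Arc
    // clone rather than borrowing `self` (capturing `&self.table` would not compile).
    fn load_version_background(&self, version: u64) -> JoinHandle<Result<(), String>> {
        let this = Arc::clone(&self.table);
        thread::spawn(move || -> Result<(), String> {
            let mut table = this.lock().map_err(|e| e.to_string())?;
            table.load_version(version)
        })
    }

    fn version(&self) -> u64 {
        self.table.lock().unwrap().version
    }
}
```

Because every access goes through the lock, a sync call and a background call on the same object are serialized rather than racing. In the real binding, the sync side still drives the tokio futures with `self._rt.block_on(...)`; this sketch only shows the ownership shape.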