surrealdb / surrealkv

A low-level, versioned, embedded, ACID-compliant, key-value database for Rust
https://surrealdb.com
Apache License 2.0

Tokio reliance #80

Open stevefan1999-personal opened 1 month ago

stevefan1999-personal commented 1 month ago

At the moment, these are the only things that indicate a Tokio requirement:

https://github.com/surrealdb/surrealkv/blob/c6eadd17f8255f0fe6b0346fadf8925ad05d90e1/src/storage/kv/store.rs#L9-L12

AsyncMutex should be replaceable with https://docs.rs/async-mutex/latest/async_mutex/index.html, but the task runner and async drop are a little trickier.

For the task runner, we need to invert control of the runtime back to the user: the user implements a trait that hands each async future to a spawner for their own runtime.
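To make the inversion concrete, here is a minimal std-only sketch of the idea. The names `Spawner`, `ThreadSpawner`, `block_on`, and `run_background_job` are hypothetical and chosen for illustration; they are not SurrealKV's API, and a real implementation would delegate to the user's runtime (Tokio, smol, ...) rather than to one thread per task.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Hypothetical trait: the library asks the caller how to run background futures.
pub trait Spawner: Send + Sync {
    fn spawn(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}

/// Waker that unparks the thread polling the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls on the current thread and parks while pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// One possible user-side implementation: one OS thread per task.
pub struct ThreadSpawner;

impl Spawner for ThreadSpawner {
    fn spawn(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        thread::spawn(move || block_on(fut));
    }
}

/// Library-side code only sees `dyn Spawner`, never a concrete runtime.
pub fn run_background_job(spawner: &dyn Spawner) -> u32 {
    let (tx, rx) = mpsc::channel();
    spawner.spawn(Box::pin(async move {
        // Stand-in for real background work.
        let _ = tx.send(41 + 1);
    }));
    rx.recv().expect("background task dropped the sender")
}
```

Calling `run_background_job(&ThreadSpawner)` runs the job on whatever the caller supplied; swapping in a different `Spawner` implementation changes the runtime without touching the library side.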

And for the async drop, we need to wait for https://rust-lang.github.io/async-fundamentals-initiative/roadmap/async_drop.html.
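Until async drop lands, one common stopgap is an explicit async shutdown method with a synchronous `Drop` that only warns. This is a sketch under assumptions, not what SurrealKV does: `DemoStore`, `close`, and `close_and_report` are hypothetical names, and the "flush" is simulated with a flag.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a store that owns background resources.
pub struct DemoStore {
    closed: bool,
    flushed: Arc<AtomicBool>,
}

impl DemoStore {
    pub fn new(flushed: Arc<AtomicBool>) -> Self {
        Self { closed: false, flushed }
    }

    /// Explicit async shutdown; a real store would await its flush work here.
    pub async fn close(mut self) {
        self.flushed.store(true, Ordering::SeqCst);
        self.closed = true;
    }
}

impl Drop for DemoStore {
    fn drop(&mut self) {
        // Drop cannot await, so it can only report a missed close().
        if !self.closed {
            eprintln!("DemoStore dropped without close(); pending work may be lost");
        }
    }
}

/// Waker that does nothing; enough to poll a future that never yields.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drives close() to completion and reports whether the flush flag was set.
pub fn close_and_report() -> bool {
    let flushed = Arc::new(AtomicBool::new(false));
    let store = DemoStore::new(flushed.clone());
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(store.close());
    // close() has no await points, so a single poll completes it.
    assert!(matches!(fut.poll(&mut cx), Poll::Ready(())));
    flushed.load(Ordering::SeqCst)
}
```

The trade-off is that callers must remember to call `close()`; the `Drop` impl can flag the omission but cannot perform the async work itself.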

stevefan1999-personal commented 1 month ago

I can confirm that the following code lets SurrealKV run on smol without an extra compatibility shim once the two PRs are merged:

use std::{env, sync::Arc};

use derive_more::derive::{Display, Error, From};
use surrealkv::{IsolationLevel, Options, Store};

use std::{
    future::Future,
    pin::{pin, Pin},
    result::Result,
    task::{Context, Poll},
};

use surrealkv::async_runtime::{JoinHandle, TaskSpawner};

#[derive(Debug, Display, Error, From)]
enum Error {
    SurrealKv(surrealkv::Error),
    Utf8(std::string::FromUtf8Error),
    ParseInt(std::num::ParseIntError),
    Io(std::io::Error),
}

fn main() -> Result<(), Error> {
    smol::block_on(async {
        let store = Store::with_spawner(
            Options {
                dir: env::current_dir()?.join("./foo"),
                isolation_level: IsolationLevel::SerializableSnapshotIsolation,
                ..Options::default()
            },
            Arc::new(SmolSpawner),
        )?;
        let mut tx = store.begin()?;
        if let Some(foo) = tx.get(b"foo")? {
            let foo = String::from_utf8(foo)?;
            println!("old value {}", foo);
            let mut foo: u32 = foo.parse()?;
            foo += 1;
            println!("new value {}", foo);
            tx.set(b"foo", foo.to_string().as_bytes())?;
        } else {
            tx.set(b"foo", b"1")?;
        }

        tx.commit().await?;
        Ok(())
    })
}

#[derive(Copy, Clone, Default)]
pub struct SmolSpawner;

impl TaskSpawner for SmolSpawner {
    fn spawn<F>(&self, f: F) -> Box<dyn JoinHandle<F::Output> + Unpin>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        Box::new(SmolJoinHandle(smol::spawn(f).fallible()))
    }
}

// `FallibleTask` resolves to `None` instead of panicking if the task is cancelled.
pub struct SmolJoinHandle<T>(async_task::FallibleTask<T>);

impl<T> Future for SmolJoinHandle<T> {
    type Output = Result<T, Box<dyn std::error::Error>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match pin!(&mut self.0).poll(cx) {
            Poll::Ready(Some(res)) => Poll::Ready(Ok(res)),
            Poll::Ready(None) => Poll::Ready(Err("Task failed".into())),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<T: Send> JoinHandle<T> for SmolJoinHandle<T> {}