fzyzcjy / flutter_rust_bridge

Flutter/Dart <-> Rust binding generator, feature-rich, but seamless and simple.
https://fzyzcjy.github.io/flutter_rust_bridge/
MIT License
3.61k stars 254 forks source link

Stream Dart -> Rust #1885

Closed xkeyC closed 3 days ago

xkeyC commented 1 month ago

Is your feature request related to a problem? Please describe. https://github.com/fzyzcjy/flutter_rust_bridge/issues/632

Describe the solution you'd like

Thanks for your work, this is a great project.

I'm new to Rust and I'm using tokio::process with win32job to handle subprocesses on the Windows desktop.

Currently, I send stdout from rust to dart via StreamSink.

I have a new requirement, which is to stream from dart to rust stdin.

632 mentioned that rust functions can be used instead, but my function is designed to start multiple child processes, and using this method will increase complexity (in order to distinguish processes, use static objects, etc.).

Is it possible to add a StreamListener type that allows receiving data from Dart? This application can reduce the complexity of implementing some requirements.

Thanks.

Describe alternatives you've considered

Additional context

welcome[bot] commented 1 month ago

Hi! Thanks for opening your first issue here! :smile:

fzyzcjy commented 1 month ago

but my function is designed to start multiple child processes, and using this method will increase complexity (in order to distinguish processes, use static objects, etc.).

What about using methods, for example:

pub struct MyStruct { child_process: Process }

impl MyStruct {
  pub fn spawn_child_process() -> Self {
    Self { child_process: your_method_to_create_a_process() }
  }

  pub fn send_message_to_child_process(&self, message: String) {
    self.child_process.send_message(message)
  }
}

and use in dart:

var process = await MyStruct::spawn_child_process();
process.send_message_to_child_process('hi');

If you have multi subprocess, just create multiple instances, etc

xkeyC commented 1 month ago

I try to do it, but when I call rsprocess.write, I get this error:

thread 'tokio-runtime-worker' panicked at C:\Users\xkeyc\.cargo\registry\src\index.crates.io-6f17d22bba15001f\flutter_rust_bridge-2.0.0-dev.32\src\rust_auto_opaque\dart2rust.rs:22:14:
Fail to mutably borrow object. Please ensure the object is not borrowed elsewhere at the same time, which violates Rust's rules.: TryLockError(())
stack backtrace:
   0:     0x7fffdb5b1e32 - <unknown>
   1:     0x7fffdb5d37ad - <unknown>
   2:     0x7fffdb5adba1 - <unknown>
   3:     0x7fffdb5b1c5a - <unknown>
   4:     0x7fffdb5b4069 - <unknown>
   5:     0x7fffdb5b3d25 - <unknown>
   6:     0x7fffdb5b4584 - <unknown>
   7:     0x7fffdb5b4459 - <unknown>
   8:     0x7fffdb5b2739 - <unknown>
   9:     0x7fffdb5b4126 - <unknown>
  10:     0x7fffdb5f1c87 - <unknown>
  11:     0x7fffdb5f22c3 - <unknown>
  12:     0x7fffdadc26a2 - <unknown>
  13:     0x7fffdad88b9a - <unknown>
  14:     0x7fffdad5a4f1 - <unknown>
  15:     0x7fffdad5a2c2 - <unknown>
  16:     0x7fffdada0f57 - <unknown>
  17:     0x7fffdad86517 - <unknown>
  18:     0x7fffdad87aaa - <unknown>
  19:     0x7fffdad86643 - <unknown>
  20:     0x7fffdad804b6 - <unknown>
  21:     0x7fffdad84603 - <unknown>
  22:     0x7fffdad7ceeb - <unknown>
  23:     0x7fffdad7a30b - <unknown>
  24:     0x7fffdad8757f - <unknown>
  25:     0x7fffdad9f9dc - <unknown>
  26:     0x7fffdad9930c - <unknown>
  27:     0x7fffdad991f8 - <unknown>
  28:     0x7fffdadb19bf - <unknown>
  29:     0x7fffdad87033 - <unknown>
  30:     0x7fffdad7f8b6 - <unknown>
  31:     0x7fffdad84603 - <unknown>
  32:     0x7fffdad7cfab - <unknown>
  33:     0x7fffdad7a42b - <unknown>
  34:     0x7fffdadaff46 - <unknown>
  35:     0x7fffdadb2406 - <unknown>
  36:     0x7fffdadb5523 - <unknown>
  37:     0x7fffdadbbfcb - <unknown>
  38:     0x7fffdb4afe16 - <unknown>
  39:     0x7fffdb4bffd2 - <unknown>
  40:     0x7fffdb4ba27a - <unknown>
  41:     0x7fffdb4ba054 - <unknown>
  42:     0x7fffdb4b99c5 - <unknown>
  43:     0x7fffdb4b96c5 - <unknown>
  44:     0x7fffdb5063db - <unknown>
  45:     0x7fffdb4bf986 - <unknown>
  46:     0x7fffdb4ee228 - <unknown>
  47:     0x7fffdb4ec9ee - <unknown>
  48:     0x7fffdb4bf943 - <unknown>
  49:     0x7fffdb4b9581 - <unknown>
  50:     0x7fffdb4fd242 - <unknown>
  51:     0x7fffdb4b934c - <unknown>
  52:     0x7fffdb4b923e - <unknown>
  53:     0x7fffdb4d0a2b - <unknown>
  54:     0x7fffdb4e0b7b - <unknown>
  55:     0x7fffdb4e0828 - <unknown>
  56:     0x7fffdb4e47af - <unknown>
  57:     0x7fffdb4fd963 - <unknown>
  58:     0x7fffdb508bf6 - <unknown>
  59:     0x7fffdb509a03 - <unknown>
  60:     0x7fffdb50841b - <unknown>
  61:     0x7fffdb4de8ab - <unknown>
  62:     0x7fffdb4e4166 - <unknown>
  63:     0x7fffdb4e28f6 - <unknown>
  64:     0x7fffdb4e2753 - <unknown>
  65:     0x7fffdb4b015b - <unknown>
  66:     0x7fffdb4afe16 - <unknown>
  67:     0x7fffdb4c0088 - <unknown>
  68:     0x7fffdb4c0475 - <unknown>
  69:     0x7fffdb4c2d1b - <unknown>
  70:     0x7fffdb4c2aa1 - <unknown>
  71:     0x7fffdb4c3cd9 - <unknown>
  72:     0x7fffdb4e7611 - <unknown>
  73:     0x7fffdb4fd791 - <unknown>
  74:     0x7fffdb5087d5 - <unknown>
  75:     0x7fffdb509a03 - <unknown>
  76:     0x7fffdb507ff4 - <unknown>
  77:     0x7fffdb4e7434 - <unknown>
  78:     0x7fffdb49beee - <unknown>
  79:     0x7fffdb5bf31c - <unknown>
  80:     0x7ff8e964257d - BaseThreadInitThunk
  81:     0x7ff8eab2aa48 - RtlUserThreadStart
[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: PanicException(Fail to mutably borrow object. Please ensure the object is not borrowed elsewhere at the same time, which violates Rust's rules.: TryLockError(()))
#0      SimpleDecoder.decode (package:flutter_rust_bridge/src/codec/base.dart:35:9)
#1      DcoCodec.decodeObject (package:flutter_rust_bridge/src/codec/dco.dart:25:45)
#2      Future._propagateToListeners.handleValueCallback (dart:async/future_impl.dart:838:45)
#3      Future._propagateToListeners (dart:async/future_impl.dart:867:13)
#4      Future._completeWithValue (dart:async/future_impl.dart:643:5)
#5      Future._asyncCompleteWithValue.<anonymous closure> (dart:async/future_impl.dart:713:7)
#6      _microtaskLoop (dart:async/schedule_microtask.dart:40:21)
#7      _startMicrotaskLoop (dart:async/schedule_microtask.dart:49:5)

My Rust Code

use flutter_rust_bridge::for_generated::futures::lock::Mutex;
use std::sync::Arc;

use flutter_rust_bridge::frb;
use tokio::io::BufReader;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process::ChildStdin;

use crate::frb_generated::StreamSink;

#[frb(opaque)]
pub struct RsProcess {
    child_stdin: Option<Arc<Mutex<ChildStdin>>>,
    pid: Option<u32>,
}

#[derive(Clone, Copy)]
pub enum RsProcessStreamDataType {
    Output,
    Error,
    Exit,
}

pub struct RsProcessStreamData {
    pub data_type: RsProcessStreamDataType,
    pub data: String,
}

impl RsProcess {
    #[frb(sync)]
    pub fn new() -> Self {
        RsProcess {
            child_stdin: None,
            pid: None,
        }
    }

    pub async fn start(
        &mut self,
        executable: String,
        arguments: Vec<String>,
        working_directory: String,
        stream_sink: StreamSink<RsProcessStreamData>,
    ) {
        let stream_sink_arc = Arc::from(stream_sink);

        let mut command = tokio::process::Command::new(executable);
        command
            .args(arguments)
            .current_dir(working_directory)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true);

        command.creation_flags(0x08000000);

        let job = win32job::Job::create().unwrap();
        let mut info = job.query_extended_limit_info().unwrap();
        info.limit_kill_on_job_close();
        job.set_extended_limit_info(&mut info).unwrap();

        let job_arc = Arc::from(job);

        if let Ok(mut child) = command.spawn() {
            {
                let raw_handle = child.raw_handle();
                if raw_handle.is_some() {
                    job_arc
                        .assign_process(raw_handle.unwrap() as isize)
                        .unwrap();
                }
            }

            let stdout = child.stdout.take().expect("Failed to open stdout");
            let stderr = child.stderr.take().expect("Failed to open stderr");

            let stdin = child.stdin.take().expect("Failed to open stdin");
            self.child_stdin = Some(Arc::from(Mutex::new(stdin)));

            let output_task = tokio::spawn(_process_output(
                stdout,
                stream_sink_arc.clone(),
                RsProcessStreamDataType::Output,
            ));
            let error_task = tokio::spawn(_process_output(
                stderr,
                stream_sink_arc.clone(),
                RsProcessStreamDataType::Error,
            ));

            self.pid = child.id();

            tokio::select! {
            _ = output_task => (),
            _ = error_task => (),
            }

            let exit_status = child
                .wait()
                .await
                .expect("Failed to wait for child process");

            if !exit_status.success() {
                eprintln!("Child process exited with an error: {:?}", exit_status);
                let message = RsProcessStreamData {
                    data_type: RsProcessStreamDataType::Exit,
                    data: "exit".to_string(),
                };
                stream_sink_arc.add(message).unwrap();
            }
        } else {
            eprintln!("Failed to start");
            let message = RsProcessStreamData {
                data_type: RsProcessStreamDataType::Error,
                data: "Failed to start".to_string(),
            };
            stream_sink_arc.add(message).unwrap();
        }
    }

    pub async fn write(&mut self, data: String) {
        if let Some(stdin) = &self.child_stdin {
            let mut stdin_lock = stdin.lock().await;
            stdin_lock.write_all(data.as_bytes()).await.unwrap();
        }
    }

    #[frb(sync)]
    pub fn get_pid(&self) -> Option<u32> {
        self.pid
    }
}

async fn _process_output<R>(
    stdout: R,
    stream_sink: Arc<StreamSink<RsProcessStreamData>>,
    data_type: RsProcessStreamDataType,
) where
    R: tokio::io::AsyncRead + Unpin,
{
    let reader = BufReader::new(stdout);
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await.unwrap() {
        let message = RsProcessStreamData {
            data_type,
            data: line.trim().parse().unwrap(),
        };
        stream_sink.add(message).unwrap();
    }
}

Anyway, I need to save tokio::process::ChildStdin or tokio::process::Child in struct RsProcess for write operation.

I think this has to do with using &mut self asynchronously ,but I don't know how to solve this problem now.

fzyzcjy commented 1 month ago

Fail to mutably borrow object.

Hi, could you please show Dart code about this? I guess you are calling methods concurrently - which violates Rust's safety model.

xkeyC commented 1 month ago

Fail to mutably borrow object.

Hi, could you please show Dart code about this? I guess you are calling methods concurrently - which violates Rust's safety model.

_process = RsProcess();

    final stream = _process?.start(
        executable: exec, arguments: [], workingDirectory: execDir);

    stream?.listen((event) async {
      switch (event.dataType) {
        case RsProcessStreamDataType.output:
          try {
            final eventJson = await compute(json.decode, event.data);
            _handleMessage(eventJson);
          } catch (e) {
            dPrint("json error: $e");
          }
          break;

I need to write stdin in the stream.listen -> _handleMessage of _process?.start.

fzyzcjy commented 1 month ago

Just briefly looked at it. Is it possible that, start is &mut self and holds self for too long time?

What about sth like this:

#[frb(stream_dart_await)]
fn start(&mut self) {
  create_necessary_resources_and_fill_to_self;
  tokio::spawn(|| {
   the_real_slow_work_such_as_listening_to_the_process_stdout;
  });
}

by doing so, your start function will return fast without locking self for a long time.

In addition, by using #[frb(stream_dart_await)] (and await on dart), your dart code will continue only after the rust initialization ends.

xkeyC commented 1 month ago

Just briefly looked at it. Is it possible that, start is &mut self and holds self for too long time?

What about sth like this:

#[frb(stream_dart_await)]
fn start(&mut self) {
  create_necessary_resources_and_fill_to_self;
  tokio::spawn(|| {
   the_real_slow_work_such_as_listening_to_the_process_stdout;
  });
}

by doing so, your start function will return fast without locking self for a long time.

In addition, by using #[frb(stream_dart_await)] (and await on dart), your dart code will continue only after the rust initialization ends.

Thanks, I will try it later.

fzyzcjy commented 1 month ago

You are welcome, and feel free to ping me if it does not work!

xkeyC commented 1 month ago

[frb(stream_dart_await)]

Not work

If i move tokio::select! {_ = output_task => (), _ = error_task => ()} in tokio::spawn(async move {} ) , the process will exit with code 0 ,and not has any error.

if move out, and remove #[frb(stream_dart_await)] , The process was run , and get TryLockError(()) when call write

full rust code:

use flutter_rust_bridge::for_generated::futures::lock::Mutex;
use std::sync::Arc;

use flutter_rust_bridge::frb;
use tokio::io::BufReader;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process::ChildStdin;

use crate::frb_generated::StreamSink;

#[frb(opaque)]
pub struct RsProcess {
    child_stdin: Option<Arc<Mutex<ChildStdin>>>,
    pid: Option<u32>,
}

#[derive(Clone, Copy)]
pub enum RsProcessStreamDataType {
    Output,
    Error,
    Exit,
}

pub struct RsProcessStreamData {
    pub data_type: RsProcessStreamDataType,
    pub data: String,
}

impl RsProcess {
    #[frb(sync)]
    pub fn new() -> Self {
        RsProcess {
            child_stdin: None,
            pid: None,
        }
    }

    #[frb(stream_dart_await)]
    pub async fn start(
        &mut self,
        executable: String,
        arguments: Vec<String>,
        working_directory: String,
        stream_sink: StreamSink<RsProcessStreamData>,
    ) {
        let stream_sink_arc = Arc::from(stream_sink);

        let mut command = tokio::process::Command::new(executable);
        command
            .args(arguments)
            .current_dir(working_directory)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true);

        command.creation_flags(0x08000000);

        let job = win32job::Job::create().unwrap();
        let mut info = job.query_extended_limit_info().unwrap();
        info.limit_kill_on_job_close();
        job.set_extended_limit_info(&mut info).unwrap();

        let job_arc = Arc::from(job);

        if let Ok(mut child) = command.spawn() {
            {
                let raw_handle = child.raw_handle();
                if raw_handle.is_some() {
                    job_arc
                        .assign_process(raw_handle.unwrap() as isize)
                        .unwrap();
                }
            }

            let stdin = child.stdin.take().expect("Failed to open stdin");
            self.child_stdin = Some(Arc::from(Mutex::new(stdin)));
            self.pid = child.id();

            let stdout = child.stdout.take().expect("Failed to open stdout");
            let stderr = child.stderr.take().expect("Failed to open stderr");

            let output_task = tokio::spawn(_process_output(
                stdout,
                stream_sink_arc.clone(),
                RsProcessStreamDataType::Output,
            ));
            let error_task = tokio::spawn(_process_output(
                stderr,
                stream_sink_arc.clone(),
                RsProcessStreamDataType::Error,
            ));

            tokio::spawn(async move {
                tokio::select! {_ = output_task => (), _ = error_task => () }

                let exit_status = child
                    .wait()
                    .await
                    .expect("Failed to wait for child process");

                // print exit_status.code()
                println!("Child process exited with: {:?}", exit_status.code());

                if !exit_status.success() {
                    println!("Child process exited with an error: {:?}", exit_status);
                    let message = RsProcessStreamData {
                        data_type: RsProcessStreamDataType::Exit,
                        data: "exit".to_string(),
                    };
                    stream_sink_arc.add(message).unwrap();
                }
            });
        } else {
            println!("Failed to start");
            let message = RsProcessStreamData {
                data_type: RsProcessStreamDataType::Error,
                data: "Failed to start".to_string(),
            };
            stream_sink_arc.add(message).unwrap();
        }
    }

    pub async fn write(&mut self, data: String) {
        if let Some(stdin) = &self.child_stdin {
            let mut stdin_lock = stdin.lock().await;
            stdin_lock.write_all(data.as_bytes()).await.unwrap();
        }
    }

    #[frb(sync)]
    pub fn get_pid(&self) -> Option<u32> {
        self.pid
    }
}

async fn _process_output<R>(
    stdout: R,
    stream_sink: Arc<StreamSink<RsProcessStreamData>>,
    data_type: RsProcessStreamDataType,
) where
    R: tokio::io::AsyncRead + Unpin,
{
    let reader = BufReader::new(stdout);
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await.unwrap() {
        let message = RsProcessStreamData {
            data_type,
            data: line.trim().parse().unwrap(),
        };
        stream_sink.add(message).unwrap();
    }
}

log

Child process exited with: Some(0)

I think in order for the process to run, I might have to block on start, so Struct may not be a complete replacement for Stream ?

xkeyC commented 1 month ago

Finally return to static ... 😂

pub struct RsProcess {
    pub child_stdin: ChildStdin,
    pub rs_pid: u32,
}

lazy_static! {
    static ref RS_PROCESS_MAP: Mutex<HashMap<u32, RsProcess>> = Mutex::new(HashMap::new());
}

pub async fn start(
    executable: String,
    arguments: Vec<String>,
    working_directory: String,
    stream_sink: StreamSink<RsProcessStreamData>,
) {
/// ...
 {
            let mut map = RS_PROCESS_MAP.lock().await;
            map.insert(
                pid,
                RsProcess {
                    child_stdin: stdin,
                    rs_pid: pid,
                },
            );
}

        defer! {
            let mut map = block_on(RS_PROCESS_MAP.lock());
            map.remove(&pid);
            println!("RS_PROCESS_MAP ..defer ..len() = {}", map.len());
        }
/// ...
}
fzyzcjy commented 1 month ago

the process will exit with code 0 ,and not has any error.

I guess you need to somehow store the running process into the struct, and at the same time do not block for it to finish. It is quite weird to see it does not support this (or maybe we are not using it correctly?)

Finally return to static ...

😂

fzyzcjy commented 3 days ago

Close since this seems to be solved. Feel free to reopen if needed!