fzyzcjy / flutter_rust_bridge

Flutter/Dart <-> Rust binding generator, feature-rich, but seamless and simple.
https://fzyzcjy.github.io/flutter_rust_bridge/
MIT License
3.79k stars 274 forks

Dart StreamController.stream -> Rust #2195

Open normalllll opened 2 weeks ago

normalllll commented 2 weeks ago

I need to implement a function that transmits a stream from Dart to Rust, such as a gRPC client stream. Is there a way to achieve this without using an additional "fn add(...)"?

fzyzcjy commented 2 weeks ago

Currently I guess you can do something like:

// Dart
Stream<String> myStream = ...; // suppose this is your stream
myStream.listen(handle_my_stream_event);

// Rust
pub fn handle_my_stream_event(event: String) {
  // ...
}

Or, if you want the Rust side to be stateful, another simple example:

// Dart
Stream<String> myStream = ...; // suppose this is your stream
var a = MyRustSideWorker(); // suppose you create the rust object somewhere in the code
myStream.listen(a.handle_my_stream_event);

// Rust
pub struct MyRustSideWorker { ... }

impl MyRustSideWorker {
  pub fn handle_my_stream_event(&self, event: String) {
    // ...
  }
}

normalllll commented 2 weeks ago

I mean something like this:

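  // Dart
  StreamController<int> streamController = StreamController<int>();
  final sink = streamController.sink;

  final stream = streamController.stream;

  // Start the call without awaiting it yet; awaiting here would block
  // before any value is added to the stream.
  final resultFuture = rust_client_stream_func(stream);

  sink.add(1);
  sink.add(2);
  sink.add(3);
  sink.close();

  final result = await resultFuture;

// Rust
use futures::{Stream, StreamExt};

pub async fn rust_client_stream_func(mut stream: impl Stream<Item=i32> + Send + Unpin) -> Result<i32, String> {
    let mut sum = 0;
    // Runs until the Dart side closes the stream.
    while let Some(value) = stream.next().await {
        sum += value;
    }
    Ok(sum)
}

fzyzcjy commented 2 weeks ago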

I see. That looks implementable, and feel free to PR for this! Alternatively, I may work on it later.

Currently, one simple workaround may be to use tokio mpsc channels (a oneshot channel carries only a single value, so it does not fit a stream). The sender acts like the stream sink and the receiver acts like the stream. Then rust_client_stream_func can take the mpsc receiver.

I may think about it later a bit more.
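To make the channel idea concrete, here is a minimal sketch. It uses `std::sync::mpsc` and a thread instead of tokio so that it runs without extra crates; the function names `sum_from_channel` and `demo_sum` are made up for illustration, and a tokio version would use an async `recv()` instead of the blocking iterator.

```rust
use std::sync::mpsc;
use std::thread;

/// Consumes every value sent through the channel and returns the sum,
/// mirroring `rust_client_stream_func` from the question.
pub fn sum_from_channel(receiver: mpsc::Receiver<i32>) -> i32 {
    // `iter()` blocks until a value arrives and ends once all senders are dropped.
    receiver.iter().sum()
}

/// Hypothetical driver: sends 1, 2, 3 from another thread, then closes the channel.
pub fn demo_sum() -> i32 {
    let (sender, receiver) = mpsc::channel();
    let producer = thread::spawn(move || {
        for value in 1..=3 {
            sender.send(value).expect("receiver is still alive");
        }
        // `sender` is dropped here, which plays the role of `sink.close()`.
    });
    let total = sum_from_channel(receiver);
    producer.join().expect("producer thread panicked");
    total
}
```

Dropping the sender is what ends the loop on the receiving side, so the Dart side would need an equivalent "close" step for the Rust function to return.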

normalllll commented 2 weeks ago

Is there a way to convert these 3 Rust examples to Dart at the same time?

use std::time::Duration;

use futures::{stream, Stream, StreamExt};

struct HelloRequest {
    name: String,
}

struct HelloResponse {
    message: String,
}

pub async fn rust_client_stream_func(mut stream: impl Stream<Item=HelloRequest> + Send + Unpin) -> Result<HelloResponse, String> {
    while let Some(value) = stream.next().await {
        println!("Received: {}", value.name);
    }

    Ok(HelloResponse {
        message: "Hello".to_string()
    })
}

pub async fn rust_server_stream_func(request: HelloRequest) -> Result<impl Stream<Item=HelloResponse>, Box<dyn std::error::Error>> {
    let interval = tokio::time::interval(Duration::from_secs(1));
    let name = request.name;

    // Carry the counter in the unfold state. A counter captured by the
    // `async move` block would be copied on every call and never advance,
    // so the stream would not end after 5 items.
    let stream = stream::unfold((interval, 0), move |(mut interval, count)| {
        let name = name.clone();
        async move {
            if count < 5 {
                interval.tick().await;
                Some((
                    HelloResponse {
                        message: format!("Hello, {}!", name),
                    },
                    (interval, count + 1),
                ))
            } else {
                None
            }
        }
    });

    Ok(stream)
}

pub async fn rust_bidirectional_stream_func(
    mut in_stream: impl Stream<Item = HelloRequest> + Send + Unpin,
) -> Result<impl Stream<Item = HelloResponse>, Box<dyn std::error::Error>> {
    let interval = tokio::time::interval(Duration::from_secs(1));

    let out_stream = stream::unfold((interval, in_stream), move |(mut interval, mut stream)| async move {
        if let Some(request) = stream.next().await {
            interval.tick().await;
            let response = HelloResponse {
                message: format!("Hello, {}!", request.name),
            };
            Some((response, (interval, stream)))
        } else {
            None
        }
    });

    Ok(out_stream)
}

#[tokio::test]
async fn test(){
    use futures::stream;
    let requests = stream::iter(vec![
        HelloRequest { name: "Alice".to_string() },
        HelloRequest { name: "Bob".to_string() },
        HelloRequest { name: "Charlie".to_string() },
        HelloRequest { name: "Dave".to_string() },
        HelloRequest { name: "Eve".to_string() },
    ]);

    let responses = rust_bidirectional_stream_func(requests).await.unwrap();
    tokio::pin!(responses);

    while let Some(response) = responses.next().await {
        println!("{}", response.message);
    }
}

normalllll commented 2 weeks ago

I created an RPC framework for Rust (still in development) and currently use proc_macro to generate code, hoping it can be used seamlessly in Dart.

fzyzcjy commented 2 weeks ago

Firstly, if we add support for Streams, then this will be doable, and feel free to PR for this.

Secondly, without that, a temporary workaround is roughly like:

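// Rust
use tokio::sync::mpsc;

pub struct Dart2RustStreamSink(mpsc::Sender<HelloRequest>);

pub fn create_stream() -> (Dart2RustStreamSink, mpsc::Receiver<HelloRequest>) {
    // Bounded channel; the capacity of 16 is an arbitrary choice.
    let (sender, receiver) = mpsc::channel(16);
    (Dart2RustStreamSink(sender), receiver)
}

impl Dart2RustStreamSink {
    pub async fn add(&self, data: HelloRequest) {
        // Sending only fails if the receiver has already been dropped.
        let _ = self.0.send(data).await;
    }
}

pub async fn rust_client_stream_func(mut stream: mpsc::Receiver<HelloRequest>) {
    // `recv()` returns None once every sender has been dropped.
    while let Some(value) = stream.recv().await {
        println!("Received: {}", value.name);
    }
}

// Dart
Dart2RustStream createDart2RustStream(Stream pureDartStream) {
  var (dart2rustSink, dart2rustStream) = createStream();
  pureDartStream.listen(dart2rustSink.add);
  return dart2rustStream;
}

rust_client_stream_func(createDart2RustStream(yourExistingStream));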

Since generics are not supported yet, we can use macros to generate this for each type. (Again, feel free to PR for generics; that will make this easier to do.)
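A rough sketch of the per-type macro idea follows. The macro name `define_dart2rust_sink` and the generated names are hypothetical, and `std::sync::mpsc` stands in for tokio's channel so the snippet is self-contained; how frb treats macro-generated items is not covered here.

```rust
/// Generates a non-generic sink wrapper plus a constructor for one item type.
macro_rules! define_dart2rust_sink {
    ($sink:ident, $create:ident, $item:ty) => {
        pub struct $sink(std::sync::mpsc::Sender<$item>);

        impl $sink {
            /// Returns false if the receiving side has gone away.
            pub fn add(&self, data: $item) -> bool {
                self.0.send(data).is_ok()
            }
        }

        pub fn $create() -> ($sink, std::sync::mpsc::Receiver<$item>) {
            let (sender, receiver) = std::sync::mpsc::channel();
            ($sink(sender), receiver)
        }
    };
}

// One invocation per item type that has to cross the bridge.
define_dart2rust_sink!(I32Sink, create_i32_stream, i32);
define_dart2rust_sink!(StringSink, create_string_stream, String);
```

Each invocation produces concrete types only, which is the point: nothing generic is left in the public signatures.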

And for a Rust -> Dart stream, check the documentation.

normalllll commented 2 weeks ago

use anyhow::Result;
use std::{thread::sleep, time::Duration};

use crate::frb_generated::StreamSink;

const ONE_SECOND: Duration = Duration::from_secs(1);

// can't omit the return type yet, this is a bug
pub fn tick(sink: StreamSink<i32>) -> Result<()> {
    let mut ticks = 0;
    loop {
        sink.add(ticks);
        sleep(ONE_SECOND);
        if ticks == i32::MAX {
            break;
        }
        ticks += 1;
    }
    Ok(())
}

I checked the documentation about Rust -> Dart streams, and there is a problem: the StreamSink from frb_generated must be used, and it has to be passed as a parameter.

Code from an external library that returns Result<impl Stream<...>, ...> cannot be processed automatically. It seems that additional code must be written to wrap it manually.

fzyzcjy commented 2 weeks ago

Yes, currently you may need to write a wrapper function, perhaps with the help of Rust macros to avoid repetition. However, feel free to PR to support returning an impl Stream! I may work on it later, but I cannot guarantee it since this may not be of very high priority.
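The shape of such a wrapper might look like the sketch below. Everything in it is a stand-in so that it runs on std alone: `ItemSink` models only the `add` call of the generated StreamSink, an `Iterator` replaces the async `Stream`, and `external_greetings` is a hypothetical external-library function.

```rust
/// Stand-in for the generated `StreamSink`; only the `add` idea is modeled.
pub trait ItemSink<T> {
    fn add(&mut self, item: T);
}

/// A Vec works as a recording sink for this sketch.
impl<T> ItemSink<T> for Vec<T> {
    fn add(&mut self, item: T) {
        self.push(item);
    }
}

/// Hypothetical external function that returns a stream of items
/// (an Iterator here, in place of an async Stream).
pub fn external_greetings(name: &str) -> impl Iterator<Item = String> + '_ {
    (1..=3).map(move |i| format!("Hello, {}! ({})", name, i))
}

/// The wrapper: takes the sink as a parameter and forwards every item into it.
pub fn greetings_into_sink(name: &str, sink: &mut impl ItemSink<String>) {
    for message in external_greetings(name) {
        sink.add(message);
    }
}
```

With the real StreamSink the loop body would presumably be an awaited `next()` on the stream followed by `sink.add(...)`, as in the `tick` example above.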

normalllll commented 1 week ago

I tried this:

```rust
use std::sync::Arc;
use rrpc_core::client::{Client, QuicClient, QuicConfig};
use rrpc_core::codec::BincodeCodec;

// mod hello{
//     use serde::{Deserialize, Serialize};
//     use rrpc_core::transport::*;
//     use rrpc_core::client::{Client, QuicClient, QuicConfig};
//     use rrpc_core::codec::{BincodeCodec, Codec};
//     use futures::{Stream, StreamExt};
//     use std::marker::PhantomData;
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct HelloRequest {
//         pub name: String,
//         pub id: i32,
//         pub nicknames: Vec,
//         pub is_active: Option,
//         pub attributes: std::collections::HashMap,
//         pub nested: NestedMessage,
//         pub aa: i8,
//     }
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct HelloReply {
//         pub message: String,
//         pub responses: Vec,
//     }
//     #[derive(Debug, PartialEq, Serialize, Deserialize)]
//     pub struct NestedMessage {
//         pub nested_field: String,
//     }
//
//     pub struct HelloService<'a, C: Client, T: Codec> {
//         client: &'a C,
//         codec: PhantomData,
//     }
//
//     impl<'a, C: Client, T: Codec> HelloService<'a, C, T> {
//         pub fn new(client: &'a C) -> Self {
//             Self { client, codec: PhantomData }
//         }
//
//         pub async fn say_hello(
//             &self,
//             request: HelloRequest,
//         ) -> Result> {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 0u32, 0)).await?;
//             let mut send_unary = send.into_unary();
//             send_unary.send(request).await?;
//             let mut recv_unary = recv.into_unary();
//             let response: HelloReply = recv_unary.recv().await?;
//             return Ok(response);
//         }
//         pub async fn stream_server(
//             &self,
//             request: HelloRequest,
//         ) -> Result<
//             StreamReceiver<
//                 ::StreamTarget,
//                 HelloReply,
//             >,
//             Box,
//         > {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 1u32, 0)).await?;
//             let mut send_unary = send.into_unary();
//             send_unary.send(request).await?;
//             let recv_stream = recv.into_stream();
//             let receiver = StreamReceiver::<
//                 ::StreamTarget,
//                 HelloReply,
//             >::new(recv_stream);
//             Ok(receiver)
//         }
//
//         pub async fn stream_client(
//             &self,
//             mut client_stream: impl Stream + Send + Unpin,
//         ) -> Result> {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 2u32, 0)).await?;
//             let send_stream = send.into_stream();
//             let mut recv_unary_transport = recv.into_unary();
//             let mut sender = StreamSender::<
//                 ::StreamTarget,
//                 HelloRequest,
//             >::new(send_stream);
//             while let Some(request) = client_stream.next().await {
//                 sender.send(request).await?;
//             }
//             sender.finish().await?;
//             let response: HelloReply = recv_unary_transport.recv().await?;
//             Ok(response)
//         }
//
//         pub async fn stream_both(
//             &self,
//         ) -> Result<
//             (
//                 StreamSender<
//                     ::StreamTarget,
//                     HelloRequest,
//                 >,
//                 StreamReceiver<
//                     ::StreamTarget,
//                     HelloReply,
//                 >,
//             ),
//             Box,
//         > {
//             let (mut send, recv) = self.client.open_transport().await;
//             send.send_metadata(RequestContext::new(0u32, 3u32, 0)).await?;
//             let send_stream = send.into_stream();
//             let recv_stream = recv.into_stream();
//             let sender = StreamSender::<
//                 ::StreamTarget,
//                 HelloRequest,
//             >::new(send_stream);
//             let receiver = StreamReceiver::<
//                 ::StreamTarget,
//                 HelloReply,
//             >::new(recv_stream);
//             Ok((sender, receiver))
//         }
//     }
//
// }

mod hello{
    rrpc_proc::proto_client!("hello.proto");
}

pub async fn new_hello_service_client() -> hello::HelloService, BincodeCodec> {
    let codec = Arc::new(BincodeCodec::new(bincode::config::standard()));
    let client = QuicClient::connect(codec, "127.0.0.1:4433".parse().unwrap(), QuicConfig {
        client_key_pem: include_bytes!("../cert/client.key").to_vec(),
        client_cert_pem: include_bytes!("../cert/client.crt").to_vec(),
        root_cert_pem: include_bytes!("../cert/ca.crt").to_vec(),
        server_name: "127.0.0.1",
    }).await.unwrap();
    let hello_client = hello::HelloService::new(&client);
    hello_client
}
```

The generated Dart code is as follows:

// This file is automatically generated, so please do not edit it.
// Generated by `flutter_rust_bridge`@ 2.1.0.

// ignore_for_file: invalid_use_of_internal_member, unused_import, unnecessary_import

import '../../frb_generated.dart';
import 'package:flutter_rust_bridge/flutter_rust_bridge_for_generated.dart';

// These types are ignored because they are not used by any `pub` functions: `HelloReply`, `HelloRequest`, `NestedMessage`
// These function are ignored because they are on traits that is not defined in current crate (put an empty `#[frb]` on it to unignore): `eq`, `eq`, `eq`, `fmt`, `fmt`, `fmt`
// These functions are ignored (category: IgnoreBecauseOwnerTyShouldIgnore): `new`, `say_hello`, `stream_both`, `stream_client`, `stream_server`

// Rust type: RustOpaqueMoi<flutter_rust_bridge::for_generated::RustAutoOpaqueInner< HelloService < QuicClient < BincodeCodec > , BincodeCodec >>>
abstract class HelloServiceQuicClientBincodeCodecBincodeCodec
    implements RustOpaqueInterface {}

I want to export these functions say_hello, stream_both, stream_client, stream_server. How can I do this?

I noticed that using #[frb(external)] directly will export some irrelevant functions and properties.

For example, for this struct, I only want it to expose the function recv. I don't need the fields recv_transport and _marker, or the function new.

pub struct StreamReceiver<R: RecvStreamTransport, T: TypeDecode> {
    recv_transport: R,
    _marker: PhantomData<T>,
}

impl<R: RecvStreamTransport, T: TypeDecode> StreamReceiver<R, T> {
    pub fn new(recv_transport: R) -> Self {
        Self {
            recv_transport,
            _marker: PhantomData,
        }
    }

    pub async fn recv(&mut self) -> Result<Option<T>, TransportError> {
        match self.recv_transport.recv().await {
            Ok(data) => Ok(Some(data)),
            Err(TransportError::EOF) => Ok(None),
            Err(e) => Err(e),
        }
    }
}

fzyzcjy commented 1 week ago

Hmm, given the message, my first guess is that HelloService has complex generics, and generics are not yet supported by frb (feel free to PR!).
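One possible way around the generics limitation, not confirmed for frb, is to hide the generic type behind a concrete wrapper that exposes only the wanted methods. The sketch below is a simplified, synchronous, std-only model of the StreamReceiver from the question; `RecvTransport`, `Decode`, `QueueTransport`, `Text`, and `TextReceiver` are all hypothetical stand-ins.

```rust
use std::collections::VecDeque;
use std::marker::PhantomData;

/// Simplified stand-ins for the transport and decoding traits.
pub trait RecvTransport {
    fn recv_bytes(&mut self) -> Option<Vec<u8>>;
}
pub trait Decode: Sized {
    fn decode(bytes: &[u8]) -> Self;
}

/// Generic receiver, shaped like the one in the question.
pub struct StreamReceiver<R: RecvTransport, T: Decode> {
    recv_transport: R,
    _marker: PhantomData<T>,
}

impl<R: RecvTransport, T: Decode> StreamReceiver<R, T> {
    pub fn new(recv_transport: R) -> Self {
        Self { recv_transport, _marker: PhantomData }
    }
    pub fn recv(&mut self) -> Option<T> {
        self.recv_transport.recv_bytes().map(|bytes| T::decode(&bytes))
    }
}

/// Concrete transport and message type used to pin down the generics.
pub struct QueueTransport(VecDeque<Vec<u8>>);
impl RecvTransport for QueueTransport {
    fn recv_bytes(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }
}
pub struct Text(pub String);
impl Decode for Text {
    fn decode(bytes: &[u8]) -> Self {
        Text(String::from_utf8_lossy(bytes).into_owned())
    }
}

/// Non-generic wrapper: the inner field is private and only `recv` is public.
pub struct TextReceiver(StreamReceiver<QueueTransport, Text>);

impl TextReceiver {
    pub fn recv(&mut self) -> Option<String> {
        self.0.recv().map(|text| text.0)
    }
}

/// Constructor kept as a free function so the wrapper has no public `new`.
pub fn new_text_receiver(chunks: Vec<Vec<u8>>) -> TextReceiver {
    TextReceiver(StreamReceiver::new(QueueTransport(chunks.into())))
}
```

The same pattern could be applied per service, for example a concrete wrapper around HelloService that forwards say_hello and the three streaming calls, possibly generated by a macro to avoid repetition.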