shekohex / allo-isolate

Run Multithreaded Rust along with Dart VM (in isolate) 🌀
Apache License 2.0
120 stars 18 forks source link

Streaming data #16

Closed nurmukhametdaniyar closed 3 years ago

nurmukhametdaniyar commented 3 years ago

I am trying to stream data from Rust to Dart. I am listening to CAN frames and when I receive one, I post it into Isolate, but the problem is that data is not streamed to Dart, instead it is sent in batch after the function is completed. Because of this kind of behavior, I am trying to use a workaround on Dart's side where I call the function over and over again after I receive data from CAN, but it leads to missing some CAN frames which are really important. Here is the code on the Rust side:

extern crate mio;
extern crate mio_serial;

use allo_isolate::Isolate;
use futures_util::stream::StreamExt;
use tokio_socketcan::{CANSocket, Error};

#[no_mangle]
pub extern "C" fn read_data(isolate_port: i64) {
    read_can(isolate_port);
}

#[tokio::main]
async fn read_can(isolate_port: i64) -> Result<(), Error> {
    let isolate = Isolate::new(isolate_port);

    let mut socket_rx = CANSocket::open("vcan0")?;

    while let Some(Ok(frame)) = socket_rx.next().await {
        let data = frame.data();
        /*
         Here I perform some data manipulation
       */
        isolate.post(format!("{:?}", can_frame_map));
        break; // I had to explicitly add break here since it was not posting any values until function finishes
    }
    Ok(())
}

And here is the data listener class:

class DataListener {
  static setup() {
    native.store_dart_post_cobject(NativeApi.postCObject);
    print("Port listener Setup Done");
  }

  Stream listenData() {
    print('Creating receive port');
    final receivePort = ReceivePort();
    print('Starting listen port');
    native.listen_port(
      receivePort.sendPort.nativePort,
    );
    print('Returning stream');
    return receivePort; //It doesn't matter if I return receivePort and listen to it later or start listening straight away in the function. The behaviour is still the same
  }
}

This is the function on the Dart's side which calls for the listenData function:

static void readData(SendPort sendPort) async {
    DataListener.setup();
    while (true) {
      Stream<dynamic> canStream = DataListener().listenData();
      sendPort.send(await canStream.first); //Sending to another isolate for processing
     // Even without waiting for the first element of the stream, the behavior is the same. It is pretty much just a silly workaround
    }
  }

So I basically want to create an infinite loop for streaming data from Rust to Dart. Or is it even possible to do with Allo isolate?

shekohex commented 3 years ago

Maybe this example helps you here on writing streams with allo-isolate https://github.com/shekohex/rustystream

nurmukhametdaniyar commented 3 years ago

Maybe this example helps you here on writing streams with allo-isolate https://github.com/shekohex/rustystream

I see how this is working and actually managed to stream data from Rust to Dart, but only when I knew that data is coming and I was just sending some arbitrary data. The thing is that I am trying to stream data from CAN protocol to Dart, but I just want to keep listening to CAN and send it whenever there is data, and when there is no data, just don't do anything.

It looks like this: 1) I open the CAN socket 2) I listen to CAN 3) Whenever there is data on CAN, I send it. If there is no data, I just wait until it comes

I want the loop to be infinite and end whenever the Dart program finishes. I tried doing infinite loops on Dart's side and calling function over and over again, but it didn't help, since there is a small-time to pass data from Rust to Dart and process it in Dart, so in that time some CAN frames are skipped

nurmukhametdaniyar commented 3 years ago

Nevermind, just spawning another thread worked out :D