Diggsey / actix-interop

Allow access to actor contexts from within normal future combinators and async/await blocks
Apache License 2.0
14 stars 6 forks source link

the trait bound `MyWebSocket: actix::actor::Actor` is not satisfied #7

Open bergkvist opened 2 years ago

bergkvist commented 2 years ago

I'm getting some compile errors when trying to use a tokio::sync::watch channel with an Actix WebSocket - and send a message whenever there is a change event.

impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        // ...
        let mut rx = self.state.receiver.clone(); // tokio::sync::watch::Receiver<String>
        async move {
            while rx.changed().await.is_ok() {
                with_ctx(|actor: &mut Self, ctx: &mut ws::WebsocketContext<Self>| {
                    ctx.text(*rx.borrow());
                });
                // the trait bound `MyWebSocket: actix::actor::Actor` is not satisfied
                // the trait `actix::actor::Actor` is not implemented for `MyWebSocket`
            }
        }
        .interop_actor_boxed(self);
        // the trait bound `MyWebSocket: actix::actor::Actor` is not satisfied
        // required because of the requirements on the impl of `FutureInterop<MyWebSocket>` for `impl std::future::Future<Output = [async output]>`
    }
}
Full code ```rs use std::time::{Duration, Instant}; use actix::prelude::*; use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; use std::sync::Arc; use tokio::sync::watch; use actix_interop::{FutureInterop, with_ctx}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); #[actix_web::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info"); env_logger::init(); HttpServer::new(move || { let state = State::new(""); App::new() .app_data(web::Data::new(state)) .wrap(middleware::Logger::default()) .service(web::resource("/").route(web::get().to(ws_index))) }) .bind("127.0.0.1:8080")? .run() .await } /// do websocket handshake and start `MyWebSocket` actor async fn ws_index(req: HttpRequest, stream: web::Payload) -> Result { let evt = req.app_data::>().unwrap().clone(); ws::start(MyWebSocket::new(evt.into_inner().as_ref().clone()), &req, stream) } #[derive(Clone)] struct State { sender: Arc>, receiver: watch::Receiver } impl State { fn new(init: impl AsRef) -> Self { let (sender, receiver) = watch::channel(init.as_ref().to_string()); Self { sender: Arc::new(sender), receiver } } } struct MyWebSocket { heartbeat: Instant, state: State } impl MyWebSocket { fn new(state: State) -> Self { Self { heartbeat: Instant::now(), state, } } fn set_value(&self, value: impl AsRef) { if let Err(err) = self.state.sender.send(value.as_ref().to_string()) { println!("Error {}", err); } } } impl Actor for MyWebSocket { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { if Instant::now().duration_since(act.heartbeat) > CLIENT_TIMEOUT { println!("Websocket Client heartbeat failed, disconnecting!"); ctx.stop(); return; } ctx.ping(b""); }); let mut rx = self.state.receiver.clone(); async move { while rx.changed().await.is_ok() { with_ctx(|actor: &mut Self, ctx: &mut ws::WebsocketContext| { ctx.text(*rx.borrow()); }); } } .interop_actor_boxed(self); } } impl StreamHandler> for MyWebSocket { fn handle( &mut self, msg: Result, ctx: &mut Self::Context, ) { println!("WS: {:?}", msg); match msg { Ok(ws::Message::Ping(msg)) => { self.heartbeat = Instant::now(); ctx.pong(&msg); } Ok(ws::Message::Pong(_)) => { self.heartbeat = Instant::now(); } Ok(ws::Message::Text(text)) => self.set_value(text), Ok(ws::Message::Binary(bin)) => { let text = std::str::from_utf8(&bin).unwrap_or(""); self.set_value(text); }, Ok(ws::Message::Close(reason)) => { ctx.close(reason); ctx.stop(); } _ => ctx.stop(), } } } ```

Dependencies:

actix = "0.10"
actix-codec = "0.3"
actix-web = "3"
actix-web-actors = "3"
awc = "2"
env_logger = "0.8"
futures = "0.3.1"
tokio = { version = "1.16", features = [ "sync", "rt", "macros", "time" ] }
actix-interop = "0.3.0"
Diggsey commented 2 years ago

You need to use actix 0.12 with actix-interop 0.3, but it looks like you're using actix 0.10.