mlua-rs / mlua

High level Lua 5.4/5.3/5.2/5.1 (including LuaJIT) and Roblox Luau bindings to Rust with async/await support
Other
1.75k stars 139 forks source link

Unreusable UserData #455

Closed earlysun030201 closed 2 months ago

earlysun030201 commented 2 months ago

When I call a certain method from Lua, all other method calls fail afterwards, and I can't figure out why...

use tauri_plugin_http::reqwest;
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use futures_util::{StreamExt, SinkExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};
use mlua::{BString, ExternalError, Function as LuaFunction, Lua, RegistryKey, Table as LuaTable, UserData, UserDataMethods, Value as LuaValue};

...other code

struct LuaWebSocketStream(WebSocketStream<MaybeTlsStream<TcpStream>>);
impl UserData for LuaWebSocketStream {
    fn add_methods<M: UserDataMethods<Self>>(methods: &mut M) {
        methods.add_async_method_mut("read", |lua, mut this, callback: LuaFunction| async move {
            let callback_key: RegistryKey = lua.create_registry_value(callback)?;
            tokio::spawn(async move {
                if let Ok(callback) = lua.registry_value::<LuaFunction>(&callback_key) {
                    let res = lua.create_table().ok().unwrap();
                    match this.0.next().await {
                        Some(Ok(Message::Text(text))) => {
                            res.set("packet", 0).ok();
                            res.set("msg", text).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Ok(Message::Binary(bin))) => {
                            res.set("packet", 1).ok();
                            res.set("msg", bin).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Ok(Message::Ping(ping))) => {
                            res.set("packet", 2).ok();
                            res.set("msg", ping).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Ok(Message::Pong(pong))) => {
                            res.set("packet", 3).ok();
                            res.set("msg", pong).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Ok(Message::Close(close_frame))) => {
                            let reason = match close_frame {
                                Some(frame) => frame.to_string(),
                                None => "Closed".to_string(),
                            };
                            res.set("packet", 4).ok();
                            res.set("msg", reason).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Ok(Message::Frame(f))) => {
                            res.set("packet", 5).ok();
                            res.set("msg", f.to_string()).ok();
                            callback.call::<()>((false, res)).ok();
                        }
                        Some(Err(e)) => {
                            callback.call::<()>(e.to_string()).ok();
                        }
                        None => {
                            callback.call::<()>("Connection closed").ok();
                        }
                    }
                    lua.remove_registry_value(callback_key).ok();
                }
            });

            Ok(())
        });

        methods.add_async_method_mut("write", |lua, mut this, (data, callback): (String, LuaFunction)| async move {
            let callback_key: RegistryKey = lua.create_registry_value(callback)?;
            tokio::spawn(async move {
                if let Ok(callback) = lua.registry_value::<LuaFunction>(&callback_key) {
                    let result = this.0.send(Message::Text(data)).await;
                    match result {
                        Ok(_) => {
                            callback.call::<()>(false).ok();
                        }
                        Err(e) => {
                            callback.call::<()>(e.to_string()).ok();
                        }
                    }
                    lua.remove_registry_value(callback_key).ok();
                }
            });
            Ok(())
        });

        methods.add_async_method_mut("close", |_, mut this, ()| async move {
            tokio::spawn(async move {
                this.0.close(None).await.ok();
            });
            Ok(())
        });
    }    
}
/// Creates the Lua-callable `websocket(url, callback, timeout)` function.
///
/// The returned function schedules the connection attempt on a tokio task and
/// returns immediately. When the attempt finishes, `callback` is invoked with
/// `(false, stream)` on success, where `stream` is a `LuaWebSocketStream`
/// userdata, or with a single error message string when connecting fails or
/// the timeout elapses. `timeout` is in milliseconds and defaults to 30000.
///
/// # Panics
///
/// Panics if the Lua function cannot be created.
pub fn build_websocket(lua: &Lua) -> LuaFunction {
    lua.create_async_function(
        |lua, (url, callback, timeout): (String, LuaFunction, Option<u64>)| async move {
            // Park the callback in the registry so the spawned task can fetch it
            // once the connection attempt has finished.
            let callback_key: RegistryKey = lua.create_registry_value(callback)?;
            tokio::spawn(async move {
                let timeout_ms = timeout.unwrap_or(30000);
                let ws_conn = tokio::time::timeout(
                    tokio::time::Duration::from_millis(timeout_ms),
                    connect_async(url),
                )
                .await;

                if let Ok(callback) = lua.registry_value::<LuaFunction>(&callback_key) {
                    let outcome = match ws_conn {
                        Ok(Ok((stream, _))) => {
                            callback.call::<()>((false, LuaWebSocketStream(stream)))
                        }
                        // Handshake / connection failure.
                        Ok(Err(e)) => callback.call::<()>(e.to_string()),
                        // The timeout elapsed before the connection was established.
                        Err(e) => callback.call::<()>(e.to_string()),
                    };
                    // Report errors raised inside the Lua callback instead of
                    // silently discarding them.
                    if let Err(err) = outcome {
                        eprintln!("websocket: Lua callback failed: {err}");
                    }

                    lua.remove_registry_value(callback_key).ok();
                }
            });

            Ok(())
        },
    )
    .expect("failed to create the `websocket` Lua function")
}
-- Reproduction script: connect, then send two text messages one after the
-- other, each from inside the previous operation's callback.
websocket("ws://124.222.224.186:8800", function(connect_err, conn)
    print("chunk 1")
    if connect_err then
        print(connect_err)
        return
    end

    -- Shared failure path for both writes: print the error, close the socket.
    local function abort(write_err)
        print(write_err)
        conn:close()
    end

    conn:write("test msg1", function(first_err)
        if first_err then
            abort(first_err)
            return
        end

        print("sended msg1")
        print("chunk 2")
        -- Second write is issued from within the first write's callback.
        conn:write("test msg2", function(second_err)
            if second_err then
                abort(second_err)
                return
            end

            print("sended msg2")
        end)
    end)
    print("all requests is pendding")
end)

Expected output:

chunk 1
all requests is pendding
sended msg1
chunk 2
sended msg2

The actual output:

chunk 1
all requests is pendding
sended msg1
chunk 2

I modified the code block as follows:

// Diagnostic variant of the `write` method: prints "write" every time the
// method body is entered, then sends `data` as a text message on a spawned
// task and reports the result through `callback`.
// NOTE(review): `this` stays alive inside the spawned task while the callback
// runs, so a nested `res:write(...)` issued from that callback presumably fails
// to borrow the userdata before reaching the `println!` — confirm against the
// mlua borrow rules. The callback's error is discarded by `.ok()`.
methods.add_async_method_mut("write", |lua, mut this, (data, callback): (String, LuaFunction)| async move {
    println!("write");
    let key: RegistryKey = lua.create_registry_value(callback)?;
    tokio::spawn(async move {
        // Nothing to do if the callback cannot be fetched back from the registry.
        let Ok(on_done) = lua.registry_value::<LuaFunction>(&key) else {
            return;
        };

        let sent = this.0.send(Message::Text(data)).await;
        let outcome = match sent {
            Ok(_) => on_done.call::<()>(false),
            Err(send_err) => on_done.call::<()>(send_err.to_string()),
        };
        outcome.ok();

        lua.remove_registry_value(key).ok();
    });
    Ok(())
});

Expected output:

write
write

The actual output:

write
earlysun030201 commented 2 months ago

~The actual Expected:~ The actual output:

write
earlysun030201 commented 2 months ago

I solved the problem by using the add_method method together with Arc.