CasualX / vigem-client

ViGEm client API in pure Rust.
MIT License

Problem accessing xbox 360 notifications #7

Closed wanjohiryan closed 1 year ago

wanjohiryan commented 1 year ago

Hi,

I have trouble getting notifications through

   let thread = target.request_notification().unwrap().spawn_thread(move |_, data| {
        println!("{:#?}", data);
   });

I get the error:

no method named `request_notification` found for struct `Xbox360Wired` in the current scope
method not found in `Xbox360Wired<Client>`

I have tried adding the feature `unstable_xtarget_notification` to my Cargo.toml, to no avail.

Thanks in advance

wanjohiryan commented 1 year ago

I have finally made it work.

For future reference; add this to your Cargo.toml

vigem-client = { git = "https://github.com/CasualX/vigem-client", features = ["unstable_xtarget_notification"] }

Thank you for this awesome crate...

CasualX commented 1 year ago

Hi, thanks for the kind words!

You should be able to use the version from crates.io just fine instead of relying on git master:

vigem-client = { version = "0.1.4", features = ["unstable_xtarget_notification"] }

Keep in mind that the notification API is horribly unreliable. Dropped and duplicated events happen regularly... I believe this is an issue with the poorly designed vigem driver interface itself, nothing I can fix. There are some workarounds floating around (using multiple threads to spam the request notification API) but these aren't implemented. So yeah, the notification system is kinda bad, sorry about that.

wanjohiryan commented 1 year ago

So far, I haven't encountered any problems with the notifications feature. Thank you for creating this library. It's really nice that it can be used safely between threads (an underrated feature).

By the way, would you be willing to help me out with the FIXMEs? I'm new to Rust, but I've managed to write this code. I would greatly appreciate your assistance. Thank you in advance! :)

use futures::SinkExt;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::{json,to_value};
use std::sync::Arc;
use std::{env, io};
use tokio::{
    net::{TcpListener, TcpStream},
    task,
};
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::protocol::Message;
#[derive(Debug, Deserialize, Serialize)]
struct XGamepadJson {
    buttons: Vec<u8>,
    left_trigger: u8,
    right_trigger: u8,
    thumb_lx: i16,
    thumb_ly: i16,
    thumb_rx: i16,
    thumb_ry: i16,
}

#[tokio::main]
async fn main() -> Result<(), io::Error> {
    // Create a TcpListener and bind it to an address
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:9002".to_string());
    let listener = TcpListener::bind(&addr).await.expect("Can't listen");
    println!("Listening on: {}", addr);

    // Connect to the ViGEmBus driver
    let client = Arc::new(vigem_client::Client::connect().unwrap());

    // Create a semaphore with 4 permits
    // only 4 gamepads can be created at a time
    let semaphore = Arc::new(tokio::sync::Semaphore::new(4));

    // Loop over incoming connections
    while let Ok((stream, _)) = listener.accept().await {
        // Acquire a permit from the semaphore (waits while 4 gamepads are active)
        let permit = semaphore.clone().acquire_owned().await.unwrap();

        //TODO: send an error to client

        let vigem = Arc::clone(&client);

        task::spawn(async move {
            handle_connection(stream, vigem).await;

            // Dropping the permit hands it back to the semaphore
            drop(permit);
        });
    }

    Ok(())
}

// Handle each connection in a separate task
async fn handle_connection(stream: TcpStream, client: Arc<vigem_client::Client>) {
    // Upgrade the TCP stream to a WebSocket stream
    let ws_stream = accept_async(stream).await.expect("Failed to accept");

    // Split the stream into sink and stream parts
    let (mut ws_sink, mut ws_stream) = ws_stream.split();

    // Create the virtual controller target
    let id = vigem_client::TargetId::XBOX360_WIRED;
    let mut target = vigem_client::Xbox360Wired::new(client, id);

    // Plugin the virtual controller
    target.plugin().unwrap();

    // Wait for the virtual controller to be ready to accept updates
    target.wait_ready().unwrap();

    let x = Arc::new(ws_sink);

    let ws_sink_clone = Arc::clone(&x);
    // Create a thread to handle notifications
    let thread = target.request_notification().unwrap().spawn_thread(move |_, data| {
        //FIXME: how can i make this work? pass the notifications back to client

        // let msg = Message::Text(json!({
        //     "large_motor": data.large_motor.to_string(),
        //     "small_motor": data.small_motor.to_string(),
        // }));
        // ws_sink.send(msg).await.unwrap();
    });

    // Respond with "OK" to show server is running
    ws_sink.send(Message::Text("OK".to_string())).await.unwrap();

    // Iterate over incoming messages
    while let Some(msg) = ws_stream.next().await {
        match msg {
            // If it is an empty message [Health check]
            Ok(Message::Text(data)) if data.is_empty() => {
                // Return "OK"
                ws_sink.send(Message::Text("OK".to_string())).await.unwrap();
            }
            // If it is a non-empty text message
            Ok(Message::Text(data)) => {
                // Try to parse it as json
                if let Ok(gpad_input) = serde_json::from_slice::<XGamepadJson>(&data.as_bytes()) {
                    // Use the extracted fields to construct a new vigem_client::XGamepad instance
                    let gamepad = vigem_client::XGamepad {
                        buttons: vigem_client::XButtons {
                            //FIXME: make this read numbers 11 and 12...
                            raw: gpad_input
                                .buttons
                                .iter()
                                .fold(0u16, |acc, &b| acc | (1 << (b - 1))),
                            // .fold(0u16, |acc, &b| acc | (1 << (b.saturating_sub(1))))
                        },
                        left_trigger: gpad_input.left_trigger,
                        right_trigger: gpad_input.right_trigger,
                        thumb_lx: gpad_input.thumb_lx,
                        thumb_ly: gpad_input.thumb_ly,
                        thumb_rx: gpad_input.thumb_rx,
                        thumb_ry: gpad_input.thumb_ry,
                        ..Default::default()
                    };

                    let _ = target.update(&gamepad);
                } else {
                    // Invalid json format
                    eprintln!("Invalid json: {}", data);
                }
            }
            // If it is a ping message
            Ok(Message::Ping(data)) => {
                // Reply with a pong message with same data
                ws_sink.send(Message::Pong(data)).await.unwrap();
            }
            // If it is a close message
            Ok(Message::Close(reason)) => {
                // Respond with a close message with same reason if any
                ws_sink.send(Message::Close(reason)).await.unwrap();
                break;
            }
            _ => {}
        }
    }

    // Dropping the target causes the notification request to abort and the thread to return
    drop(target);
    thread.join().unwrap();
}

CasualX commented 1 year ago

The API is not designed for async rust, sorry. I'm not too familiar with how to do things async in Rust.

You already move `ws_sink` behind an `Arc` in `x` (you could just shadow it as `ws_sink`). You may need to wrap it as an `Arc<Mutex<>>`, depending on whether `.send` on it requires `&mut`; then you can use the cloned Arc in the notification thread.

But that's kind of messy and unfun. Alternatively you could try a dedicated channel (I'm sure whatever async runtime you're using has something for this) to communicate notifications back to your main handler, which in turn will push the notification to the client. This avoids annoying Arcs and other shared ownership. However I don't have enough familiarity with Rust async to help you more, sorry.
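A minimal sketch of the channel idea, using only the standard library so it runs without any crates. `Rumble`, `spawn_producer` and `collect_forwarded` are hypothetical names made up for illustration; the producer thread stands in for the notification callback, and the consumer stands in for the handler that would write to the WebSocket. In an async program the runtime's own channel type would presumably take the place of `std::sync::mpsc`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a notification payload (not the crate's type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Rumble {
    large_motor: u8,
    small_motor: u8,
}

/// Plays the role of the notification thread: forwards every event
/// through the channel instead of touching the socket directly.
fn spawn_producer(events: Vec<Rumble>, tx: mpsc::Sender<Rumble>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for event in events {
            // A send error only means the receiver is gone, so stop quietly.
            if tx.send(event).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel.
    })
}

/// Plays the role of the main handler: drains the channel until every
/// sender has been dropped and turns each event into a JSON-like string.
fn collect_forwarded(rx: mpsc::Receiver<Rumble>) -> Vec<String> {
    rx.iter()
        .map(|n| {
            format!(
                "{{\"large_motor\":{},\"small_motor\":{}}}",
                n.large_motor, n.small_motor
            )
        })
        .collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let events = vec![
        Rumble { large_motor: 255, small_motor: 0 },
        Rumble { large_motor: 0, small_motor: 128 },
    ];
    let producer = spawn_producer(events, tx);
    for line in collect_forwarded(rx) {
        println!("{line}");
    }
    producer.join().unwrap();
}
```

Only the receiving side ever owns the socket here, so the sink needs neither an `Arc` nor a `Mutex`.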

Also I would just keep the XButtons as a u16 in your XGamepadJson wrapper, no need for Vec<u8>. Then the conversion is trivially simple just like your other code. You can clean it up with some From/Into implementations to keep everything tight.
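A rough sketch of what that `From`/`Into` cleanup could look like. To keep it runnable without dependencies, `Buttons` and `Gamepad` below are local stand-ins that only mirror the field names used for `vigem_client::XGamepad` in this thread, and `GamepadJson` is a hypothetical wire struct (the serde derives are omitted). Treat it as an illustration of the shape, not as the crate's API.

```rust
/// Hypothetical wire format: buttons arrive as a ready-made u16 bitmask.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct GamepadJson {
    buttons: u16,
    left_trigger: u8,
    right_trigger: u8,
    thumb_lx: i16,
    thumb_ly: i16,
    thumb_rx: i16,
    thumb_ry: i16,
}

/// Local stand-in for the crate's button wrapper (assumption).
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct Buttons {
    raw: u16,
}

/// Local stand-in for the crate's gamepad report (assumption).
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct Gamepad {
    buttons: Buttons,
    left_trigger: u8,
    right_trigger: u8,
    thumb_lx: i16,
    thumb_ly: i16,
    thumb_rx: i16,
    thumb_ry: i16,
}

impl From<GamepadJson> for Gamepad {
    /// Field-by-field copy; the bitmask needs no decoding at all.
    fn from(json: GamepadJson) -> Self {
        Gamepad {
            buttons: Buttons { raw: json.buttons },
            left_trigger: json.left_trigger,
            right_trigger: json.right_trigger,
            thumb_lx: json.thumb_lx,
            thumb_ly: json.thumb_ly,
            thumb_rx: json.thumb_rx,
            thumb_ry: json.thumb_ry,
        }
    }
}

fn main() {
    let input = GamepadJson {
        buttons: 0x1001,
        left_trigger: 10,
        ..Default::default()
    };
    // `.into()` picks up the `From` impl above.
    let pad: Gamepad = input.into();
    println!("{:?}", pad);
}
```

With the conversion in one place, the message loop shrinks to parsing the JSON and calling `.into()` before `target.update(...)`.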

wanjohiryan commented 1 year ago

Thanks for your help @CasualX

For future reference, the final script looks like:

use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use std::{env, io};
use tokio::{
    net::{TcpListener, TcpStream},
    sync::Mutex,
    task,
};
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::protocol::Message;
use vigem_client::XNotification;
#[derive(Debug, Deserialize, Serialize)]
struct XGamepadJson {
    buttons: Vec<String>,
    left_trigger: u8,
    right_trigger: u8,
    thumb_lx: i16,
    thumb_ly: i16,
    thumb_rx: i16,
    thumb_ry: i16,
}

#[tokio::main]
async fn main() -> Result<(), io::Error> {
    // Create a TcpListener and bind it to an address
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:9002".to_string());
    let listener = TcpListener::bind(&addr).await.expect("Can't listen");
    println!("Listening on: {}", addr);

    // Connect to the ViGEmBus driver
    let client = Arc::new(vigem_client::Client::connect().unwrap());

    // Create a semaphore with 4 permits
    // only 4 gamepads can be created at a time
    let semaphore = Arc::new(tokio::sync::Semaphore::new(4));

    // Loop over incoming connections
    while let Ok((stream, _)) = listener.accept().await {
        // Acquire a permit from the semaphore (waits while 4 gamepads are active)
        let permit = semaphore.clone().acquire_owned().await.unwrap();

        //TODO: send an error to client

        let vigem = Arc::clone(&client);

        task::spawn(async move {
            handle_connection(stream, vigem).await;

            // Dropping the permit hands it back to the semaphore
            drop(permit);
        });
    }

    Ok(())
}

// Handle each connection in a separate task
async fn handle_connection(stream: TcpStream, client: Arc<vigem_client::Client>) {
    // Upgrade the TCP stream to a WebSocket stream
    let ws_stream = accept_async(stream).await.expect("Failed to accept");

    // Split the stream into sink and stream parts
    let (ws_sink, mut ws_stream) = ws_stream.split();

    // Create the virtual controller target
    let id = vigem_client::TargetId::XBOX360_WIRED;
    let mut target = vigem_client::Xbox360Wired::new(client, id);

    // Plugin the virtual controller
    target.plugin().unwrap();

    // Wait for the virtual controller to be ready to accept updates
    target.wait_ready().unwrap();

    // Create a channel for broadcasting notifications to the WebSocket client
    let (notification_tx, mut notification_rx) = tokio::sync::broadcast::channel(16);

    let ws_sender = Arc::new(Mutex::new(ws_sink));

    // Clone the shared sink for the task that forwards notifications
    let ws_sink_clone = ws_sender.clone();

    // Create a thread to handle notifications
    let thread = target
        .request_notification()
        .unwrap()
        .spawn_thread(move |_, data| {
            let _ = notification_tx.send(XNotification {
                large_motor: data.large_motor,
                small_motor: data.small_motor,
                led_number: data.led_number,
            });
        });

    // Spawn a task that sends each notification to the WebSocket client
    let _notification_task = tokio::task::spawn(async move {
        while let Ok(notification) = notification_rx.recv().await {
            // let notification:XNotification = notification_rx.try_recv().unwrap();

            let msg = json!({
                "large_motor": notification.large_motor.to_string(),
                "small_motor": notification.small_motor.to_string(),
            });

            let mut ws_sink = ws_sink_clone.lock().await;
            ws_sink.send(Message::Text(msg.to_string())).await.unwrap();
        }
    });

    // Respond with "OK" to show server is running
    ws_sender
        .lock()
        .await
        .send(Message::Text("OK".to_string()))
        .await
        .unwrap();

    // Iterate over incoming messages
    while let Some(msg) = ws_stream.next().await {
        match msg {
            // If it is an empty message [Health check]
            Ok(Message::Text(data)) if data.is_empty() => {
                // Return "OK"
                ws_sender
                    .lock()
                    .await
                    .send(Message::Text("OK".to_string()))
                    .await
                    .unwrap();
            }
            // If it is a non-empty text message
            Ok(Message::Text(data)) => {
                // Try to parse it as json
                if let Ok(gpad_input) = serde_json::from_slice::<XGamepadJson>(&data.as_bytes()) {
                    // Use the extracted fields to construct a new vigem_client::XGamepad instance
                    // let gamepad = handle_input(gpad_input)?;
                    // let _ = target.update(&gamepad);
                    match handle_input(gpad_input) {
                        Ok(gamepad) => {
                            let _ = target.update(&gamepad);
                        }
                        Err(_) => {
                            // handle the error case here
                        }
                    }
                } else {
                    // Invalid json format
                    eprintln!("Invalid json: {}", data);
                }
            }
            // If it is a ping message
            Ok(Message::Ping(data)) => {
                // Reply with a pong message with same data
                ws_sender
                    .lock()
                    .await
                    .send(Message::Pong(data))
                    .await
                    .unwrap();
            }
            // If it is a close message
            Ok(Message::Close(reason)) => {
                // Respond with a close message with same reason if any
                ws_sender
                    .lock()
                    .await
                    .send(Message::Close(reason))
                    .await
                    .unwrap();
                break;
            }
            _ => {}
        }
    }

    // Dropping the target causes the notification request to abort and the thread to return
    drop(target);
    thread.join().unwrap();
}

//handle input and map into necessary values
fn handle_input(gpad_input: XGamepadJson) -> Result<vigem_client::XGamepad, ()> {
    let button_states = gpad_input.buttons.iter().fold(0u16, |acc, b| {
        let button_state = match b.as_str() {
            "A" => vigem_client::XButtons::A,
            "B" => vigem_client::XButtons::B,
            "X" => vigem_client::XButtons::X,
            "Y" => vigem_client::XButtons::Y,
            "UP" => vigem_client::XButtons::UP,       //DPAD_UP
            "DOWN" => vigem_client::XButtons::DOWN,   //DPAD_DOWN
            "LEFT" => vigem_client::XButtons::LEFT,   //DPAD_LEFT
            "RIGHT" => vigem_client::XButtons::RIGHT, //DPAD_RIGHT
            "LB" => vigem_client::XButtons::LB,       //left shoulder
            "RB" => vigem_client::XButtons::RB,       //right shoulder
            "BACK" => vigem_client::XButtons::BACK,
            "START" => vigem_client::XButtons::START,
            "LS" => vigem_client::XButtons::LTHUMB, //LEFT_THUMB,
            "RS" => vigem_client::XButtons::RTHUMB, //RIGHT_THUMB,
            _ => 0,
        };
        acc | button_state as u16
    });

    let gamepad = vigem_client::XGamepad {
        buttons: vigem_client::XButtons { raw: button_states },
        left_trigger: gpad_input.left_trigger,
        right_trigger: gpad_input.right_trigger,
        thumb_lx: gpad_input.thumb_lx,
        thumb_ly: gpad_input.thumb_ly,
        thumb_rx: gpad_input.thumb_rx,
        thumb_ry: gpad_input.thumb_ry,
        ..Default::default()
    };

    Ok(gamepad)
}

It's not perfect but it works

Do check out qwantify, self-hosted 'cloud gaming for friends'.

Thank you for this awesome crate :)