locka99 / opcua

A client and server implementation of the OPC UA specification written in Rust
Mozilla Public License 2.0

[Help needed]: How to borrow variable of Server type with threads to work with infinite loops? #120

Closed jigar88 closed 3 years ago

jigar88 commented 3 years ago

Hello, I am trying to create a lot of nodes from messages received from a TCP server. I tried to follow the demo server code, but I have a problem borrowing the server variable. I am also new to Rust. Can someone help me resolve this? One of my Rust files contains an infinite loop, so the OPC server never starts. This is mostly a Rust question, but since OPC code is involved I am hoping someone here can help.

Module: simulator

main.rs

mod client;

fn main() {
    // The module declared above is `client`, so the call goes through it.
    client::tcp_datagram_client()
}

client.rs

use std::os::unix::net::UnixDatagram;
use std::{thread, time};
use serde_json::json;

pub fn tcp_datagram_client() {
    const FILE_PATH: &str = "/tmp/socket.sock";
    let socket = UnixDatagram::unbound().unwrap();
    // `connect` returns `io::Result<()>`, so there is nothing to bind on success.
    if let Err(e) = socket.connect(FILE_PATH) {
        println!("Couldn't connect: {:?}", e);
        return;
    }
    // Despite the naming, this is a Unix datagram socket rather than TCP.
    println!("TCP client Connected to TCP Server {:?}", socket);
    loop {
        let message_payload = json!({
            "name": "NJ/Mercer/Pennington/street1",
            "value": "Isa",
            "status": "close",
        });
        // Send one JSON message per second.
        socket.send(message_payload.to_string().as_bytes()).expect("send function failed");
        thread::sleep(time::Duration::from_secs(1));
    }
}

Module: src

main.rs

mod server;
mod process_tcp_messages;
mod opc_server;
mod global_nodes;

fn main() {
   opc_server::run_opc_server();
}

opc_server.rs

Is there a better way to use the server variable in this code? If I try to create a thread, I get a "can't borrow moved variable" error.

use opcua_server::prelude::*;
use std::path::PathBuf;

// Module names match the `mod` declarations in main.rs.
use crate::global_nodes;
use crate::process_tcp_messages;

pub fn run_opc_server() {
    let mut server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());

    let ns = {
        let address_space = server.address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("OPCUA-SERVER").unwrap()
    };

    global_nodes::create_global_variables(&mut server, ns);
    // `run` contains an infinite loop, so nothing below this call is reached.
    process_tcp_messages::run(&mut server, ns);

    // `server.run()` blocks the current thread until the server exits,
    // so the message below is only printed after that.
    server.run();
    println!("OPC Server started and running...");
}

global_nodes.rs

use opcua_server::prelude::*;
use std::path::PathBuf;
use std::thread;

static ROOT_NODE_PATH: &'static str = "OPC UA SAMPLE SERVER";

pub fn create_global_variables(server: &mut Server, ns: u16) {
    let address_space = server.address_space();
    let mut address_space = address_space.write().unwrap();

    let metrics_folder = address_space.add_folder("Folder",
                                                  "Folder",
                                                  &NodeId::objects_folder_id()).unwrap();
    let test_node = NodeId::new(ns, UAString::from("metrics"));
    let _ = address_space.add_variables(
        vec![
            Variable::new(&test_node, "metrics", "metrics", 0 as i32),
        ], &metrics_folder,
    );

}

process_tcp_messages.rs

I have an infinite loop here that processes the messages the TCP server receives from the client. I tried to put this code in a thread, but it did not work; maybe I am not threading it properly. Based on each received message, I will create nodes and update their values continuously. That function call is currently commented out.

use std::thread;
use std::sync::mpsc;
use serde_json::{Value, from_str};
use opcua_server::server::Server;
use std::ops::DerefMut;
use std::borrow::BorrowMut;

use crate::unix_datagram_server;

static ROOT_NODE_PATH: &'static str = "OPC UA SAMPLE SERVER";

pub fn run(server: &mut Server, ns: u16) {
    let (tx_to_opc_server, rx_from_opc_server) = mpsc::channel();
    thread::spawn(|| unix_datagram_server::tcp_datagram_server(tx_to_opc_server));
    println!("TCP server started...");
    loop {
        let message_from_tcp_server = rx_from_opc_server.recv().unwrap();
        let closing_bracket_index = message_from_tcp_server.
            as_bytes().
            iter().
            position(|&x| x == b'}').
            map(|i| i + 1).
            unwrap_or_else(|| message_from_tcp_server.len());
        let message_json: Value = serde_json::from_str(&message_from_tcp_server[..closing_bracket_index]).unwrap();
        let mut path = message_json["name"].clone().to_string();
        path = path.replace("\"", "");
        let tag_to_update = ROOT_NODE_PATH.to_owned() + &*path;
        let split_path: Vec<&str> = path.as_str().split("/").collect(); // this gives iterator, convert this to vector using collect()
        let tag_status = message_json["status"].clone();
        let tag_value = message_json["value"].clone();
        println!("{:?}, {:?}, {}, {}", tag_to_update, split_path, tag_status, tag_value);
        // create_opc_nodes::create_nodes_recursively(server.borrow_mut(), split_path, tag_value, ns);
    };
}

schroeder- commented 3 years ago

Use Server::run_server with an Arc<RwLock<Server>>. A quick example (not complete):

    let server = Arc::new(RwLock::new(new_server(0)));
    let server2 = server.clone();
    let (tx, rx) = channel();
    let _t = thread::spawn(move || {
        // This should run & block until it is told to abort
        Server::run_server(server);
        tx.send(()).unwrap();
    });

    {
        let server = server2.write().unwrap();
        ...
    }

jigar88 commented 3 years ago

@schroeder- I don't quite get it yet, but I will keep trying to troubleshoot.

schroeder- commented 3 years ago

If several parts of a program need to hold the same value with different lifetimes, you normally store it behind a reference-counted pointer: Rc in single-threaded code, Arc when it is shared across threads. That works around the lifetime issue. To share the value safely when it must be mutated, wrap it in a RwLock or Mutex. In this context you should use Arc<RwLock<Server>>, because that is what Server::run_server expects.
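
To make the Arc plus RwLock idea concrete, here is a minimal sketch that uses only the standard library. `FakeServer` and `add_nodes_from_threads` are made-up names for illustration; they stand in for the real server and are not part of the opcua crate.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for the real `Server`; it only holds a counter.
struct FakeServer {
    node_count: u32,
}

// Spawns `workers` threads that each add one node through the shared lock,
// then returns the final node count.
fn add_nodes_from_threads(workers: u32) -> u32 {
    let shared = Arc::new(RwLock::new(FakeServer { node_count: 0 }));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each thread gets its own handle; cloning an Arc only bumps a refcount.
            let handle = Arc::clone(&shared);
            thread::spawn(move || {
                // The write guard is dropped at the end of this statement.
                handle.write().unwrap().node_count += 1;
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // The original handle is still usable because only clones were moved.
    let count = shared.read().unwrap().node_count;
    count
}

fn main() {
    println!("nodes: {}", add_nodes_from_threads(4));
}
```

The same shape should carry over to the real server: keep one Arc for each thread that needs access, and take the lock only for as long as a single update needs it.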

locka99 commented 3 years ago

When you call server.run() it basically blocks the current thread so nothing else will happen until it exits. If you want to do other stuff then spawn a thread and run the server from that and carry on in the other thread.
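
As a rough illustration of "run the server from a thread and carry on in the other thread", the sketch below uses only the standard library. `blocking_run` is a hypothetical stand-in for a blocking call such as `server.run()`, and the stop channel is an assumption added so the example can terminate; it is not how the opcua crate shuts down.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a blocking `server.run()`: loops until told to stop.
fn blocking_run(stop: mpsc::Receiver<()>) -> u32 {
    let mut ticks = 0;
    loop {
        match stop.recv_timeout(Duration::from_millis(10)) {
            // Nothing received yet: keep "serving".
            Err(mpsc::RecvTimeoutError::Timeout) => ticks += 1,
            // A stop signal arrived, or the sender was dropped.
            _ => return ticks,
        }
    }
}

// Runs the blocking loop on a background thread and performs `jobs` units
// of other work on the calling thread in the meantime.
fn run_in_background(jobs: u32) -> u32 {
    let (stop_tx, stop_rx) = mpsc::channel();
    let server_thread = thread::spawn(move || blocking_run(stop_rx));

    let mut done = 0;
    for _ in 0..jobs {
        // Placeholder for real work, e.g. handling one incoming message.
        thread::sleep(Duration::from_millis(5));
        done += 1;
    }

    stop_tx.send(()).unwrap();
    server_thread.join().unwrap();
    done
}

fn main() {
    println!("jobs done while the server ran: {}", run_in_background(3));
}
```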

It's hard to say what you want to do from the rest of the code, but there are two ways of setting values on variables in the server: either you set the value and let the server manage it, or you register a getter that is called to retrieve a value that you manage.

The simple_server demonstrates both of these, with variables hooked up to be set from a timer or to call a getter when the value is needed. Which approach is best really depends on what you're doing, so I provide both.
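
The difference between the two approaches can be modelled without the crate at all. The sketch below is purely conceptual: `Source`, `Variable`, `read`, and `set` are hypothetical names invented here and do not reflect the opcua crate's actual types or method signatures (see simple_server for the real calls).

```rust
use std::sync::{Arc, Mutex};

// Hypothetical model of a server variable; not the opcua crate's type.
enum Source {
    // Push model: the server stores the last value that was set on it.
    Stored(i32),
    // Pull model: the server calls back into application code on each read.
    Getter(Box<dyn Fn() -> i32 + Send>),
}

struct Variable {
    source: Source,
}

impl Variable {
    fn read(&self) -> i32 {
        match &self.source {
            Source::Stored(v) => *v,
            Source::Getter(f) => f(),
        }
    }

    fn set(&mut self, value: i32) {
        self.source = Source::Stored(value);
    }
}

fn demo() -> (i32, i32) {
    // Push model: application code writes the value whenever it changes.
    let mut pushed = Variable { source: Source::Stored(0) };
    pushed.set(7);

    // Pull model: the application owns the state, the variable reads it on demand.
    let app_state = Arc::new(Mutex::new(41));
    let state_for_getter = Arc::clone(&app_state);
    let pulled = Variable {
        source: Source::Getter(Box::new(move || *state_for_getter.lock().unwrap())),
    };
    *app_state.lock().unwrap() += 1;

    (pushed.read(), pulled.read())
}

fn main() {
    println!("{:?}", demo());
}
```

For values that arrive as messages, as in this thread, the push model is probably the more natural fit, since each message already tells you when the value changed.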

jigar88 commented 3 years ago

Okay, I am trying to use Arc with Mutex so I can reuse the server variable multiple times.

pub fn run_opc_server() {
    let mut server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());
    let server2 = Arc::new(Mutex::new(server));
    let server2 = Arc::clone(&server2);
    let ns = {
        let address_space = server2.lock().unwrap().address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("EMS").unwrap()
    };
    global_opc_nodes::create_global_variables(&mut server2.lock().unwrap(), ns);
    server2.lock().unwrap().run();

This gives an error

error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, opcua_server::server::Server>`
  --> src/opc_server.rs:22:5
   |
22 |     server2.lock().unwrap().run();
   |     ^^^^^^^^^^^^^^^^^^^^^^^ move occurs because value has type `opcua_server::server::Server`, which does not implement the `Copy` trait

and I get the same result with RwLock:

    let server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());
    let server2 = Arc::new(RwLock::new(server));
    let server2 = Arc::clone(&server2);
    let ns = {
        let address_space = server2.write().unwrap().address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("EMS").unwrap()
    };
    global_opc_nodes::create_global_variables(&mut server2.write().unwrap(), ns);
    server2.write().unwrap().run();

schroeder- commented 3 years ago

If you want to use an Arc<RwLock<Server>>, you have to call Server::run_server instead of run.

let server2 = Arc::new(RwLock::new(.........));
Server::run_server(server2);
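
The E0507 error above arises because `run` takes the server by value, while a lock guard can only hand out references. A small standard-library sketch of that difference follows; `FakeServer` is a hypothetical stand-in whose two method shapes mirror the ones discussed in this thread, and the bodies are placeholders.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in; only the method shapes matter here.
struct FakeServer {
    runs: u32,
}

impl FakeServer {
    // Consumes the server, so it cannot be called through a lock guard:
    // a guard only gives out references, and moving out of one is E0507.
    fn run(mut self) -> u32 {
        self.runs += 1;
        self.runs
    }

    // Takes the shared handle instead and locks only while it needs access.
    fn run_server(server: Arc<RwLock<FakeServer>>) -> u32 {
        let mut guard = server.write().unwrap();
        guard.runs += 1;
        guard.runs
    }
}

fn demo() -> (u32, u32) {
    let owned = FakeServer { runs: 0 };
    let by_value = owned.run(); // fine: `owned` is moved into `run`

    let shared = Arc::new(RwLock::new(FakeServer { runs: 0 }));
    // shared.write().unwrap().run(); // would not compile (E0507)
    let by_handle = FakeServer::run_server(Arc::clone(&shared));

    (by_value, by_handle)
}

fn main() {
    println!("{:?}", demo());
}
```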
jigar88 commented 3 years ago

OK, I am halfway there. The OPC server now starts, but I want to use the same server variable for the TCP server loop, and the compiler now says the variable was moved into the closure, so it can't be borrowed.

pub fn run_opc_server() {
    let server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());
    let server2 = Arc::new(RwLock::new(server));
    let server2 = Arc::clone(&server2);
    let ns = {
        let address_space = server2.write().unwrap().address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("EMS").unwrap()
    };
    global_opc_nodes::create_global_variables(&mut server2.write().unwrap(), ns);
    thread::spawn(move || Server::run_server(server2));
    println!("OPC Server started and running...");
    process_incoming_tcp_messages::run(&mut server2.write().unwrap(), ns);
}

This gives the error:

error[E0382]: borrow of moved value: `server2`
  --> src/opc_server.rs:21:46
   |
12 |     let server2 = Arc::clone(&server2);
   |         ------- move occurs because `server2` has type `std::sync::Arc<std::sync::RwLock<opcua_server::server::Server>>`, which does not implement the `Copy` trait
...
19 |     thread::spawn(move || Server::run_server(server2));
   |                   -------                    ------- variable moved due to use in closure
   |                   |
   |                   value moved into closure here
20 |     println!("OPC Server started and running...");
21 |     process_incoming_tcp_messages::run(&mut server2.write().unwrap(), ns);
   |                                             ^^^^^^^ value borrowed here after move

schroeder- commented 3 years ago

It may help to read about how to use shared state in Rust: rwlock, rust book.

For your problem, clone the Arc before spawning the thread and move the clone into the closure.

pub fn run_opc_server() {
    let server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());
    let server2 = Arc::new(RwLock::new(server));
    let ns = {
        let address_space = server2.write().unwrap().address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("EMS").unwrap()
    };
    global_opc_nodes::create_global_variables(&mut server2.write().unwrap(), ns);
    let server_th = server2.clone();
    thread::spawn(move || Server::run_server(server_th));
    println!("OPC Server started and running...");
    process_incoming_tcp_messages::run(&mut server2.write().unwrap(), ns);
}
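
Stripped of the OPC UA parts, the fix for E0382 is the pattern below: a standard-library-only sketch in which a `String` stands in for the server and `clone_before_spawn` is a made-up name.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

fn clone_before_spawn() -> String {
    let shared = Arc::new(RwLock::new(String::from("init")));

    // Clone first; the `move` closure takes ownership of the clone only.
    let for_thread = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        for_thread.write().unwrap().push_str("+thread");
    });
    worker.join().unwrap();

    // `shared` was never moved, so it can still be borrowed here.
    shared.write().unwrap().push_str("+main");
    let text = shared.read().unwrap().clone();
    text
}

fn main() {
    println!("{}", clone_before_spawn());
}
```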
jigar88 commented 3 years ago

So I have both servers working now; it's just that sometimes they don't connect in time. The OPC server takes a little longer to connect, but I am able to process messages from the TCP server on the OPC server. I ended up doing:

pub fn run_opc_server() {
    let opc_server = Server::new(ServerConfig::load(&PathBuf::from("/tmp/server.conf")).unwrap());
    let opc_server_arc_object = Arc::new(RwLock::new(opc_server));
    let ns = {
        let address_space = opc_server_arc_object.write().unwrap().address_space();
        let mut address_space = address_space.write().unwrap();
        address_space.register_namespace("EMS").unwrap()
    };
    global_opc_nodes::create_global_variables(&mut opc_server_arc_object.write().unwrap(), ns);
    let threaded_server_object = opc_server_arc_object.clone();
    thread::spawn(move || Server::run_server(threaded_server_object));
    // let sts = NodeId::new(0, 2256);
    // let v = DataValue::new_now(sts.clone());
    loop {
        thread::sleep(time::Duration::from_secs(30));
        let threaded_server_for_nodes = opc_server_arc_object.clone();
        thread::spawn(move || process_incoming_tcp_messages::run(&mut threaded_server_for_nodes.write().unwrap(), ns));
    }
}

locka99 commented 3 years ago

Closing because the remaining issues are probably race conditions in the implementation that uses the crate.
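
One possible contributor to the slow connections in the final snippet: in `run(&mut opc_server_arc_object.write().unwrap(), ns)`, the temporary write guard lives until `run` returns, and `run` loops forever, so the write lock may never be released for other threads. A hedged alternative is to pass the Arc itself and lock once per message. The sketch below uses only the standard library; `FakeServer` and `process_messages` are hypothetical names, and the channel stands in for the TCP message source.

```rust
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for the server's address space.
struct FakeServer {
    values: Vec<String>,
}

// Takes the shared handle rather than `&mut FakeServer`, and locks once per
// message so other threads can take the lock between messages.
fn process_messages(server: Arc<RwLock<FakeServer>>, rx: mpsc::Receiver<String>) {
    // `recv` returns Err once every sender is dropped, which ends the loop.
    while let Ok(msg) = rx.recv() {
        let mut guard = server.write().unwrap();
        guard.values.push(msg);
        // `guard` is dropped here, at the end of each iteration.
    }
}

fn demo() -> usize {
    let server = Arc::new(RwLock::new(FakeServer { values: Vec::new() }));
    let (tx, rx) = mpsc::channel();

    // One long-lived worker thread, spawned once rather than on a timer.
    let for_worker = Arc::clone(&server);
    let worker = thread::spawn(move || process_messages(for_worker, rx));

    for i in 0..3 {
        tx.send(format!("msg{}", i)).unwrap();
    }
    drop(tx); // closing the channel lets the worker finish
    worker.join().unwrap();

    let n = server.read().unwrap().values.len();
    n
}

fn main() {
    println!("stored {} messages", demo());
}
```

Whether this removes the delay in the real program is untested; it only shows how to keep the lock scope short.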