HuFlungDu / pylibmeshctrl

Libmeshctrl implementation in python
MIT License
1 stars 1 forks source link

Added connecting method, working in tests. #6

Open DaanSelen opened 3 weeks ago

DaanSelen commented 3 weeks ago

Hello @HuFlungDu

Here is the class I made for connecting to MeshCentral. With basic functionality. I have also tested when it asks for input to input: {"action": "meshes", "responseid": "meshctrl"} it works.

I'd like to hear what you think, and test it out if you have time. Make sure te replace the variables at the top.

DaanSelen commented 2 weeks ago

@HuFlungDu

HuFlungDu commented 2 weeks ago

Apologies, I've been heads down the past week implementing the API using this connecting method. I needed to find a way to connect and send/receive commands in parallel. I'll probably push a working prototype to the development branch within the week; I'm just finishing up work on the tunnels needed to send commands and up/download files. I'll respond an this PR when I push it.

DaanSelen commented 2 weeks ago

Apologies, I've been heads down the past week implementing the API using this connecting method. I needed to find a way to connect and send/receive commands in parallel. I'll probably push a working prototype to the development branch within the week; I'm just finishing up work on the tunnels needed to send commands and up/download files. I'll respond an this PR when I push it.

I've gotten permission from my manager:

!!!! TEXT DUMP !!!!

import argparse
import asyncio
import websockets
import json
import os
import yaml
import re
import psycopg2
from configparser import ConfigParser
from base64 import b64encode

basic_ready_state   = asyncio.Event()
details_ready_state = asyncio.Event()
global_list  = []
details_list = []
ids          = []

class ScriptEndTrigger(Exception):
    pass

class meshcaller_utilities:
    @staticmethod
    def base64_encode(string): # Encode to Base64 encoding.
        return b64encode(string.encode('utf-8')).decode()

    @staticmethod
    def getInfoFromID(id, attr): # Get data from the Simple list, which is created on startup.
        for company in global_list:
            for machine in company["nodes"]:
                if machine["node_id"] == id:
                    match attr:
                        case "name":
                            company_name = next((company["mesh_name"] for company in global_list if id == company["mesh_id"]), None)
                            if company_name:
                                return company_name
                            device_name = next((device["node_name"] for company in global_list for device in company["nodes"] if id == device["node_id"]), None)
                            return device_name
                        case "company_name":
                            return company["mesh_name"]
                        case "company_desc":
                            return company["mesh_desc"]
                        case "node_tags":
                            return machine["node_tags"]
                        case "node_os":
                            return [part for part in re.split(r'(\d.*)', machine["node_os"], maxsplit=1) if part] # Split for Debian and Mac systems.
                        case "node_users":
                            return machine["node_users"]
                        case "remote_address":
                            return machine["remote_address"]

    @staticmethod
    def getTargetIDs(company=None, device=None): # Get the node_id from the nodes which fit either the company or device argument.

        for entry in global_list:
            nodes = entry.get('nodes', [])  # Assuming the list of nodes is under the 'nodes' key

            if company and not device:  # Filter by company only
                if entry.get('mesh_name') == company:  # Check if the company matches
                    ids.extend(node['node_id'] for node in nodes)

            elif device and not company:  # Filter by device only
                for node in nodes:
                    if node['node_name'] == device:  # Check if the device matches
                        ids.append(node['node_id'])
                        return ids  # Return immediately when the device is found

            elif not company and not device:  # No filtering, return all IDs
                ids.extend(node['node_id'] for node in nodes)
        return ids

    @staticmethod
    async def sendDetailsRequest(python_client, ids):
        for id in ids:
            await python_client.ws_send_data(json.dumps({'action': 'getnetworkinfo', 'nodeid': id, 'responseid': 'meshctrl'}))
            await python_client.ws_send_data(json.dumps({'action': 'lastconnect', 'nodeid': id, 'responseid': 'meshctrl'}))
            await python_client.ws_send_data(json.dumps({'action': 'getsysinfo', 'nodeid': id, 'nodeinfo': True, 'responseid': 'meshctrl'}))

class meshcaller_configreader:
    @staticmethod
    def load_config(conffile=None, segment='meshcentral-service'):
        if conffile is None:
            conffile = './api.conf'

        if not os.path.exists(conffile):
            raise ScriptEndTrigger(f'Missing config file {conffile}, unable to initialize. Please provide an alternative path.')

        try:
            my_config = ConfigParser()
            my_config.read(conffile)
        except Exception as err:
            raise ScriptEndTrigger(f'Unable to read config file <{conffile}>: {err}!')

        if segment not in my_config:
            raise ScriptEndTrigger(f'Segment "{segment}" not found in config file <{conffile}>!')

        return my_config[segment]

# Main class for handling WebSocket communication
class meshcaller_websocket:
    meshsocket = None

    def __init__(self):
        self.received_response_queue = asyncio.Queue()

    @staticmethod
    async def ws_on_open():
        if not args.silent:
            print('Connection established.')

    @staticmethod
    async def ws_on_close():
        print('Connection closed by remote host.')
        raise ScriptEndTrigger("WebSocket connection closed.")

    async def ws_on_message(self, message):
        try:
            received_response = json.loads(message)
            await self.received_response_queue.put(received_response)
        except json.JSONDecodeError:
            print("Errored on:", message)
            raise ScriptEndTrigger("Failed to decode JSON message")

    async def ws_send_data(self, message):
        if self.meshsocket is not None:
            if not args.silent:
                print('Sending data to the target server.')
            await self.meshsocket.send(message)
        else:
             raise ScriptEndTrigger("WebSocket connection not established. Unable to send data.")

    async def genSimpleList(self):
        await self.ws_send_data(json.dumps({'action': 'meshes', 'responseid': 'meshctrl'}))
        await self.ws_send_data(json.dumps({'action': 'nodes', 'responseid': 'meshctrl'}))

    async def ws_handler(self, uri, username, password):
        login_string = f'{meshcaller_utilities.base64_encode(username)},{meshcaller_utilities.base64_encode(password)}'
        ws_headers = {
            'User-Agent': 'MeshCentral API client',
            'x-meshauth': login_string
        }
        if not args.silent:
            print("Trying to connect")

        try:
            async with websockets.connect(uri, extra_headers=ws_headers) as meshsocket:
                self.meshsocket = meshsocket
                await self.ws_on_open()  # Call ws_on_open when connection is established

                await self.genSimpleList()

                while True:
                    try:
                        message = await meshsocket.recv()  # Receive message
                        await self.ws_on_message(message)  # Process the message
                    except websockets.ConnectionClosed:
                        await self.ws_on_close()  # Call ws_on_close on disconnection
                        break

        except ScriptEndTrigger as e:
            print(f"WebSocket handler ended: {e}")
        except Exception as e:
            print(f"An error occurred: {e}")

class meshcaller_datapusher:
    def __init__(self):
        self.database_creds = meshcaller_configreader.load_config(args.conf, 'meshcentral-database')
        print("Loaded Database Credentials.")
        self.connect_to_database()

    def connect_to_database(self):
        self.dataConnection = psycopg2.connect(
            user=self.database_creds['username'],
            password=self.database_creds['password'],
            host=self.database_creds['database_host'],
            port=self.database_creds['database_port'],
            database=self.database_creds['database']
        )
        print("Connected to the database successfully")

    def pushData(self, data):
        cursor = self.dataConnection.cursor()
        try:
            # Insert mock data into devices table
            insert_device_query = """
            INSERT INTO devices (
                customer_id,
                endpoint_type,
                friendly_name,
                system_name,
                os_name,
                os_version,
                last_user,
                remote_address,
                manufacturer,
                device_model,
                device_serial
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            for entry in data:
                cursor.execute(insert_device_query, (
                    entry['company_id'],
                    entry['node_type'],
                    entry['node_name'],
                    entry['node_name'],
                    entry['node_os'][0],
                    entry['node_os'][1],
                    entry['node_users'][0] if len(entry['node_users']) > 0 else "Unknown",
                    entry['remote_address'],
                    entry['device_manufacturer'],
                    entry['device_model'],
                    entry['device_serial']
                ))

            self.dataConnection.commit()
            print("Data inserted successfully.")

        except (Exception, psycopg2.Error) as error:
            print(f"Error while inserting data: {error}")

        finally:
            cursor.close()  # Always close the cursor

    def close_connection(self):
        if self.dataConnection:
            self.dataConnection.close()
            print("Database connection closed")

class meshcaller_processor:
    def __init__(self):
        self.basic_temp_list = []
        self.batch_temp_list = []
        self.batch_counter = 0
        self.basic_counter = 0

    def handleBasicData(self, data):
        if not args.silent:
            print("Received Basic-Data entry")

        self.basic_temp_list.append(data)
        self.basic_counter += 1

        if self.basic_counter == 2:
            temp_dict = {}
            for entry in self.basic_temp_list:
                if isinstance(entry, list):
                    for mesh in entry:
                        if mesh.get("type") == "mesh":
                            mesh_name = mesh.get("name", "Unknown Mesh")
                            mesh_id = mesh["_id"]
                            mesh_desc = mesh.get("desc", "No description")
                            # Initialize mesh entry with counters
                            temp_dict[mesh_id] = {
                                "mesh_name": mesh_name,
                                "mesh_desc": mesh_desc,
                                "nodes": [],
                                "total_linux_endpoints": 0,
                                "total_linux_thinclients": 0,
                                "total_linux_servers": 0,
                                "total_mac_endpoints": 0,
                                "total_windows_endpoints": 0,
                                "total_windows_servers": 0,
                            }

                elif isinstance(entry, dict):
                    for mesh_id, nodes in entry.items():
                        if mesh_id in temp_dict:
                            # Extend existing node list
                            temp_dict[mesh_id]["nodes"].extend(nodes)
                        else:
                            # Create a new mesh entry with tag counters
                            temp_dict[mesh_id] = {
                                "mesh_name": "Unknown Mesh",
                                "mesh_desc": "No description",
                                "nodes": nodes,
                                "total_linux_endpoints": 0,
                                "total_linux_thinclients": 0,
                                "total_linux_servers": 0,
                                "total_mac_endpoints": 0,
                                "total_windows_endpoints": 0,
                                "total_windows_servers": 0,
                            }

                        # Count tags for each node
                        for node in nodes:
                            # Assume there is only one tag
                            tag = node.get("tags", [None])[0]  # Get the first tag or None
                            if tag:
                                if tag == "Linux-Endpoint":
                                    temp_dict[mesh_id]["total_linux_endpoints"] += 1
                                elif tag == "Linux-ThinClient":
                                    temp_dict[mesh_id]["total_linux_thinclients"] += 1
                                elif tag == "Linux-Server":
                                    temp_dict[mesh_id]["total_linux_servers"] += 1
                                elif tag == "Mac-Endpoint":
                                    temp_dict[mesh_id]["total_mac_endpoints"] += 1
                                elif tag == "Windows-Endpoint":
                                    temp_dict[mesh_id]["total_windows_endpoints"] += 1
                                elif tag == "Windows-Server":
                                    temp_dict[mesh_id]["total_windows_servers"] += 1

            # Prepare final output
            for mesh_id, details in temp_dict.items():
                global_list.append({
                    "mesh_name": details["mesh_name"],
                    "mesh_id": mesh_id,
                    "mesh_desc": details["mesh_desc"],
                    "nodes": [
                        {  # Node Sub Structure
                            "node_id": node["_id"],
                            "node_name": node.get("name", "Unknown Node"),
                            "node_tags": node.get("tags", "Unknown Tags"),
                            "node_os": node.get("osdesc", "Unknown OS"),
                            "node_users": node.get("users", "Unkown Users"),
                            "remote_address": node.get("ip", "Uknown Remote IP")
                        }
                        for node in details["nodes"]
                    ],
                    "total_linux_endpoints": details["total_linux_endpoints"],
                    "total_linux_thinclients": details["total_linux_thinclients"],
                    "total_linux_servers": details["total_linux_servers"],
                    "total_mac_endpoints": details["total_mac_endpoints"],
                    "total_windows_endpoints": details["total_windows_endpoints"],
                    "total_windows_servers": details["total_windows_servers"],
                })
            basic_ready_state.set()

    def handleDetailsData(self, data):
        self.batch_temp_list.append({(self.batch_counter+1): data}) # Nice formatting

        self.batch_counter += 1
        if self.batch_counter == 3: # Reset at 3
            currentID = ids[len(details_list)]
            self.batch_temp_list.insert(0 ,{0: {'mesh_name': meshcaller_utilities.getInfoFromID(currentID, "company_name"),
                                                'mesh_desc': meshcaller_utilities.getInfoFromID(currentID, "company_desc"),
                                                'node_name': meshcaller_utilities.getInfoFromID(currentID, "name"),
                                                'node_tags': meshcaller_utilities.getInfoFromID(currentID, "node_tags"),
                                                'node_users': meshcaller_utilities.getInfoFromID(currentID, "node_users"),
                                                'node_os': meshcaller_utilities.getInfoFromID(currentID, "node_os"),
                                                'remote_address': meshcaller_utilities.getInfoFromID(currentID, "remote_address")}})

            details_list.append(self.batch_temp_list)

            self.batch_temp_list = []
            self.batch_counter = 0

            if len(ids) == len(details_list):
                details_ready_state.set()

    async def receive_processor(self, python_client):
        while True:
            message = await python_client.received_response_queue.get() # Get the next message from the queue
            action_type = message['action']

            match action_type:
                case 'meshes' | 'nodes': # Getting the companies/groups
                    self.handleBasicData(message[action_type])

                case 'getnetworkinfo' | 'lastconnect' | 'getsysinfo':
                    if action_type == 'getnetworkinfo':
                        self.handleDetailsData(message['netif2'])
                    elif action_type == 'lastconnect':
                        self.handleDetailsData({"lastconnecttime": message["time"], "lastconnectaddress": message["addr"]})
                    elif action_type == 'getsysinfo':
                        self.handleDetailsData(message["hardware"])

                case 'msg': # Can be a wildcard
                    print(json.dumps(message,indent=4))

                case _: # All undefined traffic will be ignored
                    if not args.silent:
                        print("Ignored:", action_type)

class meshcaller_playbook:
    @staticmethod
    def runPlaybook(playbook):
        print("WIP")
        print(json.dumps(playbook,indent=4))

class meshcaller_actions:
    @staticmethod
    async def processArguments(python_client, playbook):
        await basic_ready_state.wait()

        ids = meshcaller_utilities.getTargetIDs(args.company, args.device)

        match args.action:
            case 'list':
                if args.company:
                    global_temp_list = [company for company in global_list if company['mesh_name'] == args.company]
                else:
                    global_temp_list = global_list

                print(json.dumps(global_temp_list, indent=4))
                raise ScriptEndTrigger("Completed task successfully.", len(global_temp_list))

            case 'summary':
                if args.company:
                    summary_list = [{key: value for key, value in mesh.items() if key != "nodes"} 
                                    for mesh in global_list if mesh['mesh_name'] == args.company]
                else:
                    summary_list = [{key: value for key, value in mesh.items() if key != "nodes"} 
                                    for mesh in global_list]

                print(json.dumps(summary_list, indent=4))
                raise ScriptEndTrigger("Completed task successfully.", len(summary_list))

            case 'details':
                await meshcaller_utilities.sendDetailsRequest(python_client, ids)
                await details_ready_state.wait()

                print(json.dumps(details_list, indent=4))
                raise ScriptEndTrigger("Completed task successfully.", len(details_list))

            case 'report':
                await meshcaller_utilities.sendDetailsRequest(python_client, ids)
                await details_ready_state.wait()

                report_list = []
                for entry in details_list:
                    first_entry = entry[0][0]
                    temp_report_list = { # TRIED TO BE AS COMPATIBLE AS POSSIBLE!
                        "company_name":         first_entry.get("mesh_name", "Unknown Company"),
                        "company_id":           first_entry.get("mesh_desc", "Unknown Company ID"),
                        "node_name":            first_entry.get("node_name", "Unknown Node Name"),
                        "node_type":            first_entry.get("node_tags", "Unknown Type"),
                        "node_os":              first_entry.get("node_os", "Unknown Operating System"),
                        "node_users":           first_entry.get("node_users", "Unknown Users"),
                        "remote_address":       first_entry.get("remote_address", "Unknown Remote IP"),
                        "device_manufacturer":  (
                                                    entry[3][3]["linux"].get("sys_vendor", "Unknown Vendor")
                                                    if "linux" in entry[3][3] and "sys_vendor" in entry[3][3]["linux"]
                                                    else entry[3][3]["identifiers"].get("bios_vendor", entry[3][3]["identifiers"].get("board_vendor"))
                                                ),
                        "device_model":         (
                                                    entry[3][3]["identifiers"].get("board_name", "Unknown Model")
                                                    if entry[3][3]["identifiers"].get("board_vendor") == "Raspberry Pi"
                                                    else entry[3][3]["identifiers"].get("product_name", "Unknown Model (Most likely a Mac device.)")
                                                ),
                        "device_serial":        (
                                                    entry[3][3]["identifiers"].get("bios_serial", "Unknown Serial")
                                                    if ("linux" in entry[3][3] or "windows" in entry[3][3]) and entry[3][3]["identifiers"].get("board_vendor") != "Raspberry Pi"
                                                    else entry[3][3]["identifiers"].get("board_serial", "Unknown Serial")
                                                )
                    }
                    report_list.append(temp_report_list)

                print(json.dumps(report_list, indent=4))
                if not args.silent:
                    print("Trying to push to database now.")

                if args.dbpush == True:
                    datapusher = meshcaller_datapusher()
                    datapusher.pushData(report_list)

                raise ScriptEndTrigger("Completed task successfully.", len(report_list))

            case 'playbook':
                if playbook is None:
                    raise ScriptEndTrigger("No playbook found")
                else:
                    meshcaller_playbook.runPlaybook(playbook)
                    raise ScriptEndTrigger("Completed Task. Playbook tasks:", len(playbook['tasks']))

            case _:
                raise ScriptEndTrigger("No action command found, doing nothing.")

async def main():
    parser = argparse.ArgumentParser(description="Process command-line arguments")

    parser.add_argument("--conf", type=str, help="If wanted you can declare a PATH for the API configuration file, if not defined then it is ./api.conf.") # Critical (Soft of)
    parser.add_argument("-pb", "--playbook", type=str, help="Specify if you want to pass in a playbook file.")                                             # Optional

    parser.add_argument("-a", "--action", type=str.lower, help="Specify which action will be performed: list, summary, details, report, playbook.")                                                   # Critical
    parser.add_argument("-s", "--silent", action="store_true", help="Suppress terminal output.")                                                           # Optional

    parser.add_argument("-c", "--company", type=str, help="Optional argument to target a specific company/group.")                                         # Optional
    parser.add_argument("-d", "--device", type=str, help="Optional argument to target a specific device.")                                                 # Optional

    parser.add_argument("--dbpush", action="store_true", help="Select to push to database declared in conf.")

    global args    
    args = parser.parse_args()

    try:
        credentials = meshcaller_configreader.load_config(args.conf)   #CRITICAL ARG
        #playbook = meshcaller_configreader.load_playbook(args.playbook)  #OPTIONAL ARG

        python_client = meshcaller_websocket()
        processor = meshcaller_processor()

        websocket_daemon = asyncio.create_task(python_client.ws_handler(
            credentials['websocket_url'],
            credentials['username'],
            credentials['password']
        ))
        processor_daemon = asyncio.create_task(processor.receive_processor(python_client))

        await meshcaller_actions.processArguments(python_client, None)

        await asyncio.gather(websocket_daemon, processor_daemon)
    except ScriptEndTrigger as e:
            if not args.silent:
                print(e)  # Print the custom error message

# Entry point of the script
if __name__ == "__main__":
    asyncio.run(main())

SORRY. This is what I made for usage in the company.

DaanSelen commented 2 weeks ago

You can use the help command and do things such as

python3 meshcaller.py -a summary -c <group> (Don't forget the venv)

Which is what I need for my company Oh and I build in SQL pushing (I think only Postgres) with --dbpush.

I hope you can do something with this!

HuFlungDu commented 6 days ago

Thanks Daan,

I have implemented most of the API with tests on the development branch. I'm going to be testing in a project for a while before I merge to master, but if you want to play with it a little I'd appreciate the additional testing and any feedback.

DaanSelen commented 5 days ago

Thanks Daan,

I have implemented most of the API with tests on the development branch. I'm going to be testing in a project for a while before I merge to master, but if you want to play with it a little I'd appreciate the additional testing and any feedback.

Sure I'll test around!

DaanSelen commented 5 days ago

I will look at this with the eye of a novice python beginner, not an expert. So I'll need some time to figure things out.

Looks like it's a package, not a command line utility.

HuFlungDu commented 5 days ago

Yeah, it's a package. Basically to implement things like your script here more easily. For instance, instead of writing the code for sendDetailsRequest and the separate handling of details response, you would import the library and just run something like:

import meshctrl

async def main():
    ids = input().split() # Do this how you do it in your script, this is just a dumb way for illustration
    async with meshctrl.session.Session(**connect_info) as session:
        print(await s.get_details(session, ids))

async def get_details(session, ids):
    details = []
    for id in ids:
        details.append(await session.device_info(details=True)
    return details

You can also pass the handle around, though I only really tested it in the with statement, so you would probably just want to wrap your main function in the with like I did there.

I should have handled all the cases you would want to interact with the mesh server, but let me know if there's an API you use that I haven't implemented.

HuFlungDu commented 5 days ago

To run in your local environment, until I release into pypi, you can clone the repo, create a venv:

python -m venv .venv

Source the venv activate (I don't know which shell you use, and it's different per shell)

then run

python -m pip install -e .

Then you can import it from that environment for testing purposes.

DaanSelen commented 4 days ago

To run in your local environment, until I release into pypi, you can clone the repo, create a venv:

python -m venv .venv

Source the venv activate (I don't know which shell you use, and it's different per shell)

then run

python -m pip install -e .

Then you can import it from that environment for testing purposes.

Yes import it as a package in a new python environment (which explains the lib part). I'll try to make something but I'll take some time.