OpenHausIO / backend

HTTP API for the OpenHaus SmartHome/IoT solution
https://docs.open-haus.io
6 stars 2 forks source link

create a https agent for interfaces/requests #340

Open mStirner opened 12 months ago

mStirner commented 12 months ago
    httpsAgent(options = {}) {

        let agent = new Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        //let settings = this.settings;

        agent.createConnection = ({ host = null, port = null }) => {

            console.log(`############## Create connection to tcp://${host}:${port}`);

            let readable = new PassThrough();
            let writable = new PassThrough();

            // TODO Implement "auto-drain" when no upstream is attached -> Move this "lower", e.g. before ws upstream?
            /*
            let writable = new Transform({
                transform(chunk, enc, cb) {

                    debugger;

                    //console.log("this.stream",);
                    console.error(">>>> Write data, flowing?", str.upstream ? true : false, settings.host);

                    if (str.upstream) {
                        this.push(chunk);
                    } else {
                        while (this.read() !== null) {
                            // do nothing with writen input data
                            // empty readable queue
                        }
                    }

                    cb();

                }
            });
            */

            let stream = new Duplex.from({
                readable,
                writable
            });

            stream.destroy = (...args) => {
                console.log("socket.destroy();", args);
            };

            stream.ref = (...args) => {
                console.log("socket.unref();", args);
            };

            stream.unref = (...args) => {
                console.log("socket.unref();", args);
            };

            stream.setKeepAlive = (...args) => {
                console.log("socket.setKeepAlive()", args);
            };

            stream.setTimeout = (...args) => {
                console.log("socket.setTimeout();", args);
            };

            stream.setNoDelay = (...args) => {
                console.log("socket.setNotDelay();", args);
            };

            this.stream.pipe(readable, { end: false });
            writable.pipe(this.stream, { end: false });

            return tls.connect({
                socket: stream,
                host: this.settings.host,
                ...options
            });

        };

        return agent;

    }
mStirner commented 11 months ago

Dunno why, but this attempt did not work:

Code ```js _createAgentStream() { // cleanup, could be possible be piped from previous "connections" this.stream.unpipe(); /* // check if passed host/port matches interface settings? if (host != settings.host || port != settings.port) { let msg = "host/port for interface missmatch, expected:\r\n"; msg += `\thost = ${host}; got = ${settings.host}\r\n`; msg += `\tport = ${settings.port}; got = ${settings.port}`; throw new Error(msg); } */ //let readable = new PassThrough(); //let writable = new PassThrough(); let readable = new Transform({ transform(chunk, enc, cb) { //console.log("[incoming]", chunk.toString()); // temp fix for #343 // this is not the prefered fix for this issue // it should be handled on "stream/socket" level instead // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings // NOTE: what if the body contains json that has a `connection: close` property/key/value? chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n"); this.push(chunk); cb(); } }); let writable = new Transform({ transform(chunk, enc, cb) { //console.log("[outgoing]", chunk.toString()); this.push(chunk); cb(); } }); // TODO Implement "auto-drain" when no upstream is attached -> Move this "lower", e.g. before ws upstream? /* let writable = new Transform({ transform(chunk, enc, cb) { debugger; //console.log("this.stream",); console.error(">>>> Write data, flowing?", str.upstream ? true : false, settings.host); if (str.upstream) { this.push(chunk); } else { while (this.read() !== null) { // do nothing with writen input data // empty readable queue } } cb(); } }); */ let stream = new Duplex.from({ readable, writable }); stream.destroy = (...args) => { console.log("socket.destroy();", args); }; stream.ref = (...args) => { console.log("socket.unref();", args); }; stream.unref = (...args) => { console.log("socket.unref();", args); }; stream.setKeepAlive = (...args) => { console.log("socket.setKeepAlive()", args); }; stream.setTimeout = (...args) => { console.log("socket.setTimeout();", args); }; stream.setNoDelay = (...args) => { console.log("socket.setNotDelay();", args); }; this.stream.pipe(readable, { end: false }); writable.pipe(this.stream, { end: false }); return stream; } // NEW VERSION, fix for #329 httpAgent(options = {}) { let agent = new http.Agent({ keepAlive: true, maxSockets: 1, ...options }); //let settings = this.settings; agent.createConnection = ({ host = null, port = null }) => { console.log(`############## Create connection to tcp://${host}:${port}`); return this._createAgentStream(); }; return agent; } httpsAgent(options = {}) { let agent = new https.Agent({ keepAlive: true, maxSockets: 1, ...options }); agent.createConnection = ({ host = null, port = null }) => { console.log(`############## Create connection to tcp://${host}:${port}`); let socket = this._createAgentStream(); return tls.connect({ socket, host: this.settings.host, ...options }); }; return agent; } ```
mStirner commented 9 months ago

After overriding the http header "connection" here: https://github.com/OpenHausIO/backend/blob/c193a9c4de11fad3fcc5698992ca477bf3e437c9/components/devices/class.interface.js#L246-L262 , this breaks the tls encryption:

Error: write EPROTO 140016587843520:error:1417110F:SSL routines:tls_process_server_hello:bad length:../deps/openssl/openssl/ssl/statem/statem_clnt.c:1485:

    at WriteWrap.onWriteComplete [as oncomplete] (node:internal/stream_base_commons:94:16)
    at Duplexify.ondata (node:internal/js_stream_socket:77:22)
    at Duplexify.emit (node:events:513:28)
    at addChunk (node:internal/streams/readable:315:12)
    at readableAddChunk (node:internal/streams/readable:289:9)
    at Duplexify.Readable.push (node:internal/streams/readable:228:10)
    at d._read (node:internal/streams/duplexify:350:16)
    at Transform.<anonymous> (node:internal/streams/duplexify:333:9)
    at Transform.emit (node:events:513:28)
    at emitReadable_ (node:internal/streams/readable:578:12) {
  errno: -71,
  code: 'EPROTO',
  syscall: 'write'
}

Could be resolved when no common "createStream" function is used, and in the tls one the manipulating stuff is ommitd. But then its possible that other stuff breaks and the interfaceStream is closed/destroyed.

mStirner commented 9 months ago

Works, but have concerns about the keep alive & socket destroying functionality.

    _agentConnection({ protocol = "http:" }) {

        // cleanup, could be possible be piped from previous "connections"
        this.stream.unpipe();

        let readable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[incoming]", chunk.toString());

                // temp fix for #343
                // this is not the prefered fix for this issue
                // it should be handled on "stream/socket" level instead
                // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
                // NOTE: what if the body contains json that has a `connection: close` property/key/value?
                if (protocol !== "https:") {
                    chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
                }

                this.push(chunk);
                cb();

            }
        });

        let writable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[outgoing]", chunk.toString());

                this.push(chunk);
                cb();

            }
        });

        let stream = new Duplex.from({
            readable,
            writable
        });

        [
            "destroy", "ref", "unref",
            "setKeepAlive", "setTimeout", "setNoDelay"
        ].forEach((fnc) => {

            // fake methods above
            // dont need any of these
            stream[fnc] = () => { };

        });

        this.stream.pipe(readable, { end: false });
        writable.pipe(this.stream, { end: false });

        return stream;

    }

    // NEW VERSION, fix for #329
    httpAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new http.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {
            return this._agentConnection(...args);
        };

        this.cachedAgent = agent;
        return agent;

    }

    httpsAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new https.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {

            let { host, port } = this.settings;
            let socket = this._agentConnection(...args);

            return tls.connect({
                socket,
                host,
                port,
                ...options
            });

        };

        this.cachedAgent = agent;
        return agent;

    }

full code/backup (delete uncomited file in dev branch):

const Joi = require("joi");
const http = require("http");
const https = require("https");
const tls = require("tls");
const mongodb = require("mongodb");
const { Transform, Duplex } = require("stream");

/**
 * @description
 * Implements a interface item, that hides a duplex stream in it, to read/write data from a device interface
 * 
 * @class Interface
 * 
 * @param {Object} obj Object that matches the item schema. See properties below:
 * @param {InterfaceStream} stream Instance of a InterfaceStream object
 * 
 * @property {String} _id MongoDB Object id is as string
 * @property {String} type Type of the interface, `SERIAL` or `ETHERNET`
 * @property {Object} settings Interface specifiy type settings.
 * @property {Array} [adapter=["raw"]] Array of adapter to use for encoding/decoding data: `base64`, `eiscp`, `json`, `raw`
 * 
 * @see interfaceStream components/devices/class.interfaceStream.js
 * @link https://github.com/OpenHausIO/backend/blob/dev/components/devices/class.interface.js#L27
 * @link https://github.com/OpenHausIO/backend/blob/dev/components/devices/class.interface.js#L40
 */
module.exports = class Interface {

    constructor(obj, stream) {

        Object.assign(this, obj);
        this._id = String(obj._id);

        // hide stream object on interface
        Object.defineProperty(this, "stream", {
            value: stream
        });

        // share/set interface stream
        // see #86
        //let { interfaceStreams } = global.sharedObjects;
        let { interfaceStreams } = require("../../system/shared.js");
        interfaceStreams.set(this._id, stream);

        // hot fix for #350
        Object.defineProperty(this, "cachedAgent", {
            value: null,
            enumerable: false,
            configurable: false,
            writable: true
        });

    }

    /**
     * @function schema
     * Interface schema 
     * 
     * @static
     * 
     * @returns {Object} https://joi.dev/api/?v=17.6.0#anyvalidatevalue-options
     */
    static schema() {

        // settings from node.js serialport (https://serialport.io/docs/api-bindings-cpp#open)
        const SERIAL = Joi.object({
            device: Joi.string().required(),
            baudRate: Joi.number().default(9600),
            dataBits: Joi.number().allow(5, 6, 7, 8).default(8),
            stopBits: Joi.number().allow(1, 1.5, 2).default(1),
            parity: Joi.string().valid("even", "odd", "none").default("none"),
            rtscts: Joi.boolean().default(false),
            xon: Joi.boolean().default(false),
            xoff: Joi.boolean().default(false),
            xany: Joi.boolean().default(false),
            hupcl: Joi.boolean().default(true)
        }).required();

        const ETHERNET = Joi.object({
            //transport: Joi.string().valid("tcp", "udp", "raw").default("tcp"),
            socket: Joi.string().valid("tcp", "udp", "raw").default("tcp"),
            host: Joi.string().required(),
            port: Joi.number().min(1).max(65535).required(),
            // https://regex101.com/r/wF7Nfa/1
            // https://stackoverflow.com/a/50080404/5781499
            mac: Joi.string().default(null).allow(null).regex(/^([0-9a-fA-F]{2}[:]){5}[0-9a-fA-F]{2}$/)
        }).required();

        return Joi.object({
            _id: Joi.string().pattern(/^[0-9a-fA-F]{24}$/).default(() => {
                return String(new mongodb.ObjectId());
            }),
            type: Joi.string().default("ETHERNET"),
            settings: Joi.object().when("type", {
                is: "ETHERNET",
                then: ETHERNET
            }).when("type", {
                is: "SERIAL",
                then: SERIAL
            }),
            adapter: Joi.array().items("eiscp", "raw", "eol").default(["raw"]),
            description: Joi.string().allow(null).default(null)
        });

    }

    /**
     * @function httpAgent
     * Creates a custom http agent which use the underalying interfaceStream to forward data
     * 
     * @param {Object} [options] httpAgent options 
     * 
     * @returns {Object} httpAgent object
     * 
     * @link https://nodejs.org/dist/latest-v16.x/docs/api/http.html#new-agentoptions 
     */
    /*
    // *OLD* function, see #329
    httpAgent(options) {

        options = Object.assign({
            keepAlive: true,
            //maxSockets: 1,
            keepAliveMsecs: 3000,        // use this as websocket ping/pong value to detect broken connections?
        }, options);

        // stream nc tcp socket 
        // https://stackoverflow.com/a/33514724/5781499
        let agent = new Agent(options);

        // use interface stream as socket
        // createConnection returns duplex stream
        // https://nodejs.org/dist/latest-v14.x/docs/api/http.html#http_agent_createconnection_options_callback
        agent.createConnection = (options, cb) => {

            let input = new PassThrough();
            let output = new PassThrough();

            this.stream.pipe(input, { end: false });
            output.pipe(this.stream, { end: false });

            let socket = Duplex.from({
                readable: input,
                writable: output
            });

            // when multiple reuqests are done parallal, sometimes a AbortedErr is thrown
            // see #329 for details
            // TODO: Check if the upstream is drained, and perform requests in series
            // As "quick fix" till a solution is found for #312 catch the trown error
            socket.on("error", (err) => {
                console.log("Catched error on http.agent.createConnection", err);
                this.stream.destroy();
            });

            /*
                        [socket, this.stream, input, output].forEach((stream) => {
                            let cleanup = finished(stream, (err) => {

                                console.log("Socket duplex stream ended", err);

                                let chunk;

                                while (null !== (chunk = input.read())) {
                                    console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
                                }

                                while (null !== (chunk = output.read())) {
                                    console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
                                }

                                input.removeAllListeners();
                                output.removeAllListeners();

                                this.stream.unpipe(input);
                                output.unpipe(this.stream);

                                cleanup();

                            });
                        });
            *

            // TODO implement other socket functions?!
            //if (process.env.NODE_ENV !== "production") {
            socket.ref = (...args) => { console.log("socket.ref called", ...args); };
            socket.unref = (...args) => { console.log("socket.unref called", ...args); };
            socket.setKeepAlive = (...args) => { console.log("socket.setKeepAlive called", ...args); };
            socket.setTimeout = (...args) => { console.log("socket.setTimeout called", ...args); };
            socket.setNoDelay = (...args) => { console.log("socket.setNoDelay called", ...args); };
            // socket.remoteAddress=this.settings.host
            // socket.remotePort=this.settings.port
            //}

            //return socket;
            cb(null, socket);

        };

        return agent;

    }
    */

    _agentConnection() {

        // cleanup, could be possible be piped from previous "connections"
        this.stream.unpipe();

        let readable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[incoming]", chunk);

                // temp fix for #343
                // this is not the prefered fix for this issue
                // it should be handled on "stream/socket" level instead
                // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
                // NOTE: what if the body contains json that has a `connection: close` property/key/value?
                // NOTE: This alos breaks websocket connections!!!!
                /*
                if (protocol !== "https:") {
                    chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
                }
                */

                this.push(chunk);
                cb();

            }
        });

        let writable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[outgoing]", chunk);

                this.push(chunk);
                cb();

            }
        });

        let stream = new Duplex.from({
            readable,
            writable
        });

        [
            "destroy", "ref", "unref",
            "setKeepAlive", "setTimeout", "setNoDelay"
        ].forEach((fnc) => {

            // fake methods above
            // dont need any of these
            stream[fnc] = () => { };

        });

        this.stream.pipe(readable, { end: false });
        writable.pipe(this.stream, { end: false });

        return stream;

    }

    // NEW VERSION, fix for #329
    httpAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new http.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {
            return this._agentConnection(...args);
        };

        this.cachedAgent = agent;
        return agent;

    }

    httpsAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new https.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {

            let { host, port } = this.settings;
            let socket = this._agentConnection(...args);

            return tls.connect({
                socket,
                host,
                port,
                ...options
            });

        };

        this.cachedAgent = agent;
        return agent;

    }

};
mStirner commented 4 months ago

[!IMPORTANT] Wait for refactoring in v4: https://github.com/OpenHausIO/backend/issues/460

mStirner commented 2 days ago

Test plugin:

const Store = require("../../components/store/class.store.js");
const request = require("../../helper/request.js");
const { Agent } = require("http");

module.exports = (info, logger, init) => {
    return init([
        "devices",
        "endpoints",
        "plugins",
        "rooms",
        "ssdp",
        "store",
        "users",
        "vault"
    ], async (scope, [
        C_DEVICES,
        C_ENDPOINTS,
        C_PLUGINS,
        C_ROOMS,
        C_SSDP,
        C_STORE,
        C_USERS,
        C_VAULT
    ]) => {

        /*
        let store = C_STORE.items[0];

        setTimeout(() => {

            console.clear();

            console.log("Store", store);

            //C_STORE.validate()

            let { value, error } = Store.validate(store);

            console.log(error, value);

        }, 3000);
        */

        C_DEVICES.found({
            labels: [
                "example=true",
                "host=example.com",
                "custom=abcd",
                "test=true"
            ]
        }, (device) => {
            setTimeout(() => {

                console.log("Devce found", device);

                let iface = device.interfaces[1];
                let { host, port } = iface.settings;
                let agent = iface.httpsAgent();

                request(`https://${host}:${port}`, {
                    agent
                }, (err, result) => {
                    console.log(err || result.body)
                });

            }, 3000);
        }, async (query) => {

            let device = await C_DEVICES.add({
                ...query,
                name: "example.com",
                interfaces: [{
                    type: "ETHERNET",
                    settings: {
                        host: "example.com",
                        port: 80
                    }
                }, {
                    type: "ETHERNET",
                    settings: {
                        host: "example.com",
                        port: 443
                    }
                }]
            });

            console.log("Device added", device)

        });

    });
};