Xahau / xahaud

Codebase for Xahaud - The consensus, RPC & blockchain app for the Xahau network.
https://xahau.network
ISC License
25 stars 12 forks source link

udp admin support [DO NOT MERGE] #390

Open RichardAH opened 1 week ago

RichardAH commented 1 week ago

Allow config to specify

[server]
port_rpc_admin_udp

[port_rpc_admin_udp]
port = 55555
ip = 0.0.0.0
protocol = udp
admin = 127.0.0.1

And subsequently service JSON queries over UDP.

Compiling and working with basic tests.

TODO: test multi-datagram responses; ensure old requests are cleaned up; subscriptions.

RichardAH commented 1 week ago

Subscriptions should be working now. They must be explicitly unsubscribed because there is no connection data to infer unsubscription from. If a response exceeds the UDP datagram payload size, the following fragmentation protocol is used.

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                          Zero Marker                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         Packet Number         |         Total Packets         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               |
+                          Timestamp                            +
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               |
+                          Payload                              +
|                            ...                                |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Since all packets ultimately contain JSON, a datagram whose first 4 bytes are null bytes (0, 0, 0, 0) cannot be valid JSON, so this marks a fragmented message. Wait until all fragments have been received, then reassemble the message. If the first 4 bytes are not all null, the whole message is in the datagram and there is no header.

const dgram = require('dgram');
const EventEmitter = require('events');

/**
 * Container that accumulates the fragments of a single multi-datagram
 * message and joins them once every fragment is present.
 */
class UDPFragmentedMessage {
    /**
     * @param {number} totalPackets - Number of fragments that make up the message.
     * @param {*} timestamp - Stored unchanged on the instance.
     */
    constructor(totalPackets, timestamp) {
        // packet number -> payload
        this.fragments = new Map();
        this.totalPackets = totalPackets;
        this.timestamp = timestamp;
        // Wall-clock time (ms) at which this container was created.
        this.receivedAt = Date.now();
    }

    /**
     * Stores a payload under its packet number, replacing any payload
     * already stored under that number.
     *
     * @param {number} packetNum - Index of the fragment within the message.
     * @param {Buffer} payload - Fragment payload.
     */
    addFragment(packetNum, payload) {
        const { fragments } = this;
        fragments.set(packetNum, payload);
    }

    /**
     * @returns {boolean} True when the number of stored fragments equals
     *     `totalPackets`.
     */
    isComplete() {
        const receivedCount = this.fragments.size;
        return receivedCount === this.totalPackets;
    }

    /**
     * Concatenates the payloads for packet numbers 0..totalPackets-1 in order.
     *
     * @returns {Buffer} The joined payloads.
     * @throws {Error} If any packet number in that range has no payload.
     */
    reassemble() {
        const orderedPayloads = [];

        for (const index of Array.from({ length: this.totalPackets }).keys()) {
            const piece = this.fragments.get(index);
            if (piece === undefined) {
                throw new Error('Missing fragments during reassembly');
            }
            orderedPayloads.push(piece);
        }

        return Buffer.concat(orderedPayloads);
    }
}

/**
 * UDP listener that delivers complete messages to its consumers.
 *
 * A datagram whose first four bytes are all zero carries a 16-byte fragment
 * header (zero marker, packet number, total packets, timestamp) followed by a
 * payload slice; fragments that share a timestamp are collected and joined.
 * Any other datagram is delivered as-is.
 *
 * Events emitted:
 *  - 'listening'      (address)  the socket started listening
 *  - 'message'        ({ data, rinfo, fragmented, timestamp?, stats? })
 *  - 'invalid_packet' ({ reason, rinfo, ... })  a datagram was rejected
 *  - 'cleanup'        ({ cleaned, remaining, storedFragments })
 *  - 'error'          (Error)
 *
 * NOTE(review): messages are keyed by timestamp only, so fragments from two
 * senders that happen to use the same timestamp would be mixed together.
 */
class UDPReceiver extends EventEmitter {
    /**
     * @param {object} [options]
     * @param {number} [options.port=41234] - Local port; 0 asks the OS for a free port.
     * @param {string} [options.address='0.0.0.0'] - Local address to bind.
     * @param {number} [options.fragmentTimeout=30000] - Age in ms after which an
     *     incomplete message is discarded.
     * @param {number} [options.cleanupInterval=5000] - Period in ms of the discard sweep.
     * @param {number} [options.maxFragments=100000] - Stored-fragment count at which
     *     no new message is started.
     */
    constructor(options = {}) {
        super();

        this.options = {
            // `??` rather than `||` so that an explicit port 0 is honoured.
            port: options.port ?? 41234,
            address: options.address || '0.0.0.0',
            fragmentTimeout: options.fragmentTimeout || 30000, // 30 seconds
            cleanupInterval: options.cleanupInterval || 5000,  // 5 seconds
            maxFragments: options.maxFragments || 100000      // Maximum stored fragments
        };

        // Message storage
        this.messages = new Map(); // timestamp -> UDPFragmentedMessage
        this.totalStoredFragments = 0;

        // Handle of the periodic sweep, kept so stop() can cancel it.
        this.cleanupTimer = null;

        // Initialize socket
        this.socket = dgram.createSocket('udp4');
        this.setupSocket();
        this.setupCleanup();
    }

    /**
     * Wires the socket's events to this emitter.
     */
    setupSocket() {
        this.socket.on('error', (err) => {
            this.emit('error', err);
        });

        this.socket.on('message', (msg, rinfo) => {
            try {
                this.handlePacket(msg, rinfo);
            } catch (err) {
                this.emit('error', err);
            }
        });

        this.socket.on('listening', () => {
            const address = this.socket.address();
            this.emit('listening', address);
        });
    }

    /**
     * Starts (or restarts) the periodic sweep that discards stale messages.
     */
    setupCleanup() {
        if (this.cleanupTimer !== null) {
            clearInterval(this.cleanupTimer);
        }

        this.cleanupTimer = setInterval(() => {
            this.cleanup();
        }, this.options.cleanupInterval);

        // The sweep alone must not keep the process alive; a bound socket does that.
        this.cleanupTimer.unref();
    }

    /**
     * Binds the socket to the configured port and address.
     *
     * @returns {Promise<void>} Resolves once bound; rejects on a socket error
     *     raised before the bind completes.
     */
    start() {
        return new Promise((resolve, reject) => {
            this.socket.once('error', reject);
            this.socket.bind(this.options.port, this.options.address, () => {
                this.socket.removeListener('error', reject);
                resolve();
            });
        });
    }

    /**
     * Cancels the sweep timer, closes the socket and drops all stored fragments.
     *
     * @returns {Promise<void>} Resolves once the socket has closed.
     */
    stop() {
        return new Promise((resolve) => {
            if (this.cleanupTimer !== null) {
                clearInterval(this.cleanupTimer);
                this.cleanupTimer = null;
            }

            this.socket.close(() => {
                this.messages.clear();
                this.totalStoredFragments = 0;
                resolve();
            });
        });
    }

    /**
     * Discards every incomplete message older than `fragmentTimeout` and emits
     * 'cleanup' when at least one was discarded.
     */
    cleanup() {
        const now = Date.now();
        let cleanedCount = 0;

        for (const [timestamp, message] of this.messages) {
            if (now - message.receivedAt > this.options.fragmentTimeout) {
                this.totalStoredFragments -= message.fragments.size;
                this.messages.delete(timestamp);
                cleanedCount++;
            }
        }

        if (cleanedCount > 0) {
            this.emit('cleanup', {
                cleaned: cleanedCount,
                remaining: this.messages.size,
                storedFragments: this.totalStoredFragments
            });
        }
    }

    /**
     * Processes one received datagram.
     *
     * @param {Buffer} msg - Raw datagram contents.
     * @param {object} rinfo - Sender information, passed through to emitted events.
     */
    handlePacket(msg, rinfo) {
        if (msg.length === 0) {
            this.emit('invalid_packet', { reason: 'Packet too small', rinfo });
            return;
        }

        // Only a datagram starting with four zero bytes carries a header. The
        // marker must be tested before any size check, otherwise short
        // unfragmented messages would be rejected.
        const isFragmented = msg.length >= 4 && msg.readUInt32LE(0) === 0;

        if (!isFragmented) {
            this.emit('message', {
                data: msg,
                rinfo,
                fragmented: false
            });
            return;
        }

        // A fragment must contain the full 16-byte header.
        if (msg.length < 16) {
            this.emit('invalid_packet', { reason: 'Packet too small', rinfo });
            return;
        }

        // Parse header
        // NOTE(review): fields are read little-endian; confirm against the sender.
        const packetNum = msg.readUInt16LE(4);
        const totalPackets = msg.readUInt16LE(6);
        const timestamp = msg.readBigUInt64LE(8);
        const payload = msg.subarray(16);

        // Validate header
        if (totalPackets === 0 || packetNum >= totalPackets) {
            this.emit('invalid_packet', {
                reason: 'Invalid packet numbers',
                packetNum,
                totalPackets,
                rinfo
            });
            return;
        }

        // Get or create message container
        let message = this.messages.get(timestamp);
        if (message === undefined) {
            // The storage limit only blocks starting new messages.
            if (this.totalStoredFragments >= this.options.maxFragments) {
                this.emit('error', new Error('Fragment storage limit reached'));
                return;
            }
            message = new UDPFragmentedMessage(totalPackets, timestamp);
            this.messages.set(timestamp, message);
        } else if (message.totalPackets !== totalPackets) {
            // A fragment disagreeing with the first one seen could be stored
            // outside the expected range and make the message look complete.
            this.emit('invalid_packet', {
                reason: 'Inconsistent total packets',
                packetNum,
                totalPackets,
                rinfo
            });
            return;
        }

        // Store fragment (duplicates are ignored)
        if (!message.fragments.has(packetNum)) {
            message.addFragment(packetNum, payload);
            this.totalStoredFragments++;
        }

        if (!message.isComplete()) {
            return;
        }

        // Release storage first, counting what was actually stored, so the
        // counter stays accurate even if reassembly fails.
        this.messages.delete(timestamp);
        this.totalStoredFragments -= message.fragments.size;

        try {
            const completeData = message.reassemble();

            this.emit('message', {
                data: completeData,
                timestamp: timestamp,
                rinfo,
                fragmented: true,
                stats: {
                    fragments: totalPackets,
                    size: completeData.length
                }
            });
        } catch (err) {
            this.emit('error', err);
        }
    }

    /**
     * @returns {{storedMessages: number, storedFragments: number, socketBound: boolean}}
     *     Snapshot of the reassembly store and the socket state.
     */
    getStats() {
        let socketBound = false;
        try {
            socketBound = this.socket?.address() != null;
        } catch {
            // address() throws on an unbound or closed socket: report "not bound".
            socketBound = false;
        }

        return {
            storedMessages: this.messages.size,
            storedFragments: this.totalStoredFragments,
            socketBound
        };
    }
}

// CommonJS export: the module's sole export is the UDPReceiver class.
module.exports = UDPReceiver;
RichardAH commented 1 week ago
socat -t50 STDIO UDP4:127.0.0.1:55555

and then send RPC commands as usual.