Open Efkovole opened 17 hours ago
I fixed the code. network.js and network_thread.js
↓↓network.js ↓↓
'use strict';
/*!
* s1panel - sensor/network
* Copyright (c) 2024 Tomasz Jaworski (Fixed Filip Šmída)
* GPL-3 Licensed
*/
const threads = require('worker_threads');
const logger = require('../logger');
function record_sample(array, value, max_points) {
if (!array.length) {
for (let i = 0; i < max_points; i++) {
array.push(0);
}
}
array.push(value);
array.shift();
return value;
}
function bytes_to_data_rate(bytes, bits) {
const _value = bits ? bytes * 8 : bytes;
const kb = _value / 1024;
const mb = kb / 1024;
const gb = mb / 1024;
if (gb >= 1) {
return gb.toFixed(bits ? 0 : 2) + (bits ? ' Gbit/s' : ' Gb/s');
} else if (mb >= 1) {
return mb.toFixed(bits ? 0 : 2) + (bits ? ' Mbit/s' : ' Mb/s');
} else if (kb >= 1) {
return kb.toFixed(bits ? 0 : 2) + (bits ? ' kbit/s' : ' Kb/s');
}
return bytes + (bits ? ' bit/s' : ' B/s');
}
function max_link_capacity_bytes(link_speed) {
return (link_speed * 0.125) * (1024 * 1024);
}
async function sample(rate, format, config) {
const _private = config._private;
const _diff = _private.last_sampled ? Math.floor(Number(process.hrtime.bigint()) / 1000000) - _private.last_sampled : 0;
let _dirty = false;
if (!_private.last_sampled || _diff > rate) {
_private.last_sampled = Math.floor(Number(process.hrtime.bigint()) / 1000000);
_private.worker.postMessage({ iface: _private.iface, rate: rate });
_dirty = _private.thread_checkin_count ? true : false;
_private.thread_checkin_count = 0;
}
if (_dirty) {
_private.max_rx_bytes = Math.max(..._private.history_rx_bytes);
_private.max_tx_bytes = Math.max(..._private.history_tx_bytes);
_private.max_rx_packets = Math.max(..._private.history_rx_packets);
_private.max_tx_packets = Math.max(..._private.history_tx_packets);
}
const _absolute_max = max_link_capacity_bytes(_private.link_speed);
let _max = _absolute_max;
const _output = format.replace(/{(\d+)}/g, (match, number) => {
switch (number) {
case '0':
return _private.iface;
case '1': // download
_max = _private.scale_factor ? Math.min(Math.ceil(_private.max_rx_bytes * _private.scale_factor), _absolute_max) : _absolute_max;
return _private.history_rx_bytes.length > 0 ? _private.history_rx_bytes[_private.history_rx_bytes.length - 1] : 0;
case '2':
return _private.history_rx_bytes.join();
case '3':
return _private.history_rx_bytes.length > 0 ? bytes_to_data_rate(_private.history_rx_bytes[_private.history_rx_bytes.length - 1]) : '0 B/s';
case '4': // upload
_max = _private.scale_factor ? Math.min(Math.ceil(_private.max_tx_bytes * _private.scale_factor), _absolute_max) : _absolute_max;
return _private.history_tx_bytes.length > 0 ? _private.history_tx_bytes[_private.history_tx_bytes.length - 1] : 0;
case '5':
return _private.history_tx_bytes.join();
case '6':
return _private.history_tx_bytes.length > 0 ? bytes_to_data_rate(_private.history_tx_bytes[_private.history_tx_bytes.length - 1]) : '0 B/s';
case '7': // download packets
_max = _private.max_rx_packets;
return _private.history_rx_packets.length > 0 ? _private.history_rx_packets[_private.history_rx_packets.length - 1] : 0;
case '8':
return _private.history_rx_packets.length > 0 ? _private.history_rx_packets[_private.history_rx_packets.length - 1] + ' rx/pps' : '0 rx/pps';
case '9':
return _private.history_rx_packets.join();
case '10': // upload packets
_max = _private.max_tx_packets;
return _private.history_tx_packets.length > 0 ? _private.history_tx_packets[_private.history_tx_packets.length - 1] : 0;
case '11':
return _private.history_tx_packets.length > 0 ? _private.history_tx_packets[_private.history_tx_packets.length - 1] + ' tx/pps' : '0 tx/pps';
case '12':
return _private.history_tx_packets.join();
case '13':
return _private.link_speed;
case '14':
return _private.link_mtu;
case '15': // bits
return _private.history_rx_bytes.length > 0 ? bytes_to_data_rate(_private.history_rx_bytes[_private.history_rx_bytes.length - 1], true) : '0 bit/s';
case '16':
return _private.history_tx_bytes.length > 0 ? bytes_to_data_rate(_private.history_tx_bytes[_private.history_tx_bytes.length - 1], true) : '0 bit/s';
case '17':
return _private.ipv4;
case '18':
return _private.ipv6;
default:
return 'null';
}
});
return { value: _output, min: 0, max: _max };
}
function init(config) {
const _private = {
max_points: config?.max_points || 300,
iface: config?.interface || 'enp2s0',
history_rx_bytes: [],
history_tx_bytes: [],
max_rx_bytes: 0,
max_tx_bytes: 0,
history_rx_packets: [],
history_tx_packets: [],
max_rx_packets: 0,
max_tx_packets: 0,
link_speed: 1000,
link_mtu: 1500,
scale_factor: config?.scaling || 1.5,
thread_checkin_count: 0
};
logger.info('initialize: monitoring interface ' + _private.iface);
logger.info('initialize: network max points are set to ' + _private.max_points);
_private.worker = new threads.Worker(__dirname + '/network_thread.js', { workerData: { iface: _private.iface } });
_private.worker.on('message', message => {
if (!message.rx || !message.tx) {
logger.error('Chybí data ve zprávě workeru:', message);
return;
}
_private.link_mtu = message.mtu;
_private.link_speed = message.speed;
record_sample(_private.history_rx_bytes, message.rx.bytes, _private.max_points);
record_sample(_private.history_tx_bytes, message.tx.bytes, _private.max_points);
record_sample(_private.history_rx_packets, message.rx.packets, _private.max_points);
record_sample(_private.history_tx_packets, message.tx.packets, _private.max_points);
_private.ipv4 = message.ipv4;
_private.ipv6 = message.ipv6;
_private.thread_checkin_count++;
});
_private.worker.on('error', (err) => {
logger.error('Worker error:', err);
_private.worker.terminate(); // Ukončit worker při chybě
_private.worker = new threads.Worker(__dirname + '/network_thread.js', { workerData: { iface: _private.iface } });
});
_private.worker.on('exit', (code) => {
if (code !== 0) {
logger.error(`Worker stopped with exit code ${code}`);
}
});
config._private = _private;
return 'network_' + _private.iface;
}
module.exports = {
init,
sample
};
↓↓ network_thread.js ↓↓
'use strict';
/*!
* s1panel - sensor/network_thread
* Copyright (c) 2024 Tomasz Jaworski
* GPL-3 Licensed
*/
const fs = require('fs');
const threads = require('worker_threads');
const logger = require('../logger');
const { exec } = require('child_process');
const DEFAULT_RATE_MS = 1000;
const TIMEOUT_COUNT = 30;
let _running = false;
let _collect_count = 0;
let _fault = false;
function read_file(path) {
return new Promise((fulfill, reject) => {
fs.readFile(path, 'utf8', (err, data) => {
if (err) {
logger.error(`network_thread: Error reading file ${path}: ${err.message}`);
return reject(new Error(`Error reading file ${path}: ${err.message}`));
}
fulfill(data);
});
});
}
function run_command(cmdline) {
return new Promise((fulfill, reject) => {
const _runit = exec(cmdline, (error, stdout, stderr) => {
if (error) {
logger.error(`network_thread: Command failed (${cmdline}): ${error.message}`);
return reject(new Error(`Command failed (${cmdline}): ${error.message}`));
}
fulfill(stdout);
});
});
}
function read_ip(iface) {
return new Promise((fulfill, reject) => {
const _cmdline = `ip -j a show dev ${iface}`;
run_command(_cmdline)
.then(output => {
try {
const parsed = JSON.parse(output);
fulfill(parsed);
} catch (parseError) {
logger.error(`network_thread: JSON parse error for IP data: ${parseError.message}`);
reject(new Error(`JSON parse error for IP data: ${parseError.message}`));
}
})
.catch(err => {
if (!_fault) {
logger.error(`network_thread: sensors reported error: ${err.message}`);
_fault = true;
}
fulfill(null); // Pokračovat i při chybě
});
});
}
function network_usage(iface) {
const _base_path = `/sys/class/net/${iface}`;
const _path = `${_base_path}/statistics`;
return Promise.all([
read_file(`${_base_path}/mtu`).catch(err => null),
read_file(`${_base_path}/speed`).catch(err => null),
read_file(`${_path}/rx_bytes`).catch(err => null),
read_file(`${_path}/tx_bytes`).catch(err => null),
read_file(`${_path}/rx_packets`).catch(err => null),
read_file(`${_path}/tx_packets`).catch(err => null),
read_ip(iface).catch(err => null)
]);
}
let _last_rx_bytes = 0;
let _last_tx_bytes = 0;
let _last_rx_packets = 0;
let _last_tx_packets = 0;
function collect(message) {
_collect_count++;
if (_collect_count < TIMEOUT_COUNT) {
network_usage(message.iface)
.then(results => {
if (!results || results.length < 7) {
logger.error('network_thread: Incomplete network usage data received.');
return;
}
const _mtu = Number(results[0]);
const _speed = Number(results[1]);
const _current_rx_bytes = Number(results[2]);
const _current_tx_bytes = Number(results[3]);
const _current_rx_packets = Number(results[4]);
const _current_tx_packets = Number(results[5]);
const _delta_rx_bytes = _last_rx_bytes ? _current_rx_bytes - _last_rx_bytes : 0;
const _delta_tx_bytes = _last_tx_bytes ? _current_tx_bytes - _last_tx_bytes : 0;
const _delta_rx_packets = _last_rx_packets ? _current_rx_packets - _last_rx_packets : 0;
const _delta_tx_packets = _last_tx_packets ? _current_tx_packets - _last_tx_packets : 0;
_last_rx_bytes = _current_rx_bytes;
_last_tx_bytes = _current_tx_bytes;
_last_rx_packets = _current_rx_packets;
_last_tx_packets = _current_tx_packets;
const _ip_data = results[6];
let _ipv4 = 'n/a';
let _ipv4_count = 0;
let _ipv6 = 'n/a';
let _ipv6_count = 0;
if (_ip_data) {
_ip_data.forEach(each => {
each.addr_info.forEach(info => {
if ('global' === info.scope) {
switch (info.family) {
case 'inet':
if (!_ipv4_count) {
_ipv4 = info.local;
_ipv4_count++;
}
break;
case 'inet6':
if (!_ipv6_count) {
_ipv6 = info.local;
_ipv6_count++;
}
break;
}
}
});
});
}
// Ověření, že všechna data jsou validní
if (isNaN(_mtu) || isNaN(_speed)) {
logger.error('network_thread: Invalid MTU or speed data.');
}
if (isNaN(_delta_rx_bytes) || isNaN(_delta_tx_bytes) || isNaN(_delta_rx_packets) || isNaN(_delta_tx_packets)) {
logger.error('network_thread: Invalid delta values.');
}
threads.parentPort.postMessage({
mtu: _mtu || 0,
speed: _speed || 0,
rx: { bytes: _delta_rx_bytes || 0, packets: _delta_rx_packets || 0 },
tx: { bytes: _delta_tx_bytes || 0, packets: _delta_tx_packets || 0 },
ipv4: _ipv4,
ipv6: _ipv6
});
setTimeout(() => {
collect(message);
}, message.rate || DEFAULT_RATE_MS);
})
.catch(err => {
logger.error(`network_thread: Error during data collection: ${err.message}`);
// Pokračovat i při chybě, aby se skript neskončil
setTimeout(() => {
collect(message);
}, message.rate || DEFAULT_RATE_MS);
});
} else {
logger.info('network_thread: collector stopped for iface ' + message.iface);
_running = false;
}
}
threads.parentPort.on('message', message => {
_collect_count = 0; // reset
if (!_running) {
_running = true;
logger.info('network_thread: collector started for iface ' + message.iface);
collect(message);
}
});
logger.info('network_thread: started... for ' + threads.workerData.iface);
do you know why you were getting invalid values?
I'd like to thank you in advance for your work. However, I have a problem with the network graph. It gets stuck after a few minutes.