yaacov / node-modbus-serial

A pure JavaScript implementation of MODBUS-RTU (and TCP) for NodeJS
ISC License

Combination of modbus requests in the same TCP Frame. #540

Open rvbatista opened 8 months ago

rvbatista commented 8 months ago

The Modbus TCP specification says that only one Modbus request should be inside a TCP frame or packet. It looks like node-modbus-serial doesn't use any mechanism to ensure that, correct? We use node-modbus-serial in FUXA SCADA, and one of its users has reported this issue: https://github.com/frangoteam/FUXA/issues/1030. Of course we can try to handle it in our own usage of the library, but it looks like more people could run into this issue in their projects.
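For illustration, one caller-side mitigation is to await each request before issuing the next, so that at most one ADU is outstanding per socket write. A minimal sketch using the library's promise API (host, port, and register addresses are made-up examples):

const ModbusRTU = require("modbus-serial");

async function run() {
    const client = new ModbusRTU();
    await client.connectTCP("192.168.1.10", { port: 502 });
    client.setID(1);

    // each request is awaited before the next one is handed to the socket,
    // so two ADUs are much less likely to be coalesced into one TCP segment
    await client.writeRegisters(4, [0, 0]);
    const { data } = await client.readHoldingRegisters(4, 2);
    console.log(data);

    client.close(() => {});
}

run().catch(console.error);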

yaacov commented 8 months ago

Hi, thank you for the issue.

Can you make a pull request fixing the issue?

teddy1565 commented 5 months ago

    /**
     * Write a Modbus "Force Multiple Coils" (FC=15) to serial port.
     *
     * @param {number} address the slave unit address.
     * @param {number} dataAddress the Data Address of the first coil.
     * @param {Array} array the array of boolean states to write to coils.
     * @param {Function} next the function to call next.
     */
    writeFC15(address, dataAddress, array, next) {
        // check port is actually open before attempting write
        if (this.isOpen !== true) {
            if (next) next(new PortNotOpenError());
            return;
        }

        // sanity check
        if (typeof address === "undefined" || typeof dataAddress === "undefined") {
            if (next) next(new BadAddressError());
            return;
        }

        const code = 15;
        let i = 0;

        // set state variables
        this._transactions[this._port._transactionIdWrite] = {
            nextAddress: address,
            nextCode: code,
            nextLength: 8,
            next: next
        };

        const dataBytes = Math.ceil(array.length / 8);
        const codeLength = 7 + dataBytes;
        const buf = Buffer.alloc(codeLength + 2); // add 2 crc bytes

        buf.writeUInt8(address, 0);
        buf.writeUInt8(code, 1);
        buf.writeUInt16BE(dataAddress, 2);
        buf.writeUInt16BE(array.length, 4);
        buf.writeUInt8(dataBytes, 6);

        // clear the data bytes before writing bits data
        for (i = 0; i < dataBytes; i++) {
            buf.writeUInt8(0, 7 + i);
        }

        for (i = 0; i < array.length; i++) {
            // buffer bits are already all zero (0)
            // only set the ones set to one (1)
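            // note: writeBit is not a core Node Buffer method; it is added by
            // the library's buffer-bit helper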
            if (array[i]) {
                buf.writeBit(1, i, 7);
            }
        }

        // add crc bytes to buffer
        buf.writeUInt16LE(crc16(buf.slice(0, -2)), codeLength);

        // write buffer to serial port
        _writeBufferToPort.call(this, buf, this._port._transactionIdWrite);
    }

    /**
     * Write a Modbus "Preset Multiple Registers" (FC=16) to serial port.
     *
     * @param {number} address the slave unit address.
     * @param {number} dataAddress the Data Address of the first register.
     * @param {Array} array the array of values to write to registers.
     * @param {Function} next the function to call next.
     */
    writeFC16(address, dataAddress, array, next) {
        // check port is actually open before attempting write
        if (this.isOpen !== true) {
            if (next) next(new PortNotOpenError());
            return;
        }

        // sanity check
        if (typeof address === "undefined" || typeof dataAddress === "undefined") {
            if (next) next(new BadAddressError());
            return;
        }

        const code = 16;

        // set state variables
        this._transactions[this._port._transactionIdWrite] = {
            nextAddress: address,
            nextCode: code,
            nextLength: 8,
            next: next
        };

        let dataLength = array.length;
        if (Buffer.isBuffer(array)) {
            // if array is a buffer it has double length
            dataLength = array.length / 2;
        }

        const codeLength = 7 + 2 * dataLength;
        const buf = Buffer.alloc(codeLength + 2); // add 2 crc bytes

        buf.writeUInt8(address, 0);
        buf.writeUInt8(code, 1);
        buf.writeUInt16BE(dataAddress, 2);
        buf.writeUInt16BE(dataLength, 4);
        buf.writeUInt8(dataLength * 2, 6);

        // copy content of array to buf
        if (Buffer.isBuffer(array)) {
            array.copy(buf, 7);
        } else {
            for (let i = 0; i < dataLength; i++) {
                buf.writeUInt16BE(array[i], 7 + 2 * i);
            }
        }

        // add crc bytes to buffer
        buf.writeUInt16LE(crc16(buf.slice(0, -2)), codeLength);

        // write buffer to serial port
        _writeBufferToPort.call(this, buf, this._port._transactionIdWrite);
    }

Maybe we could just add a mechanism to split dataArray, recount codeLength, or recurse to solve this issue? (Something along the lines of the sketch below.)

But we still can't control the OS's TCP packet transmission behavior.

Maybe, as others have said, the OS may accumulate several packets into one for transmission optimization.
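To make the splitting idea concrete, a minimal sketch at the caller level (my own illustration, not library code; writeRegistersChunked is a hypothetical helper, and 123 is the usual FC16 per-request register limit):

const FC16_MAX_REGISTERS = 123; // usual Modbus limit for Write Multiple Registers

async function writeRegistersChunked(client, startAddress, values) {
    for (let offset = 0; offset < values.length; offset += FC16_MAX_REGISTERS) {
        const chunk = values.slice(offset, offset + FC16_MAX_REGISTERS);
        // each chunk is a complete, awaited request/response cycle
        await client.writeRegisters(startAddress + offset, chunk);
    }
}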

teddy1565 commented 5 months ago

Sorry, maybe I'm wrong. I may have misunderstood the content of the issue. I am reviewing the relevant official documents to investigate; if I reach any conclusions, I will post them here.

teddy1565 commented 5 months ago

[attached screenshots]

teddy1565 commented 5 months ago

[attached screenshots]

teddy1565 commented 5 months ago

Attaching the selected documents for quick and convenient reference.

teddy1565 commented 5 months ago

[attached screenshot]

teddy1565 commented 5 months ago

https://github.com/frangoteam/FUXA/issues/1030#issue-2055091338

In that issue, in the first picture, the captured TCP payload is:

00 25 00 00 00 0B 01 10 00 04 00 02 04 00 00 00 00 00 26 00 00 00 06 01 03 00 04 00 02

Breaking down the first request:

00 25 00 00 00 0B 01 is the MBAP header (transaction id 0x0025, protocol id 0, length 0x000B, unit id 1)
10 is the function code (FC 16, Preset Multiple Registers)
00 04 is the starting address
00 02 is the quantity of registers
04 is the byte count (2*N)
00 00 00 00 are the register values

The remaining bytes, 00 26 00 00 00 06 01 03 00 04 00 02, are a second MBAP header followed by an FC 3 (Read Holding Registers) request for the same two registers, i.e. two ADUs in a single TCP payload.
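A minimal sketch (my own illustration, not library code) that walks this byte stream using the MBAP length field and confirms that it contains two ADUs:

const hex =
    "00 25 00 00 00 0B 01 10 00 04 00 02 04 00 00 00 00 " +
    "00 26 00 00 00 06 01 03 00 04 00 02";
const bytes = Buffer.from(hex.replace(/\s+/g, ""), "hex");

const adus = [];
let offset = 0;
while (offset + 7 <= bytes.length) {
    const length = bytes.readUInt16BE(offset + 4); // MBAP length = unit id + PDU
    const aduEnd = offset + 6 + length;
    adus.push(bytes.slice(offset, aduEnd));
    offset = aduEnd;
}

console.log(adus.length);                        // 2
console.log(adus.map((a) => a.toString("hex"))); // FC16 write, then FC3 read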

write(data) {
        if(data.length < MIN_DATA_LENGTH) {
            modbusSerialDebug("expected length of data is to small - minimum is " + MIN_DATA_LENGTH);
            return;
        }

        // remember current unit and command
        this._id = data[0];
        this._cmd = data[1];

        // remove crc and add mbap
        const buffer = Buffer.alloc(data.length + MIN_MBAP_LENGTH - CRC_LENGTH);
        buffer.writeUInt16BE(this._transactionIdWrite, 0);
        buffer.writeUInt16BE(0, 2);
        buffer.writeUInt16BE(data.length - CRC_LENGTH, 4);
        data.copy(buffer, MIN_MBAP_LENGTH);

        modbusSerialDebug({
            action: "send tcp port",
            data: data,
            buffer: buffer,
            unitid: this._id,
            functionCode: this._cmd,
            transactionsId: this._transactionIdWrite
        });

        // send buffer to slave
        this._client.write(buffer);

        // set next transaction id
        this._transactionIdWrite = (this._transactionIdWrite + 1) % MAX_TRANSACTIONS;
    }

I am sure that, logically, the algorithm is correct after doing the calculation. But whether several ADUs end up in the same TCP frame may not be something we can control.
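Spelling out the arithmetic for the FC16 request in the capture: writeFC16 builds an RTU-style frame of 1 (unit id) + 1 (function code) + 2 (address) + 2 (quantity) + 1 (byte count) + 4 (register data) + 2 (CRC) = 13 bytes, so TcpPort.write() sets the MBAP length field to data.length - CRC_LENGTH = 13 - 2 = 11 = 0x0B, which matches the 00 0B in the captured header.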

This involves many factors, for example the operating system, the Node runtime, V8, and N-API.

But we can still try adding an event listener on net.Socket for the 'drain' event. I will check net.Socket in the Node runtime.
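A minimal sketch of that idea (my own illustration, assuming direct access to the underlying net.Socket; writeAdu is a hypothetical name): respect the boolean returned by socket.write() and wait for 'drain' before handing the next ADU to the socket.

function writeAdu(socket, buffer) {
    // socket.setNoDelay(true) at connect time reduces kernel-side coalescing
    // (Nagle's algorithm), but it still does not guarantee one ADU per segment.
    return new Promise((resolve, reject) => {
        const flushed = socket.write(buffer, (err) => {
            if (err) reject(err);
        });
        if (flushed) {
            resolve();
        } else {
            // the internal buffer is full: wait until it has been flushed
            socket.once("drain", resolve);
        }
    });
}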

rvbatista commented 5 months ago

@teddy1565 the drain event is exactly what I found here too. I think that will probably add some delay to the communication, but only for applications that want to be extremely fast.

teddy1565 commented 5 months ago

According to the information I collected, I think we can ensure that the Node runtime uses sync mode when writing buffers to the kernel through native code, and still get the drain event.

But I think we still may not have control over whether the operating system drives the network interface card the way we expect it to.

The Node runtime's WriteBuffer implementation:

// Wrapper for write(2).
//
// bytesWritten = write(fd, buffer, offset, length, position, callback)
// 0 fd        integer. file descriptor
// 1 buffer    the data to write
// 2 offset    where in the buffer to start from
// 3 length    how much to write
// 4 position  if integer, position to write at in the file.
//             if null, write from the current position
static void WriteBuffer(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);

  const int argc = args.Length();
  CHECK_GE(argc, 4);

  CHECK(args[0]->IsInt32());
  const int fd = args[0].As<Int32>()->Value();

  CHECK(Buffer::HasInstance(args[1]));
  Local<Object> buffer_obj = args[1].As<Object>();
  char* buffer_data = Buffer::Data(buffer_obj);
  size_t buffer_length = Buffer::Length(buffer_obj);

  CHECK(IsSafeJsInt(args[2]));
  const int64_t off_64 = args[2].As<Integer>()->Value();
  CHECK_GE(off_64, 0);
  CHECK_LE(static_cast<uint64_t>(off_64), buffer_length);
  const size_t off = static_cast<size_t>(off_64);

  CHECK(args[3]->IsInt32());
  const size_t len = static_cast<size_t>(args[3].As<Int32>()->Value());
  CHECK(Buffer::IsWithinBounds(off, len, buffer_length));
  CHECK_LE(len, buffer_length);
  CHECK_GE(off + len, off);

  const int64_t pos = GetOffset(args[4]);

  char* buf = buffer_data + off;
  uv_buf_t uvbuf = uv_buf_init(buf, len);

  FSReqBase* req_wrap_async = GetReqWrap(args, 5);
  if (req_wrap_async != nullptr) {  // write(fd, buffer, off, len, pos, req)
    FS_ASYNC_TRACE_BEGIN0(UV_FS_WRITE, req_wrap_async)
    AsyncCall(env, req_wrap_async, args, "write", UTF8, AfterInteger,
              uv_fs_write, fd, &uvbuf, 1, pos);
  } else {  // write(fd, buffer, off, len, pos, undefined, ctx)
    CHECK_EQ(argc, 7);
    FSReqWrapSync req_wrap_sync;
    FS_SYNC_TRACE_BEGIN(write);
    int bytesWritten = SyncCall(env, args[6], &req_wrap_sync, "write",
                                uv_fs_write, fd, &uvbuf, 1, pos);
    FS_SYNC_TRACE_END(write, "bytesWritten", bytesWritten);
    args.GetReturnValue().Set(bytesWritten);
  }
}

A fragment of stream.Writable, which net.Socket.write builds on:


function onwrite(stream, er) {
  const state = stream._writableState;

  if ((state[kState] & kExpectWriteCb) === 0) {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  const sync = (state[kState] & kSync) !== 0;
  const cb = (state[kState] & kWriteCb) !== 0 ? state[kWriteCbValue] : nop;

  state[kState] &= ~(kWriting | kExpectWriteCb | kWriteCb);
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack; // eslint-disable-line no-unused-expressions

    if ((state[kState] & kErrored) === 0) {
      state[kErroredValue] = er;
      state[kState] |= kErrored;
    }

    // In case of duplex streams we need to notify the readable side of the
    // error.
    if (stream._readableState && !stream._readableState.errored) {
      stream._readableState.errored = er;
    }

    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
  } else {
    if ((state[kState] & kBuffered) !== 0) {
      clearBuffer(stream, state);
    }

    if (sync) {
      const needDrain = (state[kState] & kNeedDrain) !== 0 && state.length === 0;
      const needTick = needDrain || (state[kState] & kDestroyed !== 0) || cb !== nop;

      // It is a common case that the callback passed to .write() is always
      // the same. In that case, we do not schedule a new nextTick(), but
      // rather just increase a counter, to improve performance and avoid
      // memory allocations.
      if (cb === nop) {
        if ((state[kState] & kAfterWritePending) === 0 && needTick) {
          process.nextTick(afterWrite, stream, state, 1, cb);
          state[kState] |= kAfterWritePending;
        } else {
          state.pendingcb--;
          if ((state[kState] & kEnding) !== 0) {
            finishMaybe(stream, state, true);
          }
        }
      } else if ((state[kState] & kAfterWriteTickInfo) !== 0 &&
                 state[kAfterWriteTickInfoValue].cb === cb) {
        state[kAfterWriteTickInfoValue].count++;
      } else if (needTick) {
        state[kAfterWriteTickInfoValue] = { count: 1, cb, stream, state };
        process.nextTick(afterWriteTick, state[kAfterWriteTickInfoValue]);
        state[kState] |= (kAfterWritePending | kAfterWriteTickInfo);
      } else {
        state.pendingcb--;
        if ((state[kState] & kEnding) !== 0) {
          finishMaybe(stream, state, true);
        }
      }
    } else {
      afterWrite(stream, state, 1, cb);
    }
  }
}

function afterWriteTick({ stream, state, count, cb }) {
  state[kState] &= ~kAfterWriteTickInfo;
  state[kAfterWriteTickInfoValue] = null;
  return afterWrite(stream, state, count, cb);
}

function afterWrite(stream, state, count, cb) {
  state[kState] &= ~kAfterWritePending;

  const needDrain = (state[kState] & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0;
  if (needDrain) {
    state[kState] &= ~kNeedDrain;
    stream.emit('drain');
  }

  while (count-- > 0) {
    state.pendingcb--;
    cb(null);
  }

  if ((state[kState] & kDestroyed) !== 0) {
    errorBuffer(state);
  }

  if ((state[kState] & kEnding) !== 0) {
    finishMaybe(stream, state, true);
  }
}
Writable.prototype.write = function(chunk, encoding, cb) {
  if (encoding != null && typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  return _write(this, chunk, encoding, cb) === true;
};

teddy1565 commented 5 months ago

@teddy1565 the drain is exactly what I found here. I think that probably will create some kind of delay in the communications, but only for this type of application that want to be extremely fast.

@rvbatista yes. So I think that in the default usage, when net.Socket writes a buffer, the C++ native WriteBuffer implementation goes through AsyncCall; that seems fairly likely. Maybe we can change TcpPort so that it calls net.Socket with appropriate constructor parameters to force a synchronous write, i.e. to make req_wrap_async nullptr.

At the same time, I do not rule out the OS's actual handling of TCP transmission, as mentioned above.
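Short of touching the native layer, a userland alternative is to serialize writes on the JS side so that at most one ADU is handed to the socket at a time. A rough sketch (my own illustration, not library code; queueWrite is a hypothetical name):

let writeChain = Promise.resolve();

function queueWrite(socket, buffer) {
    const next = writeChain.then(
        () => new Promise((resolve, reject) => {
            socket.write(buffer, (err) => (err ? reject(err) : resolve()));
        })
    );
    // keep the chain alive even if one write fails
    writeChain = next.catch(() => {});
    return next;
}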

teddy1565 commented 5 months ago

I found another tidbit. Maybe it's just a TCP packet fragmentation problem?

teddy1565 commented 5 months ago

@rvbatista Hi! Can I get an update on the current status?