pinojs / thread-stream

A streaming way to send data to a Node.js Worker Thread
MIT License

TransformStream does not close #36

Open · Eomm opened this issue 3 years ago

Eomm commented 3 years ago

When using this module within a TransformStream, it leads to this error:

/Users/mspigolon/workspace/thread-stream/index.js:278
        throw new Error('end() took too long (10s)')
        ^

Error: end() took too long (10s)
    at ThreadStream.end (/Users/mspigolon/workspace/thread-stream/index.js:278:15)
// worker.js
const { Transform } = require('stream');

async function run (opts) {
  const myTransform = new Transform({
    autoDestroy: true,
    transform(chunk, encoding, callback) {
      console.log(chunk.toString());
      callback(null)
    }
  });
  return myTransform
}

module.exports = run

// ===========================
// index.js (from README)
'use strict'

const ThreadStream = require('./index')
const { join } = require('path')

const stream = new ThreadStream({
  filename: join(__dirname, 'worker.js'),
  workerData: { dest: './qwe.sss'},
  workerOpts: {}, // Other options to be passed to Worker
  sync: false, // default
})

stream.write('hello')

// Asynchronous flushing
stream.flush(function () {
  stream.write(' ')
  stream.write('world')

  // Synchronous flushing
  stream.flushSync()
  stream.end()
})

Adding the statement:

myTransform.end = myTransform.destroy

solves the issue, but I'm not sure it is the right way to fix it, because it mixes the readable stream interface with the writable one. It seems the writable side ends correctly, but the readable side is still alive.

The myTransform stream does not emit any events (close, finish, etc.).

I tried Node.js v16 and v14 and got the same results.

mcollina commented 3 years ago

The behavior is correct, as no one is reading from that transform. If you call .end() on a Transform, its Readable side will not end unless somebody is reading from it.

This would work:

// worker.js
const { Transform } = require('stream');

async function run (opts) {
  const myTransform = new Transform({
    autoDestroy: true,
    transform(chunk, encoding, callback) {
      console.log(chunk.toString());
      callback(null)
    }
  });
  myTransform.resume()
  return myTransform
}

module.exports = run

Note that passing a Readable as a worker stream does not make sense; we might consider throwing in this case. This API should only support a Writable.

Eomm commented 3 years ago

'pino-abstract-transport' returns a Transform that is processed by the worker, so throwing an error would break some pino transport plugins.

I need to check how it consumes the readable stream by listening to the data event.

mcollina commented 3 years ago

I don't understand.

tobydeh commented 3 years ago

I have the same issue: Error: end() took too long (10s)

I'm using pino with a custom transport, and the error happens when I call process.exit()

The transport file:

// transport.ts
import type pino from 'pino';
import * as papertrail from './papertrail'; // local module shown below

module.exports = function transport (options = {}): pino.DestinationStream {
  const s: any = papertrail.createWriteStream(options);
  // Seems to fix the issue as mentioned above
  s.end = s.destroy;
  return s;
};

The papertrail.createWriteStream function uses pumpify...

const pumpify = require('pumpify')
const pinoPapertrail = require('pino-papertrail')
// defaultOptions is defined elsewhere in this module

module.exports.createWriteStream = (opts) => {
  const { appname, echo, host, port, connection, 'message-only': messageOnly } = { ...defaultOptions, ...opts }

  const parseJson = pinoPapertrail.parseJson()
  const toSyslog = pinoPapertrail.toSyslog({ appname, 'message-only': messageOnly })
  const papertrail = pinoPapertrail.toPapertrail({ echo, port, host, connection })

  return pumpify(parseJson, toSyslog, papertrail)
}

Any help would be much appreciated :)

robertsLando commented 2 years ago

I'm having the same problem here; it seems to happen only with Node.js 12.x.x. Adding the line myTransform.end = myTransform.destroy as @Eomm suggested fixed the issue for me.

ghost commented 2 years ago

I've created a reproducible example: https://github.com/Zn250g/server/tree/tests#readme

Docker or Podman is required.

The issue:

[Screenshot from 2022-04-02 20-29-25 showing the error output]

test.sh:

podman build -q -t ubuntu-test -f ./ubuntu.dockerfile . && \
podman run --rm --interactive --cpus=2 --memory=7g --publish 9229-9239:9229-9239 localhost/ubuntu-test:latest \
  node --experimental-vm-modules ./test.js

./test.js:

import { fileURLToPath } from 'url'
import { dirname, join } from 'path'
import spawn from 'cross-spawn'

const ROOT = dirname(fileURLToPath(import.meta.url))

let errorsCount = 0

let server = spawn('node', [
  // '--inspect-brk=0.0.0.0:9230',
  join(ROOT, 'destroy.js')
])

server.stdout?.on('data', chunk => {
  let msg = chunk.toString()
  if (msg.includes('Error')) {
    errorsCount += 1
    console.error(msg)
  } else {
    console.log(msg)
  }
})

server.stderr?.on('data', chunk => {
  errorsCount += 1
  console.error(chunk.toString())
})

server.on('close', exit => {
  console.assert(
    errorsCount === 0,
    `Expect errorsCount to equal 0 but got ${errorsCount}`
  )
  console.assert(
    exit === 0 || exit === null,
    `Expect exit to equal 0 or null but got ${exit}`
  )
})

destroy.js:

import pino from 'pino'
import { dirname, join } from 'path'
import { fileURLToPath } from 'url'

const __dirname = dirname(fileURLToPath(import.meta.url))

pino({
  name: 'logux-server',
  transport: {
    pipeline: [
      {
        target: join(__dirname, './createPinoTransport.js'),
        options: {
          basepath: '/usr/src/app',
          color: false
        }
      },
      {
        target: 'pino/file'
      }
    ]
  }
})

process.on('uncaughtException', console.error)

ubuntu.dockerfile:

FROM ubuntu:20.04

RUN apt update
RUN apt-get install -y curl
RUN curl -fsSL https://deb.nodesource.com/setup_17.x | bash -
RUN apt-get install -y nodejs

WORKDIR /usr/src/app
RUN useradd tester
RUN chown tester /usr/src/app
RUN mkdir /home/tester
RUN chown tester /home/tester
USER tester

RUN npm config set update-notifier false
RUN mkdir /usr/src/app/.npm-global
ENV NPM_CONFIG_PREFIX=/usr/src/app/.npm-global

COPY --chown=tester package*.json ./
RUN npm ci

COPY --chown=tester . ./

ENV FORCE_COLOR=2

CMD ["node"]

mcollina commented 2 years ago

The github repo is not accessible and createPinoTransport.js is missing.

jsumners commented 2 years ago

> i have the same issue too. It seem that pinojs has issue when running inside docker?

Please provide evidence of such a claim. Pino is used within Docker by many people.

kayazinc commented 2 years ago

> i have the same issue too. It seem that pinojs has issue when running inside docker?
>
> Please provide evidence of such a claim. Pino is used within Docker by many people.

Sorry for jumping the gun, let me investigate further. Thanks.