sindresorhus / execa

Process execution for humans
MIT License
6.86k stars 217 forks source link

How to prevent broken pipe (SIGPIPE) #1155

Closed xpuu closed 1 month ago

xpuu commented 1 month ago

First of all, thank you for your work. I've already spent quite a few hours reading the documentation, discussions, and some code. It's a remarkable research project that truly enhances programming experience in the Node.js ecosystem.

I'm writing various scripts to back up and sync databases, handling complex pipes, and processing gigabytes of data. For many reasons, I'm trying to avoid using the 'shell' option for execution. So far, this approach has worked well for me. However, I've been struggling with preventing the 'broken pipe' issue. I'll try to illustrate this with the following example:

# Broken pipe
openssl rand -hex 1 | tee >(wc -c > size) | head -c0; echo "${PIPESTATUS[@]}"; cat size
0 141 0   # pipestatus
0         # wrong wc output

# Broken pipe and complaining openssl
$ openssl rand -hex 10000 | tee >(wc -c > size) | head -c0; echo "${PIPESTATUS[@]}"; cat size
1 141 0   # pipestatus
0         # wrong wc output

# Broken pipe prevented
openssl rand -hex 1 | tee >(wc -c > size) | { head -c0; cat > /dev/null; }; echo "${PIPESTATUS[@]}"; cat size
0 0 0     # pipestatus
3         # correct wc output

By using { head -c0; cat > /dev/null; }, I ensure that the stdout of openssl is not closed prematurely, which prevents a broken pipe. However, when using Execa, I get unexpected results.

// This works fine
const cmd = execa('openssl', ['rand', '-hex', '10000'], { buffer: { stdout: false, stderr: true } })
console.log(await Promise.all([cmd.pipe`wc -c`, cmd.pipe`head -c0`]))

// Increasing openssl output results in exception
// ExecaError: Command failed with exit code 1: openssl rand -hex 100000
const cmd = execa('openssl', ['rand', '-hex', '100000'], { buffer: { stdout: false, stderr: true } })
console.log(await Promise.all([cmd.pipe`wc -c`, cmd.pipe`head -c0`]))

Is there any way to prevent this issue? Thanks.

ehmicky commented 1 month ago

Hi @xpuu,

To start, I wonder: what is the purpose of head -c0? Does not it ask to retrieve the first 0 bytes of stdin, i.e. always print nothing? Maybe it's just for example purpose?

When head is reaching the amount of bytes requesting by the -c flag, it stops the pipeline by sending a SIGPIPE to the previous command (here tee). The previous command can handle SIGPIPE, but in general it does not, and the OS default behavior then is to terminate that command. Even if it does not, the previous command might then write more output to stdout, leading to an EPIPE error since the stdin of head would then be closed.

You can experience this yourself using the following example.

$ openssl rand -hex 10000 | head -c1; echo -e "\n${PIPESTATUS[@]}";
1 0

By decreasing or increasing 10000, you will sometimes get 0 0, and sometimes 1 0. That's because OS signals take some time to be sent and received by the OS. This leads to a race condition: sometimes openssl terminates fast enough, before receiving the SIGPIPE, sometimes afterwards, depending on timing conditions and how big the input/output is.

Since SIGPIPE is used for the above purpose, Execa chooses to purposely not consider it an error.

https://github.com/sindresorhus/execa/blob/3fc804916d60b0b2e774a3642bd9815388caf7af/lib/resolve/wait-stream.js#L91-L96

But it cannot prevent that, once head has closed its stdin, openssl will keep trying to write to it, leading to an EPIPE error. That's actually how Unix pipelines were designed for that type of command, I believe. When this happens, openssl exits with a non-0 exit code.

Your problem is probably that, once openssl exits, it also stops writing to wc -c.

I think the core of the problem is that, the way Unix pipelines are designed, you're probably supposed to do: openssl -> head -> wc, as opposed to openssl -> wc + openssl -> head. In other words:

console.log(await execa`openssl rand -hex 10000`.pipe`head -c0`.pipe`wc -c`)

It seems like using a subshell { head -c0; cat > /dev/null; } as you mentioned is somehow working around this, but that's arguably a hack. Also, being a subshell, it requires using a shell, i.e. would need the shell option.

So it looks like the core of the problem is about how termination and input/output propagate across the pipeline.

The good news is that using the above example (with openssl -> head -> wc) might work for your use case thanks to Execa showing you the intermediate result of each command as part of result.pipedFrom.

const result = await execa`openssl rand -hex 10000`.pipe`head -c100`.pipe`wc -c`;
console.log(result.stdout) // 100
console.log(result.pipedFrom[0].stdout) // 100 random bytes
console.log(result.pipedFrom[0].pipedFrom[0].stdout) // 10000 random bytes
mkicks commented 1 month ago

Hi, thanks for the prompt reply. Here's the actual situation:

Database dump (and from its piped stdout)

The last two processes are likely to fail (e.g., no space left on the filesystem, database is down, etc.). Ideally, I wouldn’t want any of these errors to stop the other processes. I also don’t want to save the temp result, as it's more than 15GB.

ehmicky commented 1 month ago

If you were using head, I would suggest re-implementing head. head closes stdin after reading a specific amount of bytes. This leads to the first subprocess in the pipeline to write to head stdin, which leads to EPIPE after stdin has been closed. Instead, something like this.

// head.js
let totalSize = 0;
const requestedSize = Number(process.argv[2])

process.stdin.on('data', chunk => {
  // Unlike `head`, `stdin` is never closed by this process directly
  if (totalSize !== requestedSize) {
    const leftSize = requestedSize - totalSize
    const truncatedChunk = chunk.byteLength > leftSize ? chunk.subarray(0, leftSize) : chunk
    totalSize += truncatedChunk.byteLength
    process.stdout.write(truncatedChunk)
  }
})
// This now works
const cmd = execa('openssl', ['rand', '-hex', '100000'], { buffer: { stdout: false, stderr: true } })
console.log(await Promise.all([cmd.pipe`wc -c`, cmd.pipe`node head.js 0`]))

I also don’t want to save the temp result, as it's more than 15GB.

You can choose to use buffer.stdout: false for some parts of the pipeline but not other parts. Namely, the first command can use buffer.stdout: false to avoid buffering 15GB, but the later parts can avoid using this option. Not sure this would help you there, but mentioning it in case you missed this bit.

The last two processes are likely to fail (e.g., no space left on the filesystem, database is down, etc.). Ideally, I wouldn’t want any of these errors to stop the other processes.

Execa intentionally does not terminate other processes in the pipeline when one of them ends. It lets each process terminate on its own (including through EPIPE). This mimics how shells typically behave.

https://github.com/sindresorhus/execa/blob/3fc804916d60b0b2e774a3642bd9815388caf7af/lib/pipe/streaming.js#L6-L11

So the problem is not with Execa, but with one of the processes ending its stdin early, and the first process not handling writing to a process that has closed its stdin.

Database dump (and from its piped stdout) Counting the size of the uncompressed output for later reference Compressing it to the filesystem Immediately restoring it to another database server

I guess you're trying to save memory and time here, but for something as sensitive as a database dump, I would personally probably advice to avoid setups that might randomly fail. For example, doing things in several steps instead of using a single optimized pipeline.

That being said, I think you should be able to do the above steps in a regular pipeline without too much trouble, providing you write your own scripts for each part instead of relying on Unix utilities. For example, you could create the following Node.js scripts:

Then you can do:

const noBuffer = {buffer: {stdout: false, stderr: true}}
const result = await execa(noBuffer)`dump database`
  .pipe(noBuffer)`node count.js`
  .pipe(noBuffer)`node compress.js`
  .pipe`node restore.js`;

As a side note, I think the overall question is not really about a bug in Execa or a missing feature, so I'm not sure how much more I could delve into this topic.

ehmicky commented 1 month ago

@xpuu @mkicks Should we close this issue?

xpuu commented 1 month ago

Many thanks for your insights. I still have some questions on the topic, but I don’t have time this week to really delve into it. If I’m not able to write more next week, please go ahead and close the topic.

ehmicky commented 1 month ago

No problem @xpuu. Please feel free to open a separate issue for new questions. :+1: