nextflow-io / nextflow

A DSL for data-driven computational pipelines
Apache License 2.0

Foreign file staging can allow pipelines to proceed with corrupt files #5214

Closed: scwatts closed this issue 2 weeks ago

scwatts commented 1 month ago

Bug report

Expected behavior and actual behavior

Description and background

Desired behaviour

Steps to reproduce the problem

Given the difficulty of reproducing this intermittent problem, I've put together a toy example that replicates the behaviour locally with small data. The example is not a perfect replication, but it demonstrates the fundamental problem described above.

File:

```python
#!/usr/bin/env python3
import argparse
import http.server
import time


TAR_DATA = b'\x1f\x8b\x08\x00\xb7\xa3\xb5f\x00\x03\xed\xd1M\n\xc20\x10\x05\xe0\xac=ENP\'\xe9\xfc\x9c\xa7BB\x17\xc5H\x1b\xc1\xe3\x1bm)"\xb4\xbb(\xe2|\x9b\xb7\x19\x92I^L\xe9h*\x03\x00!\xb2\xcf\xe49\xc1\xe3\x9c\x0b\xeb\x90\x89Z\xef\x98\xc5\x82\xf3(\xceX\xaa\xbd\xd8\xc3u\xca\xddXV\x99r\xb8\xf4\xe1\xbc9W\xc6b\xdc9gy\xc7\x9a?"\x96\xfeO\xdd\xd8\xe4[\xaevG\xf9\x0fF\xdc\xeb\x9f\xdf\xfaoE\xbc\xb1Pm\xa3\x17\x7f\xde\x7f\x1f\x86!\x1d\xbe\xbd\x85RJ\xa9O\xbb\x03\xe7+\x86\x11\x00\n\x00\x00'


class CustomRequestHandler(http.server.SimpleHTTPRequestHandler):

    protocol_version = 'HTTP/1.1'
    simulate_failure = False

    def do_GET(self):
        # Send headers including expected content length
        self.send_response(200)
        self.send_header('Content-Length', len(TAR_DATA))
        self.end_headers()

        # Write initial chunk then create artificial pause for NF to display staging message
        self.wfile.write(TAR_DATA[:16])
        time.sleep(5)

        # Write remaining data except final several bytes so that we can interrupt if requested
        self.wfile.write(TAR_DATA[16:-16])

        # Simulate server-side interruption that should raise an exception in client
        if self.simulate_failure:
            raise Exception

        # Complete writing and flush output
        self.wfile.write(TAR_DATA[-16:])
        self.wfile.flush()


def get_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('--simulate_serverside_failure', action='store_true')
    return parser.parse_args()


def main():
    # Get commandline args and configure
    args = get_arguments()
    CustomRequestHandler.simulate_failure = args.simulate_serverside_failure

    # Launch HTTP server
    with http.server.ThreadingHTTPServer(('localhost', 8000), CustomRequestHandler) as server:
        server.serve_forever()


if __name__ == '__main__':
    main()
```

File:

```nextflow
#!/usr/bin/env nextflow

process DECOMPRESS_TARBALL {
    publishDir 'output/'

    input:
    path tarball

    output:
    path 'foo/'

    script:
    """
    tar -zxvf ${tarball}
    """
}

workflow {
    DECOMPRESS_TARBALL('http://localhost:8000/foo.tar.gz')
}
```

After writing the above scripts to disk, the problem can be replicated with the following:

# First shell
python3 --simulate_serverside_failure

# Second shell
nextflow run

Remove --simulate_serverside_failure to show normal operation. Tested with Nextflow 24.04.4 and 24.07.0-edge.5923 on macOS.

Program output

Console output when simulating the server-side error above (i.e. staging fails but the pipeline proceeds with the corrupt input):

 N E X T F L O W   ~  version 24.04.4

Launching `` [happy_bhaskara] DSL2 - revision: fe3bec1c93

executor >  local (1)
[4a/5386b5] process > DECOMPRESS_TARBALL [100%] 1 of 1, failed: 1 ✘
Staging foreign file: http://localhost:8000/foo.tar.gz
ERROR ~ Error executing process > 'DECOMPRESS_TARBALL'

Caused by:
  Process `DECOMPRESS_TARBALL` terminated with an error exit status (1)

Command executed:

  tar -zxvf foo.tar.gz

Command exit status:

Command output:

Command error:
  tar: Error opening archive: truncated gzip input

Work dir:

Tip: you can replicate the issue by changing to the process work dir and entering the command `bash`

 -- Check '.nextflow.log' file for details

Attachments: nextflow.log
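The `truncated gzip input` error from `tar` is what you would expect when the last bytes of the archive never arrive. As a rough illustration in Python (the language of the toy server above), the standard `gzip` module raises `EOFError` on a stream cut short in the same way. `make_tarball` and `is_complete_gzip` are hypothetical helpers, and the 16-byte cut mirrors the `TAR_DATA[-16:]` bytes the server withholds.

```python
import gzip
import io
import tarfile


def make_tarball() -> bytes:
    """Build a small in-memory .tar.gz holding foo/bar.txt (hypothetical contents)."""
    payload = b"hello from foo\n"
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        info = tarfile.TarInfo(name="foo/bar.txt")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def is_complete_gzip(data: bytes) -> bool:
    """Return False if the gzip stream ends before its end-of-stream marker and trailer.

    Other kinds of corruption (bad header, CRC mismatch) raise and are not handled here.
    """
    try:
        gzip.decompress(data)
    except EOFError:
        return False
    return True


if __name__ == "__main__":
    tarball = make_tarball()
    print(is_complete_gzip(tarball))        # True
    print(is_complete_gzip(tarball[:-16]))  # False: final 16 bytes missing, like the toy server
```

This only works because gzip carries an end-of-stream marker and trailer; an arbitrary file format offers no such check, which is why catching the short read at staging time is preferable.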


Additional context


ewels commented 3 weeks ago

Great minimal example, thanks for this!

jfy133 commented 3 weeks ago

I don't have explicit examples of this either, due to the inconsistent behaviour, but I've occasionally seen similar things with nf-core/mag, which uses very large databases (77-200 GB tar.gz) and/or databases hosted on distant servers (Australia). I assume it's something to do with the connection dropping or timing out at points, but occasionally the pipeline tries to continue despite the incomplete download.

ewels commented 3 weeks ago

@scwatts - thinking a bit more about this. Do you have any ideas about how we can detect that a download is incomplete?

The options that spring to mind for me are the Content-Length header (we can't rely on it being present) or a checksum (typically not available). I'm not sure HTTP connections really have many other safeguards in place for noticing an incomplete download.
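Where a data provider does publish a digest (for example a SHA-256 value listed alongside the archive; this is an assumption, since as noted it is typically not available), verifying the staged file is straightforward. A minimal Python sketch; the function names are illustrative only:

```python
import hashlib
import io
from typing import BinaryIO


def sha256_of_stream(stream: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Hash a binary stream in fixed-size chunks (1 MiB by default) to keep memory flat."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


def verify_sha256(stream: BinaryIO, expected_hex: str) -> bool:
    """Compare a stream's SHA-256 with a published hex digest (case-insensitive)."""
    return sha256_of_stream(stream) == expected_hex.strip().lower()


def verify_bytes(data: bytes, expected_hex: str) -> bool:
    """Convenience wrapper for in-memory data."""
    return verify_sha256(io.BytesIO(data), expected_hex)


def verify_file(path: str, expected_hex: str) -> bool:
    """Verify a staged file on disk."""
    with open(path, "rb") as fh:
        return verify_sha256(fh, expected_hex)
```

Hashing in chunks matters for the 77-200 GB databases mentioned above, but it also means a full extra read of the file, so this complements a length check rather than replacing it.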

scwatts commented 3 weeks ago

I think we are limited here to making use of the Content-Length or Transfer-Encoding header fields.

I've just been reading through RFC 9110, which states that a server SHOULD send the Content-Length header field when the content size is known ahead of time. As you point out, though, server implementations are not strictly required to do so.

However, since Content-Length will often be available, I think we should use it where possible to improve the reliability of foreign file staging over HTTP/S, even if not every corner case is covered.
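A minimal sketch of the Content-Length check, assuming the client has the response headers as a mapping and counts the bytes it writes; `expected_length`, `check_complete` and `IncompleteDownload` are hypothetical names, not Nextflow APIs. When `Transfer-Encoding` is present it takes precedence over `Content-Length` in HTTP/1.1 framing, so the sketch returns no expected length in that case and leaves completeness to the chunked framing (a final zero-length chunk).

```python
import re
from typing import Mapping, Optional


class IncompleteDownload(Exception):
    """Hypothetical error type for a body that does not match its declared length."""


_DIGITS = re.compile(r"[0-9]+")


def expected_length(headers: Mapping[str, str]) -> Optional[int]:
    """Return the declared body length, or None when it cannot be known in advance."""
    lowered = {name.lower(): value for name, value in headers.items()}
    # Transfer-Encoding overrides Content-Length in HTTP/1.1 framing; a chunked
    # body signals its own end with a zero-length last chunk instead.
    if "transfer-encoding" in lowered:
        return None
    value = lowered.get("content-length")
    if value is None:
        return None  # server did not declare a size: nothing to check against
    value = value.strip()
    if not _DIGITS.fullmatch(value):
        raise ValueError(f"invalid Content-Length: {value!r}")
    return int(value)


def check_complete(headers: Mapping[str, str], bytes_received: int) -> None:
    """Raise IncompleteDownload if a declared Content-Length was not matched exactly."""
    expected = expected_length(headers)
    if expected is not None and bytes_received != expected:
        raise IncompleteDownload(
            f"received {bytes_received} of {expected} declared bytes"
        )
```

Against the toy server, the declared length is `len(TAR_DATA)` but the failure path sends only `len(TAR_DATA) - 16` bytes, so `check_complete` would raise instead of letting the task start.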

If we go this route, it would also be feasible to use Content-Length to detect interrupted downloads and then restart or resume the transfer with Range requests when the server advertises support via the Accept-Ranges header.
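Resuming could look roughly like this, assuming the server advertises `Accept-Ranges: bytes`: request `Range: bytes=N-` starting at the first missing byte N, then accept the reply only if it is a `206 Partial Content` whose `Content-Range` starts at N. A `200` means the server ignored the range and is resending the whole body, so the partial file must be discarded. All helper names here are hypothetical.

```python
import re
from typing import Dict, Mapping

# Matches e.g. "bytes 1000-1999/2000" or "bytes 1000-1999/*" (unknown total size)
_CONTENT_RANGE = re.compile(r"bytes (\d+)-(\d+)/(\d+|\*)")


def _lower(headers: Mapping[str, str]) -> Dict[str, str]:
    return {name.lower(): value for name, value in headers.items()}


def supports_byte_ranges(headers: Mapping[str, str]) -> bool:
    """True if Accept-Ranges lists the 'bytes' range unit."""
    units = _lower(headers).get("accept-ranges", "")
    return "bytes" in (unit.strip().lower() for unit in units.split(","))


def resume_headers(bytes_on_disk: int) -> Dict[str, str]:
    """Request everything from the first missing byte to the end of the resource."""
    return {"Range": f"bytes={bytes_on_disk}-"}


def plan_resume(status: int, headers: Mapping[str, str], offset: int) -> str:
    """Decide what to do with the response to a Range request.

    "append"  -> 206 Partial Content starting exactly at `offset`
    "restart" -> 200 OK: the server ignored Range and is resending the full body
    """
    if status == 200:
        return "restart"
    if status != 206:
        raise ValueError(f"unexpected status {status}")
    match = _CONTENT_RANGE.fullmatch(_lower(headers).get("content-range", "").strip())
    if match is None or int(match.group(1)) != offset:
        raise ValueError("Content-Range does not start at the requested offset")
    return "append"
```

After appending, the same length check should still run on the combined file, since a resumed transfer can be interrupted too.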

pditommaso commented 2 weeks ago

The main problem here is that the data is copied via the standard Java API, which relies on the InputStream abstraction; by definition, an InputStream has no visibility of the overall length.

However, it should be possible to wrap the stream in a filter and fetch the content length as suggested above. To be investigated.
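In Python terms, the stream-wrapping idea can be sketched as a reader that counts bytes as they pass through and, when the underlying stream reaches end-of-file, compares the total with the length announced in the headers. This mirrors a Java `FilterInputStream` wrapper in spirit only; `LengthCheckingReader`, `TruncatedDownloadError` and `copy_checked` are hypothetical names, and this is not the code from the PR that closed the issue.

```python
import io
import shutil
from typing import BinaryIO, Optional


class TruncatedDownloadError(IOError):
    """Hypothetical error: the stream ended before the declared length was reached."""


class LengthCheckingReader(io.RawIOBase):
    """Count bytes read from `raw` and check the total against `expected` at EOF.

    `expected=None` means the length is unknown (no Content-Length), so no check is made.
    """

    def __init__(self, raw: BinaryIO, expected: Optional[int]) -> None:
        super().__init__()
        self._raw = raw
        self._expected = expected
        self._count = 0

    def readable(self) -> bool:
        return True

    def readinto(self, buffer) -> int:
        data = self._raw.read(len(buffer))
        if not data:
            # Underlying stream is exhausted: this is where a short body is detected
            if self._expected is not None and self._count != self._expected:
                raise TruncatedDownloadError(
                    f"stream ended after {self._count} of {self._expected} bytes"
                )
            return 0
        n = len(data)
        buffer[:n] = data
        self._count += n
        return n


def copy_checked(src: BinaryIO, dest: BinaryIO, expected: Optional[int]) -> None:
    """Copy src to dest, failing loudly instead of leaving a silently truncated file."""
    shutil.copyfileobj(LengthCheckingReader(src, expected), dest)
```

The copy loop itself stays unchanged; the check fires at the point the unwrapped copy would have returned normally with a short file.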

scwatts commented 2 weeks ago

Closing now that this has been addressed in the above PR, thank you all!