I'm testing this library for use in a workflow that pipes multiple processes together, including complex output handling, occasionally piping one process to more than one output process, and observing the output from within the Java process. (I wish this library supported piping internally, where it could probably be done more efficiently, without having to copy ByteBuffers to avoid holding references to NuProcess's output buffers, though presumably this is zero-copy with direct ones.)
One issue with workflows like this: say I'm piping process A's stdout to process B's stdin.
I launch B, then A. A caches buffers into a ConcurrentLinkedDeque, which B reads from when onStdinReady() is called.
However, onStdinReady() is called before any data is available. Even if it returns true, which should result in its being called again, it is not called again if no data was written to the passed buffer. The only solution I've found is to block in onStdinReady() until the first buffer is available, which kind of defeats the purpose of using a non-blocking library for this stuff. More seriously (and I haven't dug into the code deeply enough to confirm this): if the thread pool used is the one created in the static block at the top of LinuxProcess, then there are only numProcessors threads available. Once as many of these are blocked as there are cores, there will be no threads left to collect output.
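The starvation concern can be sketched in isolation, without NuProcess. This is only an illustration under my own assumptions: a plain fixed-size `ExecutorService` stands in for whatever pool the library actually uses, and `PoolStarvationSketch` and `producerRan` are hypothetical names. Consumers that block on a latch occupy every pool thread, so the task that would release the latch never gets to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolStarvationSketch {
    /** Returns true if the producer task managed to run within the timeout. */
    static boolean producerRan(int poolSize, int blockedConsumers, long timeoutMillis)
            throws InterruptedException {
        // Assumption: a fixed pool stands in for the library's worker threads.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch firstWrite = new CountDownLatch(1);
        try {
            // Consumers block until data exists, like a handler awaiting a latch.
            for (int i = 0; i < blockedConsumers; i++) {
                pool.execute(() -> {
                    try {
                        firstWrite.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The producer (standing in for an stdout callback) is submitted last.
            pool.execute(firstWrite::countDown);
            return firstWrite.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow(); // interrupts any consumers still blocked
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("2 threads, 2 blocked: " + producerRan(2, 2, 500)); // false
        System.out.println("2 threads, 1 blocked: " + producerRan(2, 1, 500)); // true
    }
}
```

With as many blocked consumers as pool threads, the producer stays queued and the latch is never released; with one thread to spare, it runs immediately.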
See the example code below (it uses sox to convert an audio file to 16-bit integer WAV format and wav2png to generate a thumbnail from it). If you remove the call to waitForFirstWrite.await(), no output will ever be written to wav2png.
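import com.zaxxer.nuprocess.NuProcess;
import com.zaxxer.nuprocess.NuProcessBuilder;
import com.zaxxer.nuprocess.NuProcessHandler;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public static void main(String[] args) throws FileNotFoundException, InterruptedException {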
String file = "/tmp/test-32bitFloat.wav";
NuProcessBuilder soxBuilder = new NuProcessBuilder( "/usr/bin/sox", file, "-t", "wav", "-b", "16", "-" );
NuProcessBuilder wav2pngBuilder = new NuProcessBuilder( "/usr/bin/wav2png", "-", "-o", "/tmp/test.png" );
File w2err = new File( "/tmp/test-wav2png.err" );
File soxerr = new File( "/tmp/test-sox.err" );
FileChannel w2errCh = new FileOutputStream( w2err ).getChannel();
FileChannel soxErrCh = new FileOutputStream( soxerr ).getChannel();
ConcurrentLinkedDeque<ByteBuffer> writes = new ConcurrentLinkedDeque<>();
AtomicBoolean done = new AtomicBoolean();
CountDownLatch allDoneLatch = new CountDownLatch( 2 );
CountDownLatch waitForFirstWrite = new CountDownLatch( 1 );
wav2pngBuilder.setProcessListener( new NuProcessHandler() {
@Override
public void onPreStart(NuProcess nuProcess) {
}
@Override
public void onStart(NuProcess nuProcess) {
nuProcess.wantWrite();
}
@Override
public void onExit(int exitCode) {
try {
System.out.println( "wav2png exited. " + exitCode );
} finally {
allDoneLatch.countDown();
}
}
@Override
public void onStdout(ByteBuffer buffer, boolean closed) {
}
@Override
public void onStderr(ByteBuffer buffer, boolean closed) {
try {
w2errCh.write( buffer );
if ( closed ) {
w2errCh.close();
}
} catch ( IOException ex ) {
ex.printStackTrace();
}
}
@Override
public boolean onStdinReady(ByteBuffer buffer) {
try {
waitForFirstWrite.await();
} catch ( InterruptedException ex ) {
ex.printStackTrace();
}
ByteBuffer out;
int cap = buffer.capacity() - buffer.position();
int bytesWritten = 0;
while ( ( out = writes.pollFirst() ) != null ) {
out.flip();
int rem = out.remaining();
if ( rem <= cap ) {
bytesWritten += rem;
cap -= rem;
buffer.put( out );
if ( cap == 0 ) {
break;
}
} else {
if ( buffer.remaining() == 0 ) {
// Return the buffer to write mode so the flip() on the next poll exposes its data again.
out.compact();
writes.addFirst( out );
break;
} else {
byte[] bytes = new byte[buffer.remaining()];
bytesWritten += bytes.length;
out.get( bytes );
buffer.put( bytes );
out.compact();
writes.addFirst( out );
break;
}
}
}
// Flip before deciding the return value so the final chunk is not left unflipped.
if ( bytesWritten > 0 ) {
buffer.flip();
}
if ( done.get() && writes.isEmpty() ) {
return false;
}
return true;
}
} );
soxBuilder.setProcessListener( new NuProcessHandler() {
private NuProcess proc;
@Override
public void onPreStart(NuProcess nuProcess) {
this.proc = nuProcess;
}
@Override
public void onStart(NuProcess nuProcess) {
}
@Override
public void onExit(int exitCode) {
allDoneLatch.countDown();
}
@Override
public void onStdout(ByteBuffer buffer, boolean closed) {
done.set( closed );
ByteBuffer copy = ByteBuffer.allocateDirect( buffer.remaining() );
copy.put( buffer );
writes.addLast( copy );
waitForFirstWrite.countDown();
}
@Override
public void onStderr(ByteBuffer buffer, boolean closed) {
try {
soxErrCh.write( buffer );
if ( closed ) {
soxErrCh.close();
}
} catch ( IOException ioe ) {
ioe.printStackTrace();
}
}
@Override
public boolean onStdinReady(ByteBuffer buffer) {
return false;
}
} );
NuProcess wav2png = wav2pngBuilder.start();
NuProcess p = soxBuilder.start();
allDoneLatch.await();
}
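As an aside, the partial-copy bookkeeping in onStdinReady() above could probably be simplified. The following is a minimal sketch, not part of NuProcess: `BufferDrainSketch` and `drain` are hypothetical names, and it assumes a single consumer and that queued buffers are already flipped into read mode when enqueued. It temporarily narrows the source buffer's limit so only the bytes that fit are copied, and leaves a partially consumed buffer at the head of the queue.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class BufferDrainSketch {
    /**
     * Copies as many queued bytes as fit into dst and returns the count.
     * Assumes queued buffers are in read mode and only one thread drains.
     */
    static int drain(Deque<ByteBuffer> pending, ByteBuffer dst) {
        int copied = 0;
        ByteBuffer src;
        while (dst.hasRemaining() && (src = pending.peekFirst()) != null) {
            int n = Math.min(src.remaining(), dst.remaining());
            int savedLimit = src.limit();
            src.limit(src.position() + n); // expose only what fits in dst
            dst.put(src);
            src.limit(savedLimit);         // restore the unread tail
            copied += n;
            if (!src.hasRemaining()) {
                pending.pollFirst();       // fully consumed, drop it
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> pending = new ArrayDeque<>();
        pending.add(ByteBuffer.wrap(new byte[] {1, 2, 3}));
        pending.add(ByteBuffer.wrap(new byte[] {4, 5, 6}));
        ByteBuffer dst = ByteBuffer.allocate(4);
        System.out.println(drain(pending, dst) + " copied, " + pending.size() + " left");
        // prints "4 copied, 1 left"
    }
}
```

A handler using something like this would still need to flip the destination buffer after draining, and the producer side would need to flip each copy before enqueueing it.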