dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Dart should support anonymous pipes #49917

Closed brianquinlan closed 2 years ago

brianquinlan commented 2 years ago

Dart should support anonymous pipes (e.g. as returned by the pipe system call).

Requested by @robert-ancell in #47310.

brianquinlan commented 2 years ago

How does this look as an API?

/// An `AnonymousPipe` holds two [RandomAccessFile]s that each allow unidirectional data flow
/// i.e. data written to [writePipe] can be read from [readPipe]. Creating an [AnonymousPipe] is
/// possible on Windows but, since there is no way to transmit it to another process, probably not
/// useful.
///
/// Either the [readPipe] or [writePipe] can be transmitted to another process and used for interprocess
/// communication.
///
/// For example:
///
/// ```dart
/// import 'dart:async';
/// import 'dart:io';
///
/// void main() async {
///   final address =
///       InternetAddress('sock', type: InternetAddressType.unix);
///   final socket = await RawSocket.connect(address, 0);
///   final pipe = await AnonymousPipe.create();
///
///   await socket.sendMessage(<SocketControlMessage>[
///     SocketControlMessage.fromHandles(
///         <ResourceHandle>[ResourceHandle.fromFile(pipe.readPipe)])
///   ], 'Hi'.codeUnits);
///   await pipe.writePipe.writeString("Message");
/// }
/// ```
abstract class AnonymousPipe {
  RandomAccessFile get readPipe;
  RandomAccessFile get writePipe;

  static Future<AnonymousPipe> create();
  static AnonymousPipe createSync();
}

I'd actually prefer not to create a new class for this. Does anyone have an alternative API suggestion?

lrhn commented 2 years ago

Wait for records and use ({RandomAccessFile in, RandomAccessFile out})? 😉

For naming, why is it Anonymous? Do we expect a NamedPipe too? The Pipe suffix on the getters also seems unnecessary.

Consider:

abstract class Pipe {
  RandomAccessFile get read; // Would use "in" and "out", but "in" is a reserved word.
  RandomAccessFile get write;
  external factory Pipe.createSync();
  external static Pipe create();
}

Is RandomAccessFile the best abstraction? It seems to assume being backed by a single file system file, since you can ask for the length. I guess that can be simulated by providing "bytes written/received so far". What happens if you read from the write pipe, or write to the read pipe? Do you need the distinction between a synchronous and an asynchronous write?

Even more alternatively, could it be a single RandomAccessFile, where reading and writing act like the different ends of the same pipe? If you want to give the file to someone else who should only read or write, you can wrap it in a "read-only"/"write-only" wrapper first.

Also consider a (Sink<Uint8List>, Stream<Uint8List>) pair instead. That way the two ends are limited to the in or out behaviors. You can build abstractions on top of the Sink, and the Stream is directly supported by the language, but you lose the ability to do synchronous read operations.
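
The Sink/Stream pair idea can be sketched entirely in memory. This is only an illustration: a `StreamController` stands in for the OS pipe, and `writeText` and `readAllText` are hypothetical helper names, not part of any proposed API.

```dart
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

/// Hypothetical helper: writes [text] to [sink] as one UTF-8 encoded chunk.
void writeText(Sink<Uint8List> sink, String text) {
  sink.add(Uint8List.fromList(utf8.encode(text)));
}

/// Hypothetical helper: collects everything from [stream], decoded as UTF-8.
Future<String> readAllText(Stream<Uint8List> stream) {
  return utf8.decoder.bind(stream).join();
}

Future<void> main() async {
  // Assumption: a StreamController plays the role of the pipe here.
  final controller = StreamController<Uint8List>();
  final Sink<Uint8List> writeEnd = controller.sink; // can only add and close
  final Stream<Uint8List> readEnd = controller.stream; // can only be listened to

  writeText(writeEnd, 'Hello ');
  writeText(writeEnd, 'pipe');
  writeEnd.close();

  print(await readAllText(readEnd));
}
```

Each end exposes only one direction, which is the restriction described above. Reading is asynchronous only, so there is no counterpart to a synchronous read call.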

a-siva commented 2 years ago

In the example above, socket.sendMessage is used to communicate one end of the pipe to another process, but this does not work on Windows. What would the option be on Windows?

Also, @rmacnak-google points out that anonymous pipes created on Windows do not support non-blocking I/O (this may be an issue for dart:io).

brianquinlan commented 2 years ago

@lrhn

Thanks for the feedback!

Wait for records and use ({RandomAccessFile in, RandomAccessFile out})? 😉

I was thinking about it 😆

Is RandomAccessFile the best abstraction? It seems to assume being backed by a single file system file, since you can ask for the length.

I'm not sure what will happen on Windows but on Unix, it should return the number of bytes left to be read.

I guess that can be simulated by providing "bytes written/received so far". What happens if you read from the write pipe, or write to the read pipe?

You get FileSystemException: writeFrom failed, path = '' (OS Error: Bad file descriptor, errno = 9) but that is consistent with doing something like:

final f = await File('something').open(); // read is the default mode
await f.writeString('Hi'); // Same exception as above.
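
A self-contained version of that comparison might look like the sketch below. It assumes a writable system temp directory; the function name `writeToReadOnlyHandleFails` and the file names are made up for the example.

```dart
import 'dart:io';

/// Hypothetical check: returns true if writing through a handle opened in
/// read mode fails with a FileSystemException.
Future<bool> writeToReadOnlyHandleFails() async {
  final dir = await Directory.systemTemp.createTemp('pipe_demo');
  final file = File('${dir.path}/something');
  await file.writeAsString('content');

  final f = await file.open(); // FileMode.read is the default mode
  try {
    await f.writeString('Hi');
    return false; // the write unexpectedly succeeded
  } on FileSystemException {
    return true; // same exception type as writing to the read end of a pipe
  } finally {
    await f.close();
    await dir.delete(recursive: true);
  }
}

Future<void> main() async {
  print(await writeToReadOnlyHandleFails());
}
```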

Do you need the distinction between a synchronous and an asynchronous write?

You mean for the create methods? We have that for File so I thought that I'd be consistent.

Even more alternatively, could it be a single RandomAccessFile, where reading and writing act like the different ends of the same pipe? If you want to give the file to someone else who should only read or write, you can wrap it in a "read-only"/"write-only" wrapper first.

Also consider a (Sink<Uint8List>, Stream<Uint8List>) pair instead. That way the two ends are limited to the in or out behaviors. You can build abstractions on top of the Sink, and the Stream is directly supported by the language, but you lose the ability to do synchronous read operations.

I'd rather not introduce a new Sink subclass and a new Stream subclass.

brianquinlan commented 2 years ago

In the example above, socket.sendMessage is used to communicate one end of the pipe to another process, but this does not work on Windows. What would the option be on Windows?

Yeah, I don't think that being able to create pipes would be useful on Windows.

Also, @rmacnak-google points out that anonymous pipes created on Windows do not support non-blocking I/O (this may be an issue for dart:io).

Isn't dart:io file I/O synchronous at the system call level?

brianquinlan commented 2 years ago

@aam and @rmacnak-google also seem to prefer the Stream<Uint8List>, Sink<Uint8List> pair approach. I think that would end up looking like this:

abstract class ReadPipe implements Stream<Uint8List> {
  Future<int> unreadBytes(); // The number of unread bytes.
}

abstract class WritePipe implements Sink<Uint8List> {
}

abstract class Pipe {
  ReadPipe get read;  // Can't just be a Stream<Uint8List> because that won't work with ResourceHandle
  WritePipe get write; // Can't just be a Sink<Uint8List> because that won't work with ResourceHandle
  factory Pipe.createSync();
  static Future<Pipe> create();
}

// ResourceHandle needs two new methods to extract file descriptors from read/write pipes:
ResourceHandle.fromReadPipe(ReadPipe pipe);
ResourceHandle.fromWritePipe(WritePipe pipe);

IMO: This seems pretty heavy - we need two new classes and two new methods in ResourceHandle.

But I'm happy to go with that if it is more idiomatic.

robert-ancell commented 2 years ago

Some of the proposed API is using the term 'pipe' for the pipe ends - this will be confusing. I suggest something like:

abstract class Pipe {
   RandomAccessFile readEnd;
   RandomAccessFile writeEnd;
}

or

abstract class Pipe {
   PipeInput input;
   PipeOutput output;
}

brianquinlan commented 2 years ago

@robert-ancell Ack. We won't do that.

@aam @rmacnak-google @lrhn: I implemented the proposal where the read end of the pipe is a Stream. The issue is that the Stream is exhausted as soon as there is no more data to read. That doesn't match my expectations of how people typically use pipes, i.e. pipes are infinite streams and there should be a way to wait until new data appears.

UPDATE: I think that I can fix this behavior without changing the interface (which is described below)

The exact interface looked like:

abstract class ReadPipe implements Stream<List<int>> {
//  Future<int> unreadBytes(); // The number of unread bytes.
}

abstract class WritePipe implements IOSink {
}

abstract class Pipe {
  ReadPipe get read;
  WritePipe get write;

  static Future<Pipe> create() {
    return _Pipe._create();
  }
}

My test code was:

void main() async {
  final pipe = await Pipe.create();
  pipe.read.listen((event) {
    print(event);
  }); // onDone is triggered immediately because no data was written to the pipe.
}

RandomAccessFile does not have this problem because you can just keep calling a read method until data arrives.

Options:

  1. switch back to read and write being RandomAccessFile instances.
  2. add a close method to ReadPipe and only trigger onDone when it is called.
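
The behavior that option 2 aims for can be sketched in memory: the listener stays subscribed while no data is available, and onDone fires only after an explicit close. `FakeReadPipe` and `run` are hypothetical names, and a `StreamController` stands in for the real file descriptor.

```dart
import 'dart:async';

/// Hypothetical in-memory read end that stays open until [close] is called.
class FakeReadPipe {
  final _controller = StreamController<List<int>>();

  Stream<List<int>> get stream => _controller.stream;

  /// Simulates bytes arriving from the write end.
  void deliver(List<int> bytes) => _controller.add(bytes);

  /// Ends the stream; listeners receive onDone only after this call.
  Future<void> close() => _controller.close();
}

/// Records the order in which the listener observes events.
Future<List<String>> run() async {
  final log = <String>[];
  final pipe = FakeReadPipe();
  final done = Completer<void>();

  pipe.stream.listen((event) => log.add('data $event'), onDone: () {
    log.add('done');
    done.complete();
  });

  // No data has been written yet, but the stream is not exhausted.
  await Future<void>.delayed(Duration.zero);
  log.add('waiting');

  pipe.deliver([1, 2, 3]);
  await pipe.close();
  await done.future;
  return log;
}

Future<void> main() async {
  print(await run()); // [waiting, data [1, 2, 3], done]
}
```

With this shape, an empty pipe does not end the stream, so onDone is tied to the close call rather than to the amount of buffered data.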

brianquinlan commented 2 years ago

Fixed in 2ead86ab9dcbfe9217af01670c7d93ede73381c9