knopp / msgpack_dart

MsgPack implementation for dart / msgpack.org[Dart]
MIT License
56 stars 13 forks source link

feat: serializing / deserializing over streams #13

Open SilverMira opened 1 year ago

SilverMira commented 1 year ago

Currently, this library provides no way to perform:

This PR contains:

The discrepancy between the implemented StreamTransformer interface between the two is because:

  1. Serializing arbitrary Dart objects into msgpack format already yield Uint8List, which implements List<int>, it does not make sense to downgrade the type information here.
  2. Msgpack bytes streams to deserialize can come from various sources, some exposing their interface directly as Stream<Uint8List> while some Stream<List<int>>, loosening the type constraint here should increase compatibility and avoid scenarios of Stream<List<int>>.map((e) => Uint8List.fromList(e)).
  3. ~StreamTransformer.cast() can properly recast both StreamSerializer and StreamDeserializer between Uint8List and List<int> for compatibility in both Stream<List<int>> and Stream<Uint8List> if the underlying stream elements have correct types during runtime.~ Using StreamTransformer.bind directly rather than Stream.transform allows to properly bypass using Stream.cast or StreamTransformer.cast which has performance cost

Noteworthy stream behaviors:

  1. StreamDeserializer will wait for more bytes upstream if it requires more bytes in order to complete deserialization
  2. StreamDeserializer will emit UpstreamClosedError if upstream closes unexpected (in the midst of deserializing a dart object that spans over many bytes)
  3. Both StreamSerializer and StreamDeserializer have similar constructor arguments to their non-stream counterparts.

This PR should close #6, closely relates to #10 and should be generally a better solution than exposing Deserializer.offset. Serializing Dart objects into msgpack binary and outputting into a stream is already something any user can implement on their own with the existing Serializer class, but semantics of deserializing from a byte stream or multiple Uint8List into multiple Dart objects are not possible with the current Deserializer interface.

Usage:

// Deserializing msgpack bytes from another process
Stream<List<int>> byteStream = process.stdout;
Stream<dynamic> objectStream = byteStream.transform(StreamDeserializer());
await for(final object in objectStream) {
  print(object);
}
// Deserializing chunks of Uint8List
List<Uint8List> chunks = ...;
Stream<Uint8List> byteStream = Stream.fromIterable(chunks);
// If the byte stream is of type `Stream<Uint8List>`, it is almost certain you have to use `StreamTransformer.bind` directly
Stream<dynamic> objectStream = StreamDeserializer().bind(byteStream);
List<dynamic> objectsList = await objectStream.toList();
// Serializing objects stream into msgpack bytes stream
Stream<dynamic> objectStream = ...;
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());
await for(final bytesChunk in bytesStream) {
  print(bytesChunk);
}
// Serializing many objects at once
List<dynamic> objects = ...;
Stream<dynamic> objectStream = Stream.fromIterable(objects);
Stream<Uint8List> bytesStream = objectStream.transform(StreamSerializer());

// To get 1-1 correspondence between input and output
List<Uint8List> bytesChunks = await bytesStream.toList(); // bytesChunk.length == objects.length

// To get M-1 correspondence between input and output
Uint8List bytes = await collectBytes(bytesStream); // collectBytes from package:async/async.dart