ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.36k stars 271 forks source link

Runtime type error when using StreamTransformer-based APIs #683

Closed Mike278 closed 2 years ago

Mike278 commented 2 years ago

Repro:

import 'dart:async';

import 'package:rxdart/rxdart.dart';
// import 'package:stream_transform/stream_transform.dart';

void main() {
  Stream<String?>.value(null).mapNonNull(length).listen(print);
  Stream<String?>.value('a').mapNonNull(length).listen(print);
  Stream<String>.value('a').mapNonNull(length).listen(print);

  Stream<String?>.value(null).switchMapNonNull(length$).listen(print);
  Stream<String?>.value('a').switchMapNonNull(length$).listen(print);
  Stream<String>.value('a').switchMapNonNull(length$).listen(print);
  // ^ runtime error here:
  // type 'SwitchMapStreamTransformer<String?, int?>' is not a subtype of type 'StreamTransformer<String, int?>' of 'streamTransformer'
}

int length(String s) => s.length;
Stream<int> length$(String s) => Stream.value(s.length);

Stream<R?> _mapNonNull<T extends Object, R>(
  Stream<T?> stream,
  R? Function(T) mapper,
) {
  R? f(T? x) => x == null ? null : mapper(x);
  return stream.map<R?>(f);
}

Stream<R?> _switchMapNonNull<T extends Object, R>(
  Stream<T?> stream,
  Stream<R?> Function(T) mapper,
) {
  Stream<R?> f(T? x) => x == null ? Stream.value(null) : mapper(x);
  return stream.switchMap<R?>(f);
}

extension NullableStreamExtensions<T extends Object> on Stream<T?> {
  Stream<R?> mapNonNull<R>(R? Function(T) mapper) =>
      _mapNonNull<T, R>(this, mapper);

  Stream<R?> switchMapNonNull<R>(Stream<R?> Function(T) mapper) =>
      _switchMapNonNull<T, R>(this, mapper);
}
$ dart analyze bug.dart 
Analyzing bug.dart...
No issues found!
$ dart run bug.dart
Unhandled exception:
type 'SwitchMapStreamTransformer<String?, int?>' is not a subtype of type 'StreamTransformer<String, int?>' of 'streamTransformer'
#0      Stream.transform (dart:async/stream.dart)
#1      SwitchMapExtension.switchMap (package:rxdart/src/transformers/switch_map.dart:119:7)
#2      _switchMapNonNull ([...]bug.dart:34:17)
#3      NullableStreamExtensions.switchMapNonNull ([...]bug.dart:42:7)
#4      main ([...]bug.dart:13:29)
#5      _delayEntrypointInvocation.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:297:19)
#6      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:192:12)

The error goes away if you swap the imports.

Reading up a bit on the issue indicates that this is due to Dart's covariant class generics. There are some sharp edges when it comes to classes that handle generics as both function arguments ("in" position) and return types ("out" position) or both - which is exactly what StreamTransformers do. Some relevant links:

hoc081098 commented 2 years ago

I also had this issue

Mike278 commented 2 years ago

See also: https://github.com/dart-lang/web_socket_channel/pull/65, specifically

A recent change in the Dart SDK updated Socket to implement Stream rather than Stream<List>. This forwards compatible change calls StreamTransformer.bind() rather than Stream.transform(), thus putting the stream in a covariant position and allowing for the transition to Uint8List.

Following this approach, the error goes away if I replace return stream.switchMap<R?>(f); with return SwitchMapStreamTransformer<T?, R?>(f).bind(stream);

hoc081098 commented 2 years ago

In mapNotNull and whereNotNull I have already used bind instead of transform. I will update others