ReactiveX / rxdart

The Reactive Extensions for Dart
http://reactivex.io
Apache License 2.0
3.37k stars 271 forks source link

Can we have a new subject that behaves like a behavior subject but is always seeded? #572

Open kasperpeulen opened 3 years ago

kasperpeulen commented 3 years ago

BehaviorSubject is quite annoying as it is hard to get the value out of it, all because the stream could not yet have been emitted.

What if we make a new Subject that behaves like a behavior subject but is always seeded? For example a StateSubject/StateStream. Then we are sure that a StateSubject contains a nonnullable value of type T (if T is non nullable).

I use this now in my project, but I feel such a concept belongs in RxDart.

class StateStream<State> {
  StateStream({required State state})
      : _state = state,
        _streamController = StreamController.broadcast();

  final StreamController<State> _streamController;

  Stream<State> get stream => _streamController.stream;

  State _state;

  State get state => _state;

  set state(State state) {
    _streamController.add(state);
    _state = state;
  }
}
frankpepermans commented 3 years ago

We have BehaviorSubject.seeded, or do you mean renaming that one to StateSubject, or is different?

kasperpeulen commented 3 years ago

Ah, I have an outdated rxdart version which had a nullable valueWrapper, which is removed some days ago.

So it is now easier to get a value T out of a BehaviorSubject, because value is of type T. However, it is unsafe, it could throw if you forget to seed the BehaviorSubject.

So, a StateSubject could be a compile time safe version of BehaviorSubject.

hoc081098 commented 3 years ago

I have created ValueSubject in package rxdart_ext https://github.com/hoc081098/rxdart_ext/blob/master/lib/src/value/value_subject.dart

kasperpeulen commented 3 years ago

I see. I realize now also that, even if the behavior subject is seeded, the value can still be null after transformations:

import 'package:rxdart/rxdart.dart'; // from master

void main() async {
  final subject = BehaviorSubject.seeded(1);
  await subject.close();
  print(await subject.value);
  print(await subject.map((event) => event * 2).valueOrNull);
  print(await subject.where((event) => event > 2).valueOrNull);
}

// prints
1
null // you could argue that it should be 2 perhaps?
null

I guess that is why you didn't override the operators for ValueSubject? @hoc081098

frankpepermans commented 3 years ago

Value is a tricky one, it gets set after the event effectively emits, it should print 2 if the subject were a sync subject, otherwise, there's a small microdelay taking place before the transformed event emits.

There's another issue also discussing value, it seems a lot of people rely on it, but in truth it is unsafe by nature I'm afraid.

kasperpeulen commented 3 years ago

@frankpepermans I don't believe this is true, the current value is not emitted in the new BehaviorSubject after using map (or filter).

hoc081098 commented 3 years ago

Trying await subject.map(...).first

Sent from my Redmi 7A using FastHub

kasperpeulen commented 3 years ago

@hoc081098 indeed, but should it not give the value directly, when BehaviorSubject is sync?

void main() async {
  final subject = BehaviorSubject.seeded(1, sync: true);
  await subject.close();
  print(subject.valueOrNull); // 1
  var mappedSubject = subject.map((event) => event * 2);
  print(mappedSubject.runtimeType); // BehaviorSubject<int>
  print(mappedSubject.valueOrNull); // null
  print(await mappedSubject.first); // 2
  print(mappedSubject.valueOrNull); // 2
}
frankpepermans commented 3 years ago

It should yes :) But map is from dart sdk streams, it returns a MapStream, so not sure but it might not be sync, if it is sync then we need to fix that of course.

hoc081098 commented 3 years ago

It should yes :) But map is from dart sdk streams, it returns a MapStream, so not sure but it might not be sync, if it is sync then we need to fix that of course.

We already reverted this sync behavior :)) https://github.com/ReactiveX/rxdart/commit/44b2b3a8b26208334a1cdb0e063789ddbf9a183d

hoc081098 commented 3 years ago

Sync BehaviorSubject in particular as well as Sync Stream general:

The synchronous stream controller can be used to break the contract on Stream, and it must be used carefully to avoid doing so.

https://api.dart.dev/stable/2.12.2/dart-async/SynchronousStreamController-class.html

kasperpeulen commented 3 years ago

There's another issue also discussing value, it seems a lot of people rely on it, but in truth it is unsafe by nature I'm afraid.

@frankpepermans

I think people like it, is because stream seem a natural fit for state management, as state is just a value that changes over time.

I'm now experimenting with this:

class StateStream<State> extends StreamView<State> {
  StateStream(this.stream, this.state) : super(stream);
  final Stream<State> stream;
  final State state;

  StateStream<R> map<R>(R convert(State event)) {
    return StateStream(stream.map(convert), convert(state));
  }

  Future<List<State>> toList() async => [state, ...await stream.toList()];

  Future<State> get first async => state;

  StateStream<State> where(bool test(State event)) {
    return StateStream(stream.where(test), state);
  }

  StateStream<State> distinct([bool equals(State previous, State next)?]) {
    return StateStream(stream.distinct(equals), state);
  }
}

class StateSubject<State> {
  StateSubject(State state, [StreamController<State>? streamController])
      : _state = state,
        controller = streamController ?? StreamController.broadcast(sync: false);

  final StreamController<State> controller;
  StateStream<State> get stream => StateStream(controller.stream, state);

  State _state;
  State get state => _state;
  set state(State state) {
    _state = state;
    controller.add(state);
  }

  Future<void> setFromStream(Stream<State> stream) async {
    await for (var newState in stream) {
      state = newState;
    }
  }
}
void main() async {
  final subject = StateSubject(1);
  var stream = subject.stream.map((event) => event * 2);
  print(stream.state);
  stream.listen(print);
  stream.toList().then(print);
  subject.state = 2;
  subject.state = 3;
  subject.state = 4;
  await subject.controller.close();
}

// prints
// 2
// 4
// 6
// 8
// [2, 4, 6, 8]

I'm not adding the synchronous first value to listen, as it will cause double renders if used in flutter with for example hooks. The synchronous value must be manually placed as the initial value.

T useStateStream<T>(StateStream<T> stateStream) {
  final result = useState<T>(stateStream.state);
  useEffect(() => stateStream.listen((it) => result.value = it).cancel, []);
  return result.value;
}
hoc081098 commented 3 years ago

@kasperpeulen you can add it to another package (such as state_stream) and publish it to pub.dev, rather than adding it to rxdart

rubenferreira97 commented 3 years ago

I am having this problem as well, where I want an initial valid state. For fixing this I am initializing my Subjects with seeded(initialValue) and place an exclamation mark (Casting away nullability like subject.value!). Even though I know for instance that my value will not be null if it starts seeded, is not optimal as it could introduce bugs, when just a valid API contract could resolve the problem.

Can't value just return T instead of T?. I know we are trading this with the default empty constructor since the default value couldn't be null, but at least we could achieve this, and semantically (IMO) it makes more sense: final subjectString = BehaviorSubject<String?>(null) final subjectString = BehaviorSubject<String>("This string can't be null.")

When I am declaring BehaviorSubject I feel this to be odd (because dart identifies "T??" as T?):

final subjectString = BehaviorSubject<String?>() subjectString.value returns String?

final subjectString = BehaviorSubject<String>() subjectString.value returns String?

I can't be the only one to find it weird, when I specify my T as non null and his value to be nullable. Seems kinda a fake NNBD.