SandroMaglione / fpdart

Functional programming in Dart and Flutter. All the main functional programming types and patterns fully documented, tested, and with examples.
https://pub.dev/packages/fpdart
MIT License

Streams in FPDart #27

Open lloydrichards opened 3 years ago

lloydrichards commented 3 years ago

I'm in the middle of a major refactor that involves moving a lot of my Dartz code to FPDart. While going through your articles I found how amazing TaskEither.tryCatch() is at cleaning up my code (😙👌).

But now I've come to my Streams and I'm drawing a bit of a blank on how to tidy them up, and I wonder if this is something FPDart has a solution for. I'm currently returning Stream<Either<AlertsFailure, List<Alert>>> from something like this:

Stream<Either<AlertsFailure, List<Alert>>> watchAlerts() async* {
  final userOption = await _auth.getSignedInUser();
  final user = userOption.getOrElse(() => throw NotAuthenticatedError());

  final alertsRef = _firestore.allUserAlerts(user.uid.getOrCrash());
  yield* alertsRef.snapshots().map((query) {
    if (query.docs.isEmpty) {
      return const Left<AlertsFailure, List<Alert>>(AlertsFailure.noAlerts());
    }
    return Right<AlertsFailure, List<Alert>>(query.docs
        .map(
          (alert) => AlertModel.fromFirestore(alert).toDomain(),
        )
        .toList()
      ..sort((a, b) => b.dateCreated.compareTo(a.dateCreated)));
  }).onErrorReturnWith((e, stack) {
    if (e is FirebaseException && e.code.contains('permission-denied')) {
      return const Left<AlertsFailure, List<Alert>>(
          AlertsFailure.insufficientPermission());
    } else {
      return const Left<AlertsFailure, List<Alert>>(
          AlertsFailure.serverError());
    }
  });
}

I'm trying to imagine something like TaskEither wrapping snapshots(), but I don't know how to handle errors from the stream. The other idea was that there might be some kind of StreamEither, but my searching hasn't turned up any examples.

This could be my FP noobish-ness or maybe this is actually a feature request... or maybe this isn't something FP even deals with and then my Stream is the right solution?

SandroMaglione commented 2 years ago

Hi @lloydrichards

Streams are a very interesting (and complex) use case 😅

There are some patterns used in functional programming to deal with Streams. Adding them to fpdart I think would be interesting but also hard work!

Curious to hear any opinion about this 👀

lloydrichards commented 2 years ago

I would like such a feature 😁 My app uses streams nearly constantly for database reads, so being able to abstract my Left and Right would make it much easier for my application layer to return the correct state.

Currently, I rely on RxDart to add more functionality to the stream, such as .onErrorReturnWith(), which gives me the correct effect but could maybe be packaged a little better.

tim-smart commented 2 years ago

I have been working on an alternative to Stream that is more functional in nature:

https://pub.dev/packages/offset_iterator

I have recently added OffsetIterable.fromStreamEither which might be of interest! You can see it in action in this test case: https://github.com/tim-smart/offset_iterator/blob/main/packages/offset_iterator/test/offset_iterator_test.dart#L111

TheHypnoo commented 2 years ago

@SandroMaglione It would be interesting to prioritize this, since Futures and Streams are so commonly used, as they are in my application.

dfdgsdfg commented 1 year ago

I think dart_either has a practical implementation.

fpdart already has TaskEither.tryCatch, which builds an ADT from an effect, so something like TaskEither.tryCatchStream would be nice to have.

https://github.com/hoc081098/dart_either/blob/master/lib/src/dart_either.dart#L349-L384

  /// Transforms data events to [Right]s and error events to [Left]s.
  ///
  /// When the source stream emits a data event, the result stream will emit
  /// a [Right] wrapping that data event.
  ///
  /// When the source stream emits a error event, calling [errorMapper] with that error
  /// and the result stream will emits a [Left] wrapping the result.
  ///
  /// The done events will be forwarded.
  ///
  /// ### Example
  /// ```dart
  /// final Stream<int> s = Stream.fromIterable([1, 2, 3, 4]);
  /// final Stream<Either<Object, int>> eitherStream = Either.catchStreamError((e, s) => e, s);
  ///
  /// eitherStream.listen(print); // prints Either.Right(1), Either.Right(2),
  ///                             // Either.Right(3), Either.Right(4),
  /// ```
  ///
  /// ```dart
  /// final Stream<int> s = Stream.error(Exception());
  /// final Stream<Either<Object, int>> eitherStream = Either.catchStreamError((e, s) => e, s);
  ///
  /// eitherStream.listen(print); // prints Either.Left(Exception)
  /// ```
  static Stream<Either<L, R>> catchStreamError<L, R>(
    ErrorMapper<L> errorMapper,
    Stream<R> stream,
  ) =>
      stream.transform(
        StreamTransformer<R, Either<L, R>>.fromHandlers(
          handleData: (data, sink) => sink.add(Either.right(data)),
          handleError: (e, s, sink) =>
              sink.add(Either.left(errorMapper(e.throwIfFatal(), s))),
        ),
      );

/// Map [error] and [stackTrace] to a [T] value.
typedef ErrorMapper<T> = T Function(Object error, StackTrace stackTrace);
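
For anyone who wants to try the same idea without pulling in another package, here is a rough, dependency-free sketch. It assumes Dart 3 (for `sealed` classes). The `Either`, `Left` and `Right` classes are minimal stand-ins (with fpdart you would use its own types), and `toEither`, `StreamFailure`, `toFailure` and `sampleSource` are hypothetical names made up for illustration, not part of fpdart or dart_either.

```dart
import 'dart:async';

// Minimal stand-in for Either so the sketch runs without any package.
// With fpdart you would use its own Either, Left and Right instead.
sealed class Either<L, R> {
  const Either();
}

final class Left<L, R> extends Either<L, R> {
  const Left(this.value);
  final L value;

  @override
  String toString() => 'Left($value)';
}

final class Right<L, R> extends Either<L, R> {
  const Right(this.value);
  final R value;

  @override
  String toString() => 'Right($value)';
}

// Hypothetical failure type, loosely modelled on AlertsFailure above.
enum StreamFailure { insufficientPermission, serverError }

extension StreamEitherX<R> on Stream<R> {
  /// Hypothetical helper: data events become [Right]; error events are
  /// mapped by [onError] and emitted as [Left]. Done events pass through.
  Stream<Either<L, R>> toEither<L>(
    L Function(Object error, StackTrace stackTrace) onError,
  ) =>
      transform(
        StreamTransformer<R, Either<L, R>>.fromHandlers(
          handleData: (data, sink) => sink.add(Right<L, R>(data)),
          handleError: (e, s, sink) => sink.add(Left<L, R>(onError(e, s))),
        ),
      );
}

// Maps any error to a failure. A StateError stands in for a
// permission-denied error here; everything else is a server error.
StreamFailure toFailure(Object error, StackTrace _) => error is StateError
    ? StreamFailure.insufficientPermission
    : StreamFailure.serverError;

// Example source: a value, an error event, then another value.
Stream<int> sampleSource() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(StateError('permission-denied'))
    ..add(2)
    ..close();
  return controller.stream;
}
```

Calling `sampleSource().toEither(toFailure)` yields a `Stream<Either<StreamFailure, int>>` in which the error event arrives as an ordinary `Left` value and the stream keeps delivering the data events that follow it, so a listener only has to handle one kind of event.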

SAGARSURI commented 1 week ago

Any update on this?