GetDutchie / brick

An intuitive way to work with persistent data in Dart
https://getdutchie.github.io/brick/#/

Subscribing to Remote Supabase Events #454

Open tshedor opened 2 weeks ago

tshedor commented 2 weeks ago

A continuation of #451 and #448, this issue will track work integrating Supabase's channels with Brick. @Shreemanarjun has expressed interest in leading the initiative (but I won't hold you to it).

If you want to help develop this feature, please comment and tag me and @Shreemanarjun.

Get

From #448, this is a crude starting point with some TODOs to resolve:

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;

    // TODO respect policy and ignore remote subscription if requested

    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final instance = await adapter.fromSupabase(payload.newRecord, provider: supabaseProvider);
            await storeRemoteResults([instance as TModel]);
          },
        )
        .subscribe();

    // TODO handle channel.unsubscribe logic

    return super.subscribe(policy: policy, query: query);
  }
}
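
For the query-filter TODO above, one rough direction is translating a Brick Query's where clauses into a PostgresChangeFilter and passing it to onPostgresChanges' filter parameter. This is only a sketch: it handles a single equality condition, and fieldToColumn is a hypothetical resolver for mapping a Dart field name to its Supabase column name via the adapter.

import 'package:brick_core/core.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

/// Builds a realtime filter from the first equality condition of [query].
/// Returns null when no usable condition exists so the caller can subscribe
/// to the whole table instead.
PostgresChangeFilter? filterFromQuery(
  Query? query,
  String Function(String fieldName) fieldToColumn, // hypothetical resolver
) {
  final conditions = query?.where?.whereType<Where>().toList();
  if (conditions == null || conditions.isEmpty) return null;

  final condition = conditions.first;
  if (condition.compare != Compare.exact) return null;

  return PostgresChangeFilter(
    type: PostgresChangeFilterType.eq,
    column: fieldToColumn(condition.evaluatedField),
    value: condition.value,
  );
}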

Delete

Syncing from a delete will be tricky, as a delete operation is not guaranteed to have a remote fetch. @Shreemanarjun's work is an example of how to delete from a listener; however, it may be best to use the DestructiveLocalSyncFromRemoteMixin (or some version of it) to compare local and remote state.

This code is untested but may be another crude starting point:

final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
final channel = supabaseProvider.client
  .channel(adapter.supabaseTableName)
  .onPostgresChanges(
    event: PostgresChangeEvent.delete,
    schema: 'public',
    // TODO handle filters based on the query
    table: adapter.supabaseTableName,
    callback: (payload) async {
      // On a delete event, re-fetch remote state and drop local rows that no
      // longer exist remotely (relies on TModel implementing value equality)
      final remoteResults = await supabaseProvider.get<TModel>(repository: this);
      final localResults = await sqliteProvider.get<TModel>(repository: this);
      final toDelete = localResults.where((r) => !remoteResults.contains(r));

      for (final deletableModel in toDelete) {
        await sqliteProvider.delete<TModel>(deletableModel);
        memoryCacheProvider.delete<TModel>(deletableModel);
      }
    },
  ).subscribe();

// TODO handle channel.unsubscribe logic
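
And if realtime delete events can't be relied on (or for the non-realtime case raised later in this thread), the same comparison could run on a timer instead of inside a channel callback. Equally untested, written as another member on the repository; the interval and method name are arbitrary:

// Periodically reconcile local SQLite rows against Supabase, dropping local
// rows that no longer exist remotely. Relies on TModel value equality
// (or compare primary keys instead). Requires dart:async for Timer.
Timer startDeleteReconciliation<TModel extends OfflineFirstWithSupabaseModel>({
  Duration every = const Duration(seconds: 30),
}) {
  return Timer.periodic(every, (_) async {
    final remoteResults = await supabaseProvider.get<TModel>(repository: this);
    final localResults = await sqliteProvider.get<TModel>(repository: this);
    final toDelete = localResults.where((r) => !remoteResults.contains(r));

    for (final deletableModel in toDelete) {
      await sqliteProvider.delete<TModel>(deletableModel);
      memoryCacheProvider.delete<TModel>(deletableModel);
    }
  });
}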

Josiassejod1 commented 1 week ago

@tshedor I am interested in picking this up.

tshedor commented 1 week ago

@Josiassejod1 Welcome aboard, happy to have you.


For the #subscribe mixin, I had a thought last night about overriding it fully to resolve the unsubscribe issue:

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {    
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;

    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          // TODO accept different Postgres events
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final instance = await adapter.fromSupabase(payload.newRecord, provider: supabaseProvider);
            await storeRemoteResults([instance as TModel]);
          },
        )
        .subscribe();

    // Should this logic be duplicated from the super class,
    // or should the super class expose a protected method
    // that builds the stream controller?
    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // ignore: discarded_futures
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}
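
To the question in that comment (duplicate the logic or add a protected builder), the super class could expose a helper that owns the controller bookkeeping, so mixins only supply the extra cleanup to run on cancel. Nothing like this exists in Brick today; the name and signature below are only a sketch:

// Sketch of a protected helper on the base repository (does not exist yet).
@protected
StreamController<List<TModel>> registerSubscription<TModel extends OfflineFirstWithSupabaseModel>(
  Query query, {
  Future<void> Function()? onCancel,
}) {
  final controller = StreamController<List<TModel>>(
    onCancel: () async {
      await onCancel?.call();
      await subscriptions[TModel]?[query]?.close();
      subscriptions[TModel]?.remove(query);
      if (subscriptions[TModel]?.isEmpty ?? false) {
        subscriptions.remove(TModel);
      }
    },
  );

  subscriptions[TModel] ??= {};
  subscriptions[TModel]?[query] = controller;
  return controller;
}

The mixin would then replace its controller block with registerSubscription<TModel>(query, onCancel: () => channel.unsubscribe()).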

Also, streaming will need to be added to the SupabaseMockServer. The Supabase team tests the realtime feature in their client and some of that code could be recycled.
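
Until then, a test could stand up a bare WebSocket endpoint with dart:io and push frames by hand. The Phoenix handshake (phx_join replies, heartbeats) and the postgres_changes payload shape are deliberately left as TODOs here; they would need to be recycled from those client tests:

import 'dart:convert';
import 'dart:io';

// A bare-bones stand-in for a realtime-capable mock server. It accepts a
// single WebSocket upgrade and lets a test push raw JSON frames to the client.
class MockRealtimeSocket {
  HttpServer? _server;
  WebSocket? _socket;

  /// e.g. ws://localhost:<port>/realtime/v1
  String get url => 'ws://localhost:${_server!.port}/realtime/v1';

  Future<void> start() async {
    _server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
    _server!.listen((request) async {
      if (!WebSocketTransformer.isUpgradeRequest(request)) {
        request.response.statusCode = HttpStatus.badRequest;
        await request.response.close();
        return;
      }
      _socket = await WebSocketTransformer.upgrade(request);
      _socket!.listen((message) {
        // TODO reply to phx_join and heartbeat messages so the realtime
        // client considers the channel subscribed
      });
    });
  }

  /// Pushes a frame to the connected client; tests supply a postgres_changes
  /// payload copied from the Supabase client's fixtures.
  void push(Map<String, dynamic> frame) => _socket?.add(jsonEncode(frame));

  Future<void> stop() async {
    await _socket?.close();
    await _server?.close(force: true);
  }
}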

jtkeyva commented 1 week ago

They did a good job of syncing in this example: https://github.com/SkillDevs/electric_dart/tree/master/todos_flutter

alterhuman commented 4 days ago

@tshedor Sorry for the nudge, but I'm planning to integrate Brick and can't move forward if a deletion on the remote side won't eventually be reflected on the client side as well.

Just wanted to know whether this will be possible in the future, based on what you've tried so far, so that I can integrate with peace of mind. Sorry again for the nudge.

tshedor commented 4 days ago

@Josiassejod1 @Shreemanarjun any updates?

Shreemanarjun commented 2 days ago

Sorry for the late reply. I solved it this way:

import 'dart:async';
import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_supabase/brick_offline_first_with_supabase.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;

    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = remoteProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final event = payload.eventType;
            final record = payload.newRecord;

            switch (event) {
              case PostgresChangeEvent.insert:
              case PostgresChangeEvent.update:
                final instance = await adapter.fromSupabase(record,
                    provider: remoteProvider);
                await upsert<TModel>(
                  instance as TModel,
                );
                break;
              case PostgresChangeEvent.delete:
                // NOTE: delete payloads populate oldRecord (newRecord is empty);
                // by default it only contains the replica identity columns
                final instance = await adapter.fromSupabase(payload.oldRecord,
                    provider: remoteProvider);
                await delete<TModel>(instance as TModel);
                break;
              default:
            }
          },
        )
        .subscribe();

    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // Fetch initial data
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}
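
For anyone trying this out, usage from an app would look roughly like the following; repository and Customer are placeholders for your own OfflineFirstWithSupabaseRepository (with SubscribeWithChannels mixed in) and model:

// Any policy other than localOnly opens the realtime channel.
final subscription = repository
    .subscribe<Customer>(policy: OfflineFirstGetPolicy.alwaysHydrate)
    .listen((customers) {
  // Emits the initial local fetch, then again whenever a realtime
  // insert/update/delete is written to the local SQLite cache.
  print('customers in cache: ${customers.length}');
});

// Cancelling triggers the controller's onCancel: the Supabase channel is
// unsubscribed and the cached controller is removed.
await subscription.cancel();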

tshedor commented 2 days ago

@Shreemanarjun will you add tests or open a PR?

jtkeyva commented 2 days ago

@Shreemanarjun is that a realtime-only solution?

jtkeyva commented 2 days ago

@tshedor any chance of implementing this for non-realtime? I was about to commit to using Brick, but this is a major roadblock that makes it essentially unusable.

tshedor commented 2 days ago

@jtkeyva this is open source software. PRs from anyone are welcome at any time.

alterhuman commented 1 day ago

@tshedor any chance of implementing this for non-realtime? I was about to commit to using Brick, but this is a major roadblock that makes it essentially unusable.

I don't think this is possible without realtime. Even external services require either realtime or logical replication to work. Do you have a possible approach without these?