graphile / crystal

🔮 Graphile's Crystal Monorepo; home to Grafast, PostGraphile, pg-introspection, pg-sql2 and much more!
12.56k stars 569 forks source link

Filtering of `listen()` #2016

Open benjie opened 5 months ago

benjie commented 5 months ago

From discussion on Discord:

The main challenge I think will be coming up with the API that's suitable for everyone. But to start with, I'd copy the whole of the listen step into your own codebase, give it a new name (filteredListen or similar), then make the following changes:

  1. Accept a filter function in the constructor:

  2. Apply this filter function to the stream coming from pubsub.subscribe(topic):

pubsub.subscribe(topic) returns (potentially the promise to) an async iterable:

So you need to apply filtering to that async iterable by building a new async iterable that skips over values you don't want.

1mehal commented 5 months ago

@benjie I would give it a try

1mehal commented 5 months ago

@benjie I got it to work for SimpleSubscription plugin with following code:

 * Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
 * topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
 * callback.
export class FilteredListenStep<
    TTopics extends { [topic: string]: any },
    TTopic extends keyof TTopics,
    TPayloadStep extends ExecutableStep
  extends ExecutableStep<TTopics[TTopic]>
  implements StreamableStep<TTopics[TTopic]>
  static $$export = {
    moduleName: "grafast",
    exportName: "ListenStep",
  isSyncAndSafe = true;

   * The id for the PostgreSQL context plan.
  private pubsubDep: number;

   * The plan that will tell us which topic we're subscribing to.
  private topicDep: number;

  private eventTypeDep: number;

  private eventTypePlan?: ExecutableStep<string | null>;

  // Add a new private field for the filter function
  private filterFunc:
    | ((item: TTopics[TTopic], eventType: string | null) => boolean)
    | null;

      | ExecutableStep<GrafastSubscriber<TTopics> | null>
      | GrafastSubscriber<TTopics>
      | null,
    topicOrPlan: ExecutableStep<TTopic> | string,
    public itemPlan: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep = (
    ) => $item as any,
    eventTypePlan?: ExecutableStep<string | null>,
    filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
  ) {
    const $topic =
      typeof topicOrPlan === "string" ? constant(topicOrPlan) : topicOrPlan;
    const $pubsub = isExecutableStep(pubsubOrPlan)
      ? pubsubOrPlan
      : constant(pubsubOrPlan, false);
    const $eventType = eventTypePlan || constant(null);
    this.pubsubDep = this.addDependency($pubsub);
    this.topicDep = this.addDependency($topic);
    this.eventTypeDep = this.addDependency($eventType);
    this.eventTypePlan = eventTypePlan;
    this.filterFunc = filterFunc || null;

  execute(): never {
    throw new Error("ListenStep cannot be executed, it can only be streamed");

    count: number,
    values: readonly [
      GrafastValuesList<string | null>
  ): GrafastResultStreamList<TTopics[TTopic]> {
    const pubsubs = values[this.pubsubDep as 0];
    const topics = values[this.topicDep as 1];
    const eventTypes = values[this.eventTypeDep as 2];
    const result = [];
    for (let i = 0; i < count; i++) {
      const pubsub = pubsubs[i];
      if (!pubsub) {
        throw new SafeError(
          "Subscription not supported",
            ? {
                hint: `${
                } did not provide a GrafastSubscriber; perhaps you forgot to add the relevant property to context?`,
            : {}
      const topic = topics[i];
      const eventType = eventTypes[i]; // Actual value of eventType
      result[i] = async function* () {
        const subscription = pubsub.subscribe(topic);
        const asyncIterable =
          subscription instanceof Promise ? await subscription : subscription;
        for await (const item of asyncIterable) {
          if (!this.filterFunc || this.filterFunc(item, eventType)) {
            yield item;
    return result;

 * Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
 * topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
 * callback.
export function filteredListen<
  TTopics extends { [topic: string]: any },
  TTopic extends keyof TTopics,
  TPayloadStep extends ExecutableStep
    | ExecutableStep<GrafastSubscriber<TTopics> | null>
    | GrafastSubscriber<TTopics>
    | null,
  topicOrPlan: ExecutableStep<TTopic> | string,
  itemPlan?: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep,
  eventTypePlan?: ExecutableStep<string | null>, // Add this
  filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
): FilteredListenStep<TTopics, TTopic, TPayloadStep> {
  return new FilteredListenStep<TTopics, TTopic, TPayloadStep>(

const SimpleSubscriptionsPlugin = makeExtendSchemaPlugin((build) => {
  const nodeIdHandlerByTypeName = build.getNodeIdHandlerByTypeName?.();
  return {
    typeDefs: [
        extend type Subscription {
          listen(topic: String!, eventType: String): ListenPayload
        type ListenPayload {
          event: String
      ...// Only add the relatedNode if supported
        ? [
              extend type ListenPayload {
                relatedNode: Node
                relatedNodeId: ID
        : []),
    plans: {
      Subscription: {
        listen: {
          subscribePlan: EXPORTABLE(
            (context, jsonParse, lambda, listen) =>
              function subscribePlan(_$root, { $topic, $eventType = null }) {
                const $pgSubscriber = context().get("pgSubscriber");
                const $derivedTopic = lambda($topic, (topic) => {
                  return `postgraphile:${topic}`;
                const eventTypePlan = lambda(
                  (eventTypeValue) => eventTypeValue
                const filterFunction = (item: any, eventTypeValue: any) => {
                  const parsedItem = JSON.parse(item);
                  return !eventTypeValue || parsedItem.event === eventTypeValue;

                return filteredListen(
            [context, jsonParse, lambda, filteredListen]
          plan: EXPORTABLE(
            () =>
              function plan($event) {
                return $event;
      ListenPayload: {
        event($event) {
          return $event.get("event");
          ? {
              relatedNodeId($event) {
                return nodeIdFromEvent($event);
              relatedNode($event) {
                const $nodeId = nodeIdFromEvent($event);
                return node(nodeIdHandlerByTypeName, $nodeId);
          : null),