hayes / pothos

Pothos GraphQL is a library for creating GraphQL schemas in TypeScript using a strongly typed, code-first approach
https://pothos-graphql.dev
ISC License

question: guide on how to use smart subscriptions? #674

Closed hyusetiawan closed 1 year ago

hyusetiawan commented 1 year ago

I'm trying to make smart subscriptions work backed by Redis, but I am stumped on how to retrigger the refetching of the data. I am following the example, but it just says to "retrigger" the subscription. What does that mean?

Is there an example that shows how to retrigger at the query/object/field level, as stated in the docs?

here is my setup: builder.ts


export const builder = new SchemaBuilder<{
  Context: {
    req: NextApiRequest
    res: NextApiResponse
    session?: (Omit<Session, 'user'> & { user: User }) | null
  }
  Scalars: {
    DateTime: {
      Input: Date
      Output: Date
    }
    JSON: {
      Input: Prisma.JsonValue
      Output: Prisma.JsonValue
    }
  }
  PrismaTypes: PrismaTypes
}>({
  plugins: [
    PrismaPlugin,
    ErrorsPlugin,
    RelayPlugin,
    SimpleObjectsPlugin,
    WithInputPlugin,
    ValidationPlugin,
    SmartSubscriptionsPlugin,
  ],
  relayOptions: {
    idFieldName: 'pid',
    nodesOnConnection: true,
  },
  smartSubscriptions: {
    async subscribe(name, context, cb) {
      // should this be per user?
      console.log('NAME: ', name)
      await redisSub.subscribe(name, (...args) => {
        console.log('redisSub.subscribe', name, args)
        cb(...args)
      })
    },
    async unsubscribe(name, context) {
      console.log('redisSub.unsubscribe')
      await redisSub.unsubscribe(name)
    },
  },
  prisma: {
    client: prisma,
    // defaults to false, uses /// comments from prisma schema as descriptions
    // for object types, relations and exposed fields.
    // descriptions can be omitted by setting description to false
    exposeDescriptions: true,
    // use where clause from prismaRelatedConnection for totalCount (will be true by default in next major version)
    filterConnectionTotalCount: true,
  },
})

and my query field:


builder.queryField('table', (t) =>
  t.prismaField({
    smartSubscription: true,
    subscribe(subscriptions, parent, args, context, info) {
      console.log('table query field: ', args.pid)
      subscriptions.register(`table/${args.pid as any}`, {
        filter: (bla) => {
          console.log('filter?', bla)
          return true
        },
      })
    },

    type: 'Table',
    args: {
      pid: t.arg.id({
        required: true,
      }),
    },
    nullable: true,
    resolve: async (_, __, { pid }, ctx) => {
      return prisma.table.findUnique({
        where: {
          pid: String(pid),
        },
      })
    },
  }),
)

and my mutation to trigger the update:


builder.mutationField('updateTable', (t) =>
  t.prismaFieldWithInput({
    errors: {
      types: [ZodError],
    },
    typeOptions: {
      name: 'UpdateTableInput',
    },
    input: {
      pid: t.input.id({
        required: true,
      }),
      name: t.input.string({
        required: false,
        validate: {
          schema: TableDBSchema.shape.name,
        },
      }),
    },

    type: 'Table',

    async resolve(query, _, { input }, ctx) {
      const res = await prisma.table.update({
        ...query,
        where: {
          pid: String(input.pid),
        },
        data: {
          // undefined is no op
          name: input.name == null ? undefined : input.name,
        },
      })
      console.log('redis publish: ', `table/${input.pid}`)
      await redisPub.publish(`table/${input.pid}`, '')
      return res
    },
  }),
)

hayes commented 1 year ago

I don't have a full guide, or stand-alone example, but there is an example used for testing. It is a bit complex because it needs to test a lot of use-cases and edge cases, but it may be helpful.

You can see where it publishes events to re-trigger queries here: https://github.com/hayes/pothos/blob/main/packages/plugin-smart-subscriptions/tests/example/schema/poll.ts#L161

hayes commented 1 year ago

Your existing solution looks pretty close (I don't immediately see what the issue is). I assume there is an issue currently that is causing things not to update?

I assume you have checked that publish and subscribe are being called for the same ID. Does the resolver for the query field ever get called after the initial query?

hyusetiawan commented 1 year ago

I managed to get it to a working state. I want to make sure I understand the way the data flows. If I do pubsub.publish('channel', {id: 123}), where does that {id: 123} data go? In the case of a query, does it go in as args? What if it was an objectField? Does it go into the findUnique parameter for Prisma?

hayes commented 1 year ago

It will be passed to the invalidateCache and filter callbacks, but it doesn't get passed into the root resolver. The assumption is that event systems generally don't have the data granularity to properly re-resolve most GraphQL queries, so instead we assume that the content of an event tells you WHAT changed, and that you will re-load the data from its source in response to the event. If the event has a lot of data, you could write the new value into something like a dataloader.

We also can't really replace or edit the arguments or context of a resolver, because you may need the original values to properly re-resolve the affected field.
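
To make that flow concrete, here is a toy model of it, written without Pothos so it can run on its own. Everything in it (`db`, `cache`, `resolveTable`, `onEvent`, the `Registration` shape) is a hypothetical stand-in invented for illustration, not the plugin's actual implementation. It only shows the idea: the event payload reaches `filter` and `invalidateCache`, while the resolver is re-run with its original arguments and re-reads the source of truth.

```typescript
// Toy model only: none of these names are Pothos APIs.
type TableRow = { pid: string; name: string };
type ChangeEvent = { pid: string };

// Stand-in for the database, the source of truth that resolvers re-read.
const db = new Map<string, TableRow>([['1', { pid: '1', name: 'old name' }]]);

// Stand-in for a per-request cache (for example, a dataloader).
const cache = new Map<string, TableRow>();

// The resolver only sees its original arguments, never the event payload.
function resolveTable(args: { pid: string }): TableRow | null {
  const cached = cache.get(args.pid);
  if (cached) return cached;
  const row = db.get(args.pid) ?? null;
  if (row) cache.set(args.pid, row);
  return row;
}

interface Registration {
  filter?: (event: ChangeEvent) => boolean;
  invalidateCache?: (event: ChangeEvent) => void;
}

// When an event arrives: filter it, invalidate caches, then re-run the
// resolver with the ORIGINAL args. Returns undefined if the event is ignored.
function onEvent(
  event: ChangeEvent,
  registration: Registration,
  originalArgs: { pid: string },
): TableRow | null | undefined {
  if (registration.filter && !registration.filter(event)) return undefined;
  registration.invalidateCache?.(event);
  return resolveTable(originalArgs);
}

const args = { pid: '1' };
const registration: Registration = {
  filter: (event) => event.pid === args.pid,
  invalidateCache: (event) => {
    cache.delete(event.pid);
  },
};

const initial = resolveTable(args); // reads 'old name' and caches it
db.set('1', { pid: '1', name: 'new name' }); // a mutation writes to the source
const ignored = onEvent({ pid: '2' }, registration, args); // filtered out
const updated = onEvent({ pid: '1' }, registration, args); // cache dropped, row re-read
```

Note that the event here carries only an id. The new name never travels through the event; it is picked up by re-reading `db` after the cache entry is dropped.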

hyusetiawan commented 1 year ago

I see. If that is the case, given the current mechanism, could you suggest how to accomplish the following use case?

subscription {
   table(ids: [1,2,3]) {
      patches {
          op
          path
      }
   }
}

The idea is for the client to subscribe to a stream of patches. Because of the client architecture requirements, I need to do some manual syncing between the GraphQL store and front-end state management.

In updateTable, at redisPub.publish, I can publish the patch data like so:

      await redisPub.publish(`table/${input.pid}`, JSON.stringify({ op: 'replace', path: ['name'], value: 'new name' }))

The only approaches I can think of are workarounds: access this data in invalidateCache and filter, like you mentioned, and pass it to the resolver somehow?

export const TableDBSchema = z.object({
  name: z.string().min(3).max(20),
})

builder.prismaObject('Table', {
  interfaces: [Node, WithAuthor],
  // subscribe(subscriptions, parent, context, info) {
  //   console.log('object subscribe: ', `table/${parent.pid}`)
  //   subscriptions.register(`table/${parent.pid}`)
  // },
  fields: (t) => ({
    name: t.exposeString('name'),
    columns: t.relation('columns', {}),
    rows: t.relation('rows', {}),
    patches: t.field({
      type: [TablePatch],
      subscribe: (subscriptions, parent) => {
        subscriptions.register(`table/patches/${parent.pid}`)
      },
      resolve(parent, args, context, info) {
        console.log('PATCH FIELD: ', args, parent)
        return [
          {
            // How do I get the data from the published event? Or is there a better way to accomplish this?
          },
        ]
      },
    }),
  }),
})

hyusetiawan commented 1 year ago

The more I think about it, I wonder if it's advisable to trigger the event and then, in the resolver, read from Redis data structures, such as popping from a list, so that it's read only once. But will this work for multiple subscriptions? Or would other subscriptions for patches return an empty list because the data has already been read?

hayes commented 1 year ago

I think what you are describing is better modeled as a normal subscription.

Something like

subscription {
   tablePatches(ids: [1,2,3]) {
      tableId
      op
      path
   }
}

builder.subscriptionField('tablePatches', (t) =>
  t.field({
    type: TablePatch,
    args: {
      ids: t.arg.idList(),
    },
    subscribe: (_, args, ctx) => {
      // return an async iterator that iterates over each pubsub event for each id
      return subscribeToIds(args.ids);
    },
    resolve(parent) {
      return parent;
    },
  }),
);

Your case doesn't really map well to the intended use case of smart subscriptions, which is something like "live queries": I have a GraphQL query, but I want to re-run it when something relevant gets updated. Smart subscriptions require that you can re-load your data, and are meant for queries that are always up to date, not so much for streaming a log of events. If you write your changes to your DB, the solution would be to re-query the DB in your resolver, but that doesn't really make sense here.

The above is a more traditional pattern for subscriptions, and doesn't need a plugin.

hyusetiawan commented 1 year ago

I see. Apologies for the use-case-specific questions, but I am not familiar with subscribeToIds. Is this the place where I would create a listener, such as pubsub.subscribe? How do I populate the parent in resolve?

hayes commented 1 year ago

Oh, that's just a placeholder. You would need to build an async iterator that emits your patches by subscribing to the pubsub instance.
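
A minimal sketch of what such an iterator could look like, assuming a pubsub with a callback-style `subscribe` that returns an unsubscribe function. `InMemoryPubSub` is a hypothetical in-memory stand-in for the Redis client, the `TablePatch` shape is guessed from the query above, and the `table/patches/${id}` channel naming is borrowed from earlier in the thread. Each value the iterator yields is what arrives as `parent` in the field's `resolve`.

```typescript
// Hypothetical in-memory stand-in for a Redis-backed pubsub; not a real library API.
type Listener<T> = (message: T) => void;

class InMemoryPubSub<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  // Registers a listener and returns a function that removes it again.
  subscribe(channel: string, listener: Listener<T>): () => void {
    const set = this.listeners.get(channel) ?? new Set<Listener<T>>();
    this.listeners.set(channel, set);
    set.add(listener);
    return () => {
      set.delete(listener);
    };
  }

  publish(channel: string, message: T): void {
    this.listeners.get(channel)?.forEach((listener) => listener(message));
  }
}

// Assumed patch shape, based on the fields selected in the example query.
type TablePatch = { tableId: string; op: string; path: string[] };

// Subscribes eagerly to one channel per id and exposes the events as an async iterator.
function subscribeToIds(
  pubsub: InMemoryPubSub<TablePatch>,
  ids: string[],
): AsyncIterableIterator<TablePatch> {
  const queue: TablePatch[] = []; // events that arrived before anyone asked
  const waiting: Array<(result: IteratorResult<TablePatch>) => void> = []; // pending next() calls
  let closed = false;

  const unsubscribers = ids.map((id) =>
    pubsub.subscribe(`table/patches/${id}`, (patch) => {
      const resolve = waiting.shift();
      if (resolve) resolve({ value: patch, done: false });
      else queue.push(patch);
    }),
  );

  const iterator: AsyncIterableIterator<TablePatch> = {
    async next(): Promise<IteratorResult<TablePatch>> {
      const patch = queue.shift();
      if (patch !== undefined) return { value: patch, done: false };
      if (closed) return { value: undefined, done: true };
      return new Promise<IteratorResult<TablePatch>>((resolve) => {
        waiting.push(resolve);
      });
    },
    // Called when the client disconnects: stop listening and release pending readers.
    async return(): Promise<IteratorResult<TablePatch>> {
      closed = true;
      unsubscribers.forEach((unsubscribe) => unsubscribe());
      waiting.splice(0).forEach((resolve) => resolve({ value: undefined, done: true }));
      return { value: undefined, done: true };
    },
    [Symbol.asyncIterator]: () => iterator,
  };
  return iterator;
}
```

The function subscribes as soon as it is called rather than on the first `next()`, so events published between subscribing and the first read are queued instead of dropped. With a real Redis client, the `subscribe` call and message parsing would differ, and the queue would probably need a size limit.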

hyusetiawan commented 1 year ago

Hmm, I am not following. Could you maybe provide high-level pseudocode that I can flesh out, if you don't mind?

hyusetiawan commented 1 year ago

I figured it out using this package: https://github.com/davidyaha/graphql-redis-subscriptions

Unfortunately, there is a TypeScript error that I am not sure how to fix (functionality-wise it works fine, but I am not sure what hidden issues might arise).


builder.subscriptionField('tablePatches', (t) =>
  t.field({
    subscribe: (parent, { pids }, ctx) => {
      return PubSub.asyncIterator(pids.map((pid) => `table/patches/${pid}`))
    },
    type: [TablePatch],
    args: {
      pids: t.arg.idList({
        required: true,
      }),
    },
    nullable: true,
    // arguments are (parent, args, context, info); each pubsub event arrives as the parent
    resolve: async (patch, { pids }, ctx, info) => {
      return [patch] as any
    },
  }),
)

hayes commented 1 year ago

Nice! Glad you got it figured out. That type error is related to this issue: https://github.com/apollographql/graphql-subscriptions/issues/261

hayes commented 1 year ago

I guess this is actually a different package with the same issue. I opened an issue here: https://github.com/davidyaha/graphql-redis-subscriptions/issues/555