honojs / middleware

monorepo for Hono third-party middleware/helpers/wrappers
https://hono.dev
451 stars 164 forks source link

net::ERR_INCOMPLETE_CHUNKED_ENCODING when using tRPC subscriptions and the Hono trpc-server/ #717

Open samuelgoldenbaum opened 2 months ago

samuelgoldenbaum commented 2 months ago

tRPC subscriptions throw a net::ERR_INCOMPLETE_CHUNKED_ENCODING 200 (OK) error when using an httpSubscriptionLink that uses Server-sent Events (SSE) for subscriptions

image

This causes the client to continuously fire off new subscription calls.

This seems restricted to subscriptions so far as mutations and queries seem fine.

server.ts:

import { initTRPC } from '@trpc/server'
import { cors } from 'hono/cors'
import { trpcServer } from '@hono/trpc-server'
import { z } from 'zod'
import { EventEmitter, on } from 'events'
import { randomUUID } from 'crypto'
import superjson from 'superjson'
import { Hono } from 'hono'
import { EVENT, Widget } from '../common'

const t = initTRPC.create({
  transformer: superjson
})
const eventEmitter = new EventEmitter()

const publicProcedure = t.procedure
const router = t.router

const appRouter = router({
  create: publicProcedure
    .input(
      z.object({
        name: z.string()
      })
    )
    .mutation(({ input }) => {
      const widget: Widget = {
        ...input,
        id: randomUUID(),
        createdAt: new Date().toDateString()
      } satisfies Widget

      eventEmitter.emit(EVENT.CREATE, widget)
    }),
  onCreate: publicProcedure.subscription(async function* (opts) {
    for await (const [data] of on(eventEmitter, EVENT.CREATE)) {
      const widget = data as Widget
      yield widget
    }
  })
})

export type AppRouter = typeof appRouter

const app = new Hono().use(cors()).use(
  '*',
  trpcServer({
    router: appRouter
  })
)

export default {
  port: 3001,
  fetch: app.fetch
}

client: trpc.ts

import {
  createTRPCClient,
  httpBatchLink,
  loggerLink,
  splitLink,
  unstable_httpSubscriptionLink
} from '@trpc/client'
import { AppRouter } from './../../hono-server'
import superjson from 'superjson'

const url = 'http://localhost:3001/trpc'

export const trpc = createTRPCClient<AppRouter>({
  links: [
    loggerLink(),
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: unstable_httpSubscriptionLink({
        url,
        transformer: superjson
      }),
      false: httpBatchLink({
        url,
        transformer: superjson
      })
    })
  ]
})

App.tsx

import { trpc } from './tprc'
import React, { useEffect, useState } from 'react'
import './App.css'
import { faker } from '@faker-js/faker'
import { Widget } from '../../common'

function App() {
  const [widgets, setWidgets] = useState<Widget[]>([])

  useEffect(() => {
    trpc.onCreate.subscribe(undefined, {
      onData: (data) => {
        setWidgets((widgets) => [...widgets, data])
      },
      onError: (err) => {
        console.error('subscribe error', err)
      }
    })
  }, [])

  return (
    <div className="App">
      <header className="App-header">Widgets</header>

      <button
        onClick={() => {
          trpc.create.mutate({ name: faker.commerce.productName() })
        }}
      >
        Create Widget
      </button>
      <hr />
      <ul>
        {widgets.map((widget) => (
          <li key={widget.id}>{widget.name}</li>
        ))}
      </ul>
    </div>
  )
}

export default App

Running the same code using node HTTP server seems fine:

import { initTRPC } from '@trpc/server'
import { createHTTPServer } from '@trpc/server/adapters/standalone'
import cors from 'cors'
import { z } from 'zod'
import { EventEmitter, on } from 'events'
import { randomUUID } from 'crypto'
import superjson from 'superjson'
import { EVENT, Widget } from '../common'

const t = initTRPC.create({
  transformer: superjson
})
const eventEmitter = new EventEmitter()

const publicProcedure = t.procedure
const router = t.router

const appRouter = router({
  create: publicProcedure
    .input(
      z.object({
        name: z.string()
      })
    )
    .mutation(({ input }) => {
      const widget: Widget = {
        ...input,
        id: randomUUID(),
        createdAt: new Date().toDateString()
      } satisfies Widget

      eventEmitter.emit(EVENT.CREATE, widget)
    }),
  onCreate: publicProcedure.subscription(async function* (opts) {
    for await (const [data] of on(eventEmitter, EVENT.CREATE)) {
      const widget = data as Widget
      yield widget
    }
  })
})

export type AppRouter = typeof appRouter

// create server
createHTTPServer({
  middleware: cors(),
  router: appRouter
}).listen(3000)
samuelgoldenbaum commented 2 months ago

Add a repo here to reproduce