Effect-TS / effect

An ecosystem of tools to build robust applications in TypeScript
https://effect.website
MIT License

`PGClient.listen`, `Stream.asyncPush`, or `Stream.runForEach` bug: Stream or PG Channel silently stops working #3775

Open dmmulroy opened 5 days ago

dmmulroy commented 5 days ago

What version of Effect is running?

^3.8.4

What steps can reproduce the bug?

It's on my todo list today to attempt a minimal repro, but in the meantime here is the code:

const postgres = yield* Postgres; // wrapped around PGClient

yield* Effect.forkScoped(
  postgres.listen("job_queue_channel").pipe(
    Stream.runForEach((jobId) =>
      postgres.getJobById(jobId).pipe(
        Effect.flatten,
        Effect.andThen((job) => Message.decode(job.payload)),
        Effect.andThen((message) => [jobId, message] as const),
        Effect.andThen(handleMessage),
        Effect.onError(handleError),
      ),
    ),
  ),
);

What is the expected behavior?

The stream should process items for the pg channel indefinitely.

What do you see instead?

After some period of time, the stream stops processing items without any indication of why and without the fiber dying.

Additional information

const make = Effect.gen(function* () {
    yield* Effect.logInfo("Starting PostgresClient");

    yield* Effect.acquireRelease(Effect.logInfo("PostgresClient started"), () =>
        Effect.logInfo("PostgresClient stopped"),
    );

    const sql = yield* PgClient.PgClient;

    const getJobByIdAndLock = (jobId: string) =>
        Effect.gen(function* () {
            yield* Effect.logInfo(`Getting job '${jobId}'`);

            return yield* sql<Job>`
          update job_queue 
          set status='locked', locked_at = ${new Date()}
          where id = (
            select id from job_queue
            where id = ${jobId}
            for update skip locked
            limit 1
          )
          returning *; 
      `.pipe(Effect.map(Array.get(0)));
        });

    const setJobStatus = (jobId: string, status: Job["status"]) =>
        Effect.gen(function* () {
            yield* Effect.logInfo(`Setting status to '${status}' for job '${jobId}'`);

            if (status === "completed") {
                yield* sql<Job>`
        update job_queue set status = ${status}, completed_at = ${new Date()} where id = ${jobId};
      `;
                return;
            }

            yield* sql<Job>`
        update job_queue set status = ${status} where id = ${jobId};
      `;
        });

    return {
        getJobById: getJobByIdAndLock,
        setJobStatus,
        listen: sql.listen,
    } as const satisfies IPostgres;
});

export const PostgresClient = PgClient.layer({
    url: Config.redacted("DATABASE_URL"),
    debug: Config.succeed((...args) => {
        const [connection, query, parameters, paramTypes] = args;
        console.log(
            `PgClient debug: ${JSON.stringify({
                fn: "debug",
                connection,
                query,
                parameters,
                paramTypes,
            })}`,
        );
    }),
    onnotice: Config.succeed((notice) => {
        console.log(`PgClient onnotice: ${JSON.stringify(notice)}`);
    }),
}).pipe(Layer.annotateLogs({ module: "PgClient" }));

function handleMessage([jobId, message]: readonly [string, Message]) {
  return Effect.gen(function* () {
    yield* Effect.logInfo(`Handling job '${jobId}' of type '${message._tag}'`);

    const retryPolicy = yield* RetryPolicy;
    const postgres = yield* Postgres;

    const withRetry = Effect.retry(retryPolicy);

    const run = <A, E, R>(effect: Effect.Effect<A, E, R>) => {
      return Effect.matchEffect(withRetry(effect), {
        onFailure(cause) {
          return Effect.gen(function* () {
            yield* Effect.logError(
              `An error occurred while handling job '${jobId}' of type '${message._tag}': ${cause}`
            );

            yield* withRetry(postgres.setJobStatus(jobId, "error"));

            return yield* Effect.fail(cause);
          });
        },
        onSuccess(a) {
          return Effect.gen(function* () {
            yield* Effect.logInfo(
              `Successfully handled job '${jobId}' of type '${message._tag}'`
            );

            yield* withRetry(postgres.setJobStatus(jobId, "completed"));

            return yield* Effect.succeed(a);
          });
        }
      });
    };

    switch (message._tag) {
      case "send_verification_email": {
        return yield* Effect.forkScoped(
          run(handleSendVerificationEmail(message))
        );
      }
      case "process_metrics_upload": {
        return yield* Effect.forkScoped(
          run(handleProccesMetricsUpload(message))
        );
      }
      case "ontraport_create_contact": {
        return yield* Effect.forkScoped(
          run(handleOntraportCreateContact(message))
        );
      }
    }
  });
}

tim-smart commented 5 days ago

I'll wait for the minimal repro before going any further.

Will need to see at which point the messages are getting stuck, just to make sure it isn't the underlying postgres.js library.

dmmulroy commented 5 days ago

I've spent a ton of time digging into this and have learned more than I ever wanted to about Postgres and TCP 😂 I'm inclined to believe that this is not an Effect bug at this point.

If I had to point to a potential point of failure that I want to spend more time on, it would be this in @effect/sql-pg:

        listen: (channel: string) =>
          Stream.asyncPush<string, SqlError>((emit) =>
            Effect.acquireRelease(
              Effect.tryPromise({
                try: () => client.listen(channel, (payload) => emit.single(payload)),
                catch: (cause) => new SqlError({ cause, message: "Failed to listen" })
              }),
              ({ unlisten }) => Effect.promise(() => unlisten())
            )
          ),

client.listen takes a third argument, onlisten, that might be useful to tap into. Under the covers it is called in the following circumstances:

  1. Initial connection
  2. Reconnects

This is what the postgres npm library's implementation of listen looks like:

  async function listen(name, fn, onlisten) {
    const listener = { fn, onlisten }

    const sql = listen.sql || (listen.sql = Postgres({
      ...options,
      max: 1,
      idle_timeout: null,
      max_lifetime: null,
      fetch_types: false,
      onclose() {
        Object.entries(listen.channels).forEach(([name, { listeners }]) => {
          delete listen.channels[name]
          Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
        })
      },
      onnotify(c, x) {
        c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
      }
    }))

    const channels = listen.channels || (listen.channels = {})
        , exists = name in channels

    if (exists) {
      channels[name].listeners.push(listener)
      const result = await channels[name].result
      listener.onlisten && listener.onlisten()
      return { state: result.state, unlisten }
    }

    channels[name] = { result: sql`listen ${
      sql.unsafe('"' + name.replace(/"/g, '""') + '"')
    }`, listeners: [listener] }
    const result = await channels[name].result
    listener.onlisten && listener.onlisten()
    return { state: result.state, unlisten }

Specifically in this block is where reconnects happen:

      onclose() {
        Object.entries(listen.channels).forEach(([name, { listeners }]) => {
          delete listen.channels[name]
          Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
        })
      },

The recursive calls for retries/reconnects seem like they should be fine with how y'all are using emit from Stream.asyncPush, but I haven't had the time to patch deps and add logging yet.

tim-smart commented 4 days ago

How frequent are the messages? And how is postgres hosted in this case?

dmmulroy commented 4 days ago

I think I've resolved the issue. The frequency of messages is very low at the moment because it's just my testing. That said, switching from Supabase's pooler (Pooled Connection) to a direct connection seems to have resolved it.

Still concerning that the underlying postgres library gave no indication of disconnects or errors though
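Since nothing signals the disconnect and real traffic is sparse, one possible mitigation is a heartbeat: periodically send a `NOTIFY` on the channel yourself and treat prolonged silence as a stall. The sketch below covers only the detection half. `StallDetector` and its methods are hypothetical names, the clock is injected so the logic can be exercised without waiting, and neither Effect nor postgres.js is claimed to do this for you.

```typescript
// Hypothetical helper: remembers when the channel last produced anything
// (real payloads or heartbeats) and reports when it has been silent too long.
class StallDetector {
  private lastSeen: number

  constructor(
    private readonly maxSilenceMs: number,
    private readonly now: () => number = Date.now
  ) {
    this.lastSeen = now()
  }

  // Call for every payload received on the channel, heartbeats included.
  touch(): void {
    this.lastSeen = this.now()
  }

  // True once more than maxSilenceMs has elapsed since the last touch().
  isStalled(): boolean {
    return this.now() - this.lastSeen > this.maxSilenceMs
  }
}

// Fake clock so the example is deterministic.
let clock = 0
const detector = new StallDetector(30000, () => clock)

clock = 10000
detector.touch() // a heartbeat arrives at t = 10s

clock = 35000
const healthy = detector.isStalled() // 25s of silence, within the limit

clock = 45000
const stalled = detector.isStalled() // 35s of silence, over the limit

console.log({ healthy, stalled })
// → { healthy: false, stalled: true }
```

What to do when `isStalled()` returns true is a separate design choice, for example logging loudly or tearing down and re-creating the subscription.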

tim-smart commented 4 days ago

> I think I've resolved the issue. The frequency of messages is very low at the moment because it's just my testing. That said, switching from Supabase's pooler (Pooled Connection) to a direct connection seems to have resolved it.
>
> Still concerning that the underlying postgres library gave no indication of disconnects or errors though

It seems the postgres library disables timeouts for the listen connection (idle_timeout: null and max_lifetime: null in the snippet above), which could be an issue too.

tim-smart commented 4 days ago

Might be a supabase issue? https://github.com/supabase/supavisor/issues/85