ParabolInc / parabol

Free online agile retrospective meeting tool
https://www.parabol.co/

Embeddings: Related Discussions, Priority #9563

Closed · mattkrick closed this 4 months ago

mattkrick commented 7 months ago

Our embedder runs on a CPU and takes 2-3 seconds per discussion. That means it probably won't be able to keep up, so we have to prioritize which items to process first.

Today, when a meeting ends we ping the embedder & tell it to embed all the discussions. The related discussions PR pings the embedder when the voting stage ends & embeds all the discussions, then finds similar discussions & re-embeds those with the meeting-describing header left out, which is O(nm).

To efficiently implement something like this, we can't embed on the fly. We'll want to persist the reranked embeddings so we don't embed the same doc over & over.
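For persistence, something like one row per (doc, model) pair would do it. A minimal sketch with invented names (not the real schema):

```ts
// Illustrative sketch only: one row per (source doc, model) pair so the same
// doc is never embedded twice for the same model.
interface EmbeddingsMetadataRow {
  objectType: 'retrospectiveDiscussionTopic' // polymorphic source of the text
  refId: string // id of the source row, e.g. a discussionId
  model: string // which embedding model produced the vector
  embedText: string // the exact text that was embedded (meeting header stripped)
  embedding: number[] // stored as a pgvector column in Postgres
}
```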

We also need a way to support a job of jobs. We can't process related discussions until we know all the previous discussions have been embedded. With multiple models, we'll need to query the job queue table to see if any jobs are still outstanding for that team. Do we add a teamId to the job queue to support this?
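If we did add teamId, the check could be as simple as this sketch (table & column names are made up):

```ts
import {Pool} from 'pg'

const pg = new Pool()

// True if any jobs for this team are still queued or running.
// "JobQueue" and its columns are hypothetical names for illustration.
const teamHasOutstandingJobs = async (teamId: string) => {
  const {rows} = await pg.query(
    `SELECT 1
     FROM "JobQueue"
     WHERE "teamId" = $1
       AND "state" IN ('queued', 'running')
     LIMIT 1`,
    [teamId]
  )
  return rows.length > 0
}
```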

Flow:

These dependencies between jobs are called job chaining, workflows, or just flows. This case is somewhat special because we don't know the parent task ahead of time. For example, we could run an ad-hoc embedding job & now any parent job (existing or new) should wait until that child job is finished.

So, we either need a separate job queue, or we need our current job queue to be polymorphic. In either case, we'll need a state machine and a handler for each state. A single queue is probably best because then we don't have competing priorities.
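As a rough sketch (all names invented), a polymorphic queue could be a discriminated union over jobType, with a single worker loop dispatching on it:

```ts
// Illustrative only: one queue, many job types, each with its own small
// state machine encoded in `state`.
type JobState = 'queued' | 'running' | 'success' | 'failed'

type JobRow =
  | {jobType: 'embed'; state: JobState; refId: string; model: string}
  | {jobType: 'relatedDiscussions'; state: JobState; meetingId: string}

// One handler per jobType means a single queue serves every workload,
// so there are no competing queues with competing priorities.
const handlers: {
  [T in JobRow['jobType']]: (job: Extract<JobRow, {jobType: T}>) => Promise<void>
} = {
  embed: async (job) => {
    /* embed job.refId using job.model */
  },
  relatedDiscussions: async (job) => {
    /* aggregate similarities for job.meetingId */
  }
}
```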

New flow:

Changes to implement:

mattkrick commented 6 months ago

the problem with the p+1 prioritization strategy is that we'll never start generating retro similarities until ALL the discussions in the whole SaaS are processed. That will tank our QoS as measured by average time to completion. Instead, we want to prioritize by meetingId or discussionId.

the goal is to do this without promises that could take up to minutes to resolve, so e.g. having a super job that does everything sequentially isn't ideal. superjobs are also bad because the errors should be tied to a specific metadataId, not a group of them. we also don't want jobs to run longer than a couple seconds so the server stays stateless & can handle restarts easily.

ideally, what we want is a way to prioritize these flow jobs after their children, but before other, newer jobs of the same type. so, we'll need a 2nd sort order:

we'll need to orderBy(priority asc, flowStartAt asc). then, for the aggregate job, we just need to make sure its flowStartAt is at least 1ms later than the children's to ensure that all children have been processed first.
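in SQL terms, the dequeue could look something like this sketch (hypothetical table & column names):

```ts
import {Pool} from 'pg'

const pg = new Pool()

// Lower priority number wins; within a priority, older flows win. The
// aggregate job is enqueued with flowStartAt >= 1ms after its children,
// so it sorts behind all of them but ahead of any newer flow.
const pickNextJob = async () => {
  const {rows} = await pg.query(
    `SELECT *
     FROM "JobQueue"
     WHERE "state" = 'queued'
     ORDER BY "priority" ASC, "flowStartAt" ASC
     LIMIT 1
     FOR UPDATE SKIP LOCKED`
  )
  return rows[0] // a real worker would claim this inside a transaction
}
```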

we may also need a flowId attached to the children so that if 1 errors we can fail the whole flow. for related discussions, though, if 1 errors we can just ignore it.

TODO: refId is not globally unique. add a table prefix?

Dschoordsch commented 6 months ago

If I understand this correctly, we only need 2 to 3 different priorities

mattkrick commented 6 months ago

that's true! the only thing missing is a signal to say "the generated embeddings for meeting X are done, now calculate similarities", or after that, "similarities have been generated, now rerank".

My current plan is to signal that with a "generateSimilarities" job that comes after the embeddings have been created. Given we only sort by priority, if it has p-1 (i.e. it sorts ahead of the embeddings), then we can't guarantee that all the embeddings have been created, but if it has a larger priority number than the embeddings then "generateSimilarities" won't run until there are no more embeddings to process anywhere, which could take too long! Open to other suggestions!

mattkrick commented 6 months ago

Building this out keeps feeling like it's getting more complex, not less, so I think I'm going down the wrong path. The problem arises from the interdependencies between jobs & how 1 job failure should trigger a failure for all future jobs within the same workflow.

I spent some time looking around for best practices & the newest job queue is called hatchet: https://docs.hatchet.run/home/basics/workflows.

Using hatchet would require another docker image; it's built on top of prisma & has very complex internals: https://github.com/hatchet-dev/hatchet/blob/7ab7290eece1018197d7856740ea46dc7d74eaae/internal/repository/prisma/dbsqlc/step_runs.sql#L420. I think we can get away with keeping the queue in a single table, but we can borrow heavily from its API.

The thing I like most about hatchet is the concept of 1 workflow having 1 or many steps. To emulate this, we can have each row in the job queue be a workflow & the status of the job queue could be the step that it's on. Our use case is a little unique because some steps lead to n next steps: e.g. a "relatedDiscussions:start" step could lead to 10 "embed" processes, 1 for each discussion. We could approach this by giving each workflow 2 columns, maxWorkersForStep and currentWorkersForStep. if we are on the embed step, which has 10 jobs to do, we could include all the discussionIds in the context. Then, for each worker that picks up that job, we increment currentWorkersForStep until it equals maxWorkersForStep.
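claiming a slot in a fan-out step could then be a single atomic update, sketched here with invented names:

```ts
import {Pool} from 'pg'

const pg = new Pool()

// Atomically claim one unit of work from the current step: bump
// currentWorkersForStep and use the previous value as an index into the
// step's context (e.g. its list of discussionIds). Returns undefined once
// the step is fully claimed. The "Workflow" schema is hypothetical.
const claimFanOutSlot = async (workflowId: string) => {
  const {rows} = await pg.query(
    `UPDATE "Workflow"
     SET "currentWorkersForStep" = "currentWorkersForStep" + 1
     WHERE "id" = $1
       AND "currentWorkersForStep" < "maxWorkersForStep"
     RETURNING "currentWorkersForStep" - 1 AS "slot"`,
    [workflowId]
  )
  return rows[0]?.slot as number | undefined
}
```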

The problem with this is the waterfall nature-- 1 step must be completed before moving on to the next. While we can do 2 jobs within the same step, we can't do 2 distinct steps at once. In other words, we have to wait for all 10 embed jobs to finish before moving on to the next step, even if the next step doesn't depend on those embed jobs! We need a way to support truly directed acyclic graphs, where nodes B and C both depend on A, and node D depends on both B and C, allowing B & C to run concurrently.
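for illustration, that A/B/C/D case as steps with explicit dependsOn edges, which is all a DAG scheduler really needs:

```ts
// A step is runnable once everything it depends on has succeeded, so B and C
// become runnable together after A, and D waits for both of them.
type StepState = 'queued' | 'running' | 'success' | 'failed'
type Step = {name: string; dependsOn: string[]; state: StepState}

const steps: Step[] = [
  {name: 'A', dependsOn: [], state: 'success'},
  {name: 'B', dependsOn: ['A'], state: 'queued'},
  {name: 'C', dependsOn: ['A'], state: 'queued'},
  {name: 'D', dependsOn: ['B', 'C'], state: 'queued'}
]

const byName = new Map(steps.map((s) => [s.name, s] as const))
const runnable = steps.filter(
  (s) => s.state === 'queued' && s.dependsOn.every((d) => byName.get(d)?.state === 'success')
)
// runnable -> [B, C]; D stays queued until both succeed
```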

Rereading hatchet, I'm not sure if they actually support DAGs because their nodes never bifurcate in the example. If we did want to support it, and only use 1 table, we'd need each step to return an array of child steps and then have a stepCounter instead of currentWorkers, where the stepCounter points to an index in an array of steps.

so, relatedDiscussions job comes in:

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 0,
  maxSteps: 1,
  steps: [{name: 'start', meetingId: 123}]
}
```

since stepCount < maxSteps, the job gets picked up, the stepCount gets incremented, and the job gets returned to a worker. the worker picks steps[stepCount - 1] and performs the job using the handler for start with meetingId as the context. The output from start updates the job row to the following:

```js
job = {
  workflow: 'relatedDiscussions',
  stepCount: 1,
  maxSteps: 11,
  steps: [{name: 'start', meetingId: 123}, {name: 'embed', discussionId: 'abc'}, {name: 'embed', discussionId: 'def'}...{name: 'embed', discussionId: 'efg', isFinal: true}]
}
```

where maxSteps === steps.length. so, the workers go along, picking this workflow until stepCount == maxSteps. when the worker with isFinal === true completes its job, it adds the getSimilarities job. Ideally, we don't want this logic to exist in the worker callback itself, but rather in an orchestrator. The problem is: what if each of n embed jobs results in n getSimilarities jobs? does it matter that all the embeds have to finish before the first getSimilarities can start? Ideally, we process each discussion to completion so the user gets feedback ASAP. Since we can't splice that job into the steps array, we'll need to push it, but assign it the same priority as its parent step... this is looking like a 2nd DB table 😦 If we do that, then we may as well ditch the workflows table & just have a steps table, which is what we do today. the problem is how we fail jobs & propagate that failure to future jobs.
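to make the push concrete, a sketch of that isFinal completion hook, inheriting the parent step's priority (all names are illustrative):

```ts
import {Pool} from 'pg'

const pg = new Pool()

// When the step flagged isFinal finishes, enqueue the follow-up job with the
// parent's priority so it sorts with its own flow instead of behind every
// newer embed job. "JobQueue" and its columns are hypothetical.
const onStepComplete = async (
  job: {priority: number},
  step: {name: string; meetingId?: string; isFinal?: boolean}
) => {
  if (!step.isFinal) return
  await pg.query(
    `INSERT INTO "JobQueue" ("jobType", "priority", "meetingId", "state")
     VALUES ('getSimilarities', $1, $2, 'queued')`,
    [job.priority, step.meetingId]
  )
}
```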

If we did have 2 tables, we'd need a workflow queue table & a step queue table, which is what hatchet does. after picking a workflow and incrementing the stepCount, we'd have to pick a step from the step queue table based on priority. we probably need some concept of a parent/child relationship to know which jobs can start immediately & which ones need to wait for something else.

I think we can keep 1 table if we keep some rules:

jordanh commented 5 months ago

@mattkrick safe to close now?

mattkrick commented 4 months ago

yep, completed