statelyai / studio-issues

Report issues found in Stately Studio

Connect to GitHub issues #232

Open camwest opened 2 months ago

camwest commented 2 months ago

Hey folks,

I added one of my files to GitHub Sync and it correctly picked up the primary machine "app". I then added some context definitions in Stately Studio and tried to sync them back, but the GitHub diff didn't look right.

[Screenshot of the GitHub diff: CleanShot 2024-04-27 at 14 38 21@2x]

I expected it to update the schemas object inside setup() with a context definition.
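For reference, here's roughly what I expected the diff to add to the schemas block. This is just a sketch following the same JSON-schema-style shape as the event schemas in the file below; the property entries are only examples drawn from my context type:

schemas: {
  context: {
    // example entries only, mirroring the events schema format
    contextId: {
      type: "number",
    },
    orgname: {
      type: "string",
    },
  },
  events: {
    // ...existing event schemas unchanged
  },
},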

It could be the way my file is structured. See below for the full file.

Thank you!

import { QueryContext } from "@/actions/types";
import { getGodClient } from "@/utils/supabase-god";
import { SupabaseClient } from "@supabase/supabase-js";
import { DataMessage } from "ai";
import { Database, encodeSqid, fetchInterviewIdTupleByFileId } from "database";
import OpenAI from "openai";
import { AssistantStream } from "openai/lib/AssistantStream";
import { assign, createActor, createMachine, fromPromise, setup } from "xstate";
import { MessageAnnotation } from "./AssistantResponse2";
import { fetchContext } from "./context";

type Message = OpenAI.Beta.Threads.ThreadCreateAndRunParams.Thread.Message;
type Run = OpenAI.Beta.Threads.Run;

// Helper function to split an array into chunks of a given size
function chunkArray<T>(array: T[], chunkSize: number): T[][] {
  const chunks: T[][] = [];

  for (let index = 0; index < array.length; index += chunkSize) {
    chunks.push(array.slice(index, index + chunkSize));
  }

  return chunks;
}

const OPENAI_CHAT_ASSISTANT_ID = process.env.OPENAI_CHAT_ASSISTANT_ID;

interface AIStreamOutput {
  forwardStream: (stream: AssistantStream) => Promise<Run | undefined>;
  sendDataMessage: (message: DataMessage) => void;
  onAnnotation: (
    fn: (
      index: number,
      annotations: MessageAnnotation,
    ) => Promise<{ newText: string }>,
  ) => void;
}

interface InputProps {
  context: QueryContext;
  contextId: number;
  organizationId: number;
  orgname: string;
  message: string;
}

export function createLLMActor({
  adapters,
  input,
  snapshot,
}: {
  adapters: {
    supabase: SupabaseClient<Database>;
    openai: OpenAI;
  };
  input?: InputProps;
  snapshot?: any;
}) {
  const handleInput = fromPromise<
    void,
    {
      orgname: string;
      threadId: string;
      message: string;
      output: AIStreamOutput;
    }
  >(async ({ input }) => {
    console.log("Handling input");

    const output = input.output;

    if (!OPENAI_CHAT_ASSISTANT_ID) {
      throw new Error("no OPENAI_CHAT_ASSISTANT_ID");
    }

    const runStream = adapters.openai.beta.threads.runs.stream(input.threadId, {
      assistant_id: OPENAI_CHAT_ASSISTANT_ID,
    });

    runStream.on("toolCallCreated", async (toolCall) => {
      output.sendDataMessage({
        role: "data",
        data: {
          type: "toolCall",
          toolCall: toolCall.type,
        },
      });
    });

    output.onAnnotation(async (index, annotation: MessageAnnotation) => {
      // TODO: add some caching
      const dataFromDb = await fetchInterviewIdTupleByFileId(
        // use god client since RLS doesn't allow access to openai tables
        getGodClient(),
        annotation.file_id,
      );

      if (!dataFromDb) {
        throw new Error("expected projectId and interviewId");
      }

      const [projectId, interviewId] = dataFromDb;

      const link = `/orgs/${input.orgname}/interviews/${encodeSqid([projectId, interviewId])}`;

      return {
        // disabled for now since indexes are unreliable: ?start_index=${annotation.start_index}&end_index=${annotation.end_index}
        newText: `[${index}](${link})`,
      };
    });

    console.log("Calling forwardStream with runStream:", runStream);
    // forward run status would stream message deltas
    let runResult = await output.forwardStream(runStream);
    console.log("forwardStream returned:", runResult);

    while (
      runResult?.status === "requires_action" &&
      runResult.required_action?.type === "submit_tool_outputs"
    ) {
      // WE DON'T HAVE ANY TOOLS YET
      const tool_outputs =
        runResult.required_action.submit_tool_outputs.tool_calls.map(
          (toolCall: any) => {
            console.log("toolcall", toolCall);

            let parameters;
            console.log(
              "Attempting to parse JSON: ",
              toolCall.function.arguments,
            );

            try {
              parameters = JSON.parse(toolCall.function.arguments);
            } catch (error) {
              console.error("Error parsing JSON:", error);
              // handle error as appropriate for your application
            }

            switch (toolCall.function.name) {
              default:
                throw new Error(
                  `Unknown tool call function: ${toolCall.function.name}`,
                );
            }
          },
        );

      console.log(
        "Calling submitToolOutputsStream with threadId, runResult.id, and tool_outputs:",
        input.threadId,
        runResult.id,
        tool_outputs,
      );

      runResult = await output.forwardStream(
        adapters.openai.beta.threads.runs.submitToolOutputsStream(
          input.threadId,
          runResult.id,
          {
            tool_outputs,
          },
        ),
      );
      console.log("submitToolOutputsStream returned:", runResult);
    }
  });

  const createThread = fromPromise<
    string,
    {
      context: QueryContext;
      contextId: number;
      organizationId: number;
      message: string;
    }
  >(async ({ input }) => {
    const dataContext = await fetchContext(
      adapters.supabase,
      input.context,
      input.contextId,
    );

    // check the content length and split into multiple messages if needed
    // ensure this value has at most 32768 characters
    const maxContentLength = 32768;

    const content = JSON.stringify(dataContext.context);

    const messages: Message[] = [];

    if (content.length > maxContentLength) {
      const contentChunks: string[] = [];
      const regex = new RegExp(".{1," + maxContentLength + "}", "g");
      let match;
      while ((match = regex.exec(content)) !== null) {
        contentChunks.push(match[0]);
      }
      if (contentChunks.length === 0) {
        throw new Error("no content chunks");
      }

      for (const chunk of contentChunks) {
        messages.push({
          role: "user",
          content: chunk,
          metadata: {
            type: "context",
            label: dataContext.label,
            link: dataContext.link,
          },
        });
      }
    } else {
      messages.push({
        role: "user",
        content,
        metadata: {
          type: "context",
          label: dataContext.label,
          link: dataContext.link,
        },
      });
    }

    // Chunk the contextFiles array into chunks of 10
    const fileChunks = chunkArray(dataContext.files, 10);

    // Create a new message for each chunk
    const chunkMessages: Message[] = fileChunks.map((chunk) => {
      return {
        role: "user",
        content: "here are the transcripts",
        metadata: {
          type: "context",
          label: "Transcripts",
        },
        attachments: chunk.map((fileId) => {
          return {
            file_id: fileId,
            tools: [{ type: "file_search" }],
          };
        }),
      };
    });

    console.log("Creating thread with openai");
    const resp = await adapters.openai.beta.threads.create({
      messages: [...messages, ...chunkMessages],
    });

    const threadId = resp.id;

    return threadId;
  });

  const machine = setup({
    types: {
      context: {} as {
        context: QueryContext;
        contextId: number;
        threadId: string | null;
        organizationId: number;
        orgname: string;
        message: string;
      },
      input: {} as InputProps,
      events: {} as
        | { type: "Close" }
        | { type: "New Report" }
        | { type: "Edit Report" }
        | { type: "Show Report" }
        | {
            type: "Handle Input";
            message: string;
            output: AIStreamOutput;
          }
        | { type: "Save Started" }
        | { type: "Save Completed" }
        | { type: "Load Completed" },
    },
    actors: {
      handleInput: handleInput,
      createThread: createThread,

      draftNewReport: createMachine({
        /* ... */
      }),
      editExistingReport: createMachine({
        /* ... */
      }),
    },
    schemas: {
      events: {
        Close: {
          type: "object",
          properties: {},
        },
        "New Report": {
          type: "object",
          properties: {},
        },
        "Edit Report": {
          type: "object",
          properties: {},
        },
        "Show Report": {
          type: "object",
          properties: {},
        },
        "Handle Input": {
          type: "object",
          properties: {},
        },
        "Save Started": {
          type: "object",
          properties: {},
        },
        "Save Completed": {
          type: "object",
          properties: {},
        },
        "Load Completed": {
          type: "object",
          properties: {},
        },
      },
    },
  }).createMachine({
    context: ({ input }) => ({
      context: input.context,
      contextId: input.contextId,
      organizationId: input.organizationId,
      orgname: input.orgname,
      message: input.message,
      threadId: null,
    }),
    id: "app",
    type: "parallel",
    states: {
      Chat: {
        initial: "Creating Thread",
        states: {
          "Creating Thread": {
            invoke: {
              id: "createThread",
              src: "createThread",
              input: ({ context }) => ({
                context: context.context,
                contextId: context.contextId,
                organizationId: context.organizationId,
                message: context.message,
              }),

              onDone: {
                target: "Waiting For Input",
                actions: assign({ threadId: ({ event }) => event.output }),
              },
            },
          },

          "Waiting For Input": {
            on: {
              "Handle Input": {
                target: "#app.Chat.Processing.Processing Output",
              },
            },
          },

          Processing: {
            initial: "Processing Output",
            states: {
              "Processing Output": {
                on: {
                  "New Report": {
                    target: "#app.Chat.Processing.Report Tool.Draft New Report",
                  },
                  "Edit Report": {
                    target:
                      "#app.Chat.Processing.Report Tool.Edit Existing Report",
                  },
                  "Show Report": {
                    target: "#app.Chat.Waiting For Input",
                  },
                },
                invoke: {
                  id: "handleInput",
                  src: "handleInput",
                  input: ({ context, event }) => {
                    if (context.threadId === null) {
                      throw new Error("threadId is null");
                    }

                    if (event.type === "Handle Input") {
                      return {
                        threadId: context.threadId,
                        message: event.message,
                        output: event.output,
                        orgname: context.orgname,
                      };
                    } else {
                      throw new Error("Invalid event type");
                    }
                  },
                  onDone: {
                    target: "#app.Chat.Waiting For Input",
                  },
                },
              },
              "Report Tool": {
                initial: "Draft New Report",
                states: {
                  "Draft New Report": {
                    invoke: {
                      id: "reportGenerator",
                      input: {},
                      onDone: {
                        target: "#app.Chat.Waiting For Input",
                      },
                      src: "draftNewReport",
                    },
                  },
                  "Edit Existing Report": {
                    invoke: {
                      id: "reportGenerator",
                      input: {},
                      onDone: {
                        target: "#app.Chat.Waiting For Input",
                      },
                      src: "editExistingReport",
                    },
                  },
                },
              },
            },
          },
        },
      },
      "Tool Window": {
        initial: "No Tool",
        on: {
          "New Report": {
            target: "#app.Tool Window.Report Tool.Viewing Unsaved Report",
          },
          "Show Report": {
            target: "#app.Tool Window.Report Tool.Loading Version",
          },
          "Edit Report": {
            target: "#app.Tool Window.Report Tool.Viewing Unsaved Report",
          },
        },
        states: {
          "No Tool": {},
          "Report Tool": {
            initial: "Viewing Unsaved Report",
            on: {
              Close: {
                target: "No Tool",
              },
            },
            states: {
              "Viewing Unsaved Report": {
                on: {
                  "Save Started": {
                    target: "Saving New Version",
                  },
                },
              },
              "Saving New Version": {
                on: {
                  "Save Completed": {
                    target: "Viewing Saved Report",
                  },
                },
              },
              "Viewing Saved Report": {},
              "Loading Version": {
                on: {
                  "Load Completed": {
                    target: "Viewing Saved Report",
                  },
                },
              },
            },
          },
        },
      },
    },
  });

  if (snapshot) {
    return createActor(machine, {
      input,
      snapshot,
    });
  } else {
    return createActor(machine, {
      input,
    });
  }
}
camwest commented 2 months ago

I'm also curious why the inline assign was changed.
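
For clarity, the only inline assign in the file is the onDone action in the "Creating Thread" state, quoting from above:

onDone: {
  target: "Waiting For Input",
  actions: assign({ threadId: ({ event }) => event.output }),
},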

davidkpiano commented 2 months ago

I'll investigate this; thanks for the detailed report.

camwest commented 2 months ago

I appreciate it!