grammyjs / conversations

Conversational interfaces for grammY.
https://grammy.dev/plugins/conversations
MIT License

feat: external events #19

Open KnorpelSenf opened 2 years ago

KnorpelSenf commented 2 years ago

It may be interesting to be able to wait for external events, rather than just waiting for Telegram updates.

For example, we could add something like await conversation.waitExternal('id') and then in turn people can do

const session = await storage.read(chatId)
await fireEvent('id', conversation, session)

which loads the right session data, and runs the supplied conversation function.

swim2sun commented 1 month ago

Hello @KnorpelSenf, I am looking forward to using the new features in the engine branch. However, the lack of documentation makes this difficult, and I cannot find the corresponding test cases, so I am unsure how to use the new API. I also searched the repository for "fireEvent" and "waitExternal" but found no results. Could you please provide a brief introduction to the relevant APIs? Thank you!

KnorpelSenf commented 1 month ago

I am going to document this more thoroughly in the coming days and weeks. Here is a relevant test case:

https://github.com/grammyjs/conversations/blob/6a543d07be7c1db02ffc4d144804123d0103b58d/test/conversation.test.ts#L36-L59

The naming in the above description is not used in the implementation. You can enter a conversation which gives you some state, and then you can pass this state when resuming a conversation.

You can view an HTML overview of the plugin's API surface at https://doc.deno.land/https://raw.githubusercontent.com/grammyjs/conversations/refs/heads/engine/src/mod.ts. It says Deno everywhere because they provide the tooling to create such a page, but the plugin works identically on Node.js.

Instead of fireEvent you will have to take your event data and wrap it inside a Context object for now. That is not ideal, and the TypeScript types will be improved here, but it should work well apart from a type-cast. You can then access the event data inside the conversation using

const event = await conversation.wait()
const data = event.update as MyEventData
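
Rather than relying on a bare cast, a small runtime check can tell an external event apart from a regular Telegram update. This is only a minimal sketch: the fields of MyEventData and the isMyEventData helper are hypothetical names chosen for illustration, not part of the plugin.

```typescript
// Hypothetical event shape; replace the fields with your own.
interface MyEventData {
  type: "event";
  taskId: string;
  status: string;
}

// Runtime check so that a regular Telegram update is not mistaken
// for an external event before the value is treated as MyEventData.
function isMyEventData(value: unknown): value is MyEventData {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return v.type === "event" &&
    typeof v.taskId === "string" &&
    typeof v.status === "string";
}
```

Inside the conversation you could then call isMyEventData(event.update) and only read the event fields when it returns true.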

LMK if you have further questions, and apologies for the rough DX. After all, it's not released yet.

swim2sun commented 1 month ago

@KnorpelSenf Thank you for your quick and detailed response. However, I'm still a bit confused about the usage of the new API. I'm new to grammY, so please forgive me if my understanding of the technical details is incorrect:

  1. When and where should we call enterConversation and resumeConversation? Is it within a conversation? Wouldn't that create nested conversations? Or should these be called outside of a conversation, and if so, how do we access the context?

  2. My current understanding is that after calling ctx.conversation.enter('conv-id') to enter a conversation, grammY takes control of the conversation. When a new update arrives, the plugin resumes from where it previously waited. Now, does conversation.wait() resume the conversation both when a new update arrives and when resumeConversation is called?

  3. Let's discuss a specific example. In my scenario, I need to poll a database for a certain status halfway through a conversation, then continue returning information to the user. I initially tried to implement this using a while loop with conversation.sleep. The code looked something like this:

const composer = new Composer<MyContext>();

async function generateImage(conversation: MyConversation, ctx: MyContext) {
  conversation.run(hydrate());
  const modelKeyboard = new InlineKeyboard()
    .text(ctx.t("model-sd"), "sd")
    .text(ctx.t("model-flux"), "flux");
  await ctx.reply(
    ctx.t("pls-select-model"),
    { reply_markup: modelKeyboard }
  );
  const modelCtx = await conversation.waitForCallbackQuery(["sd", "flux"], {
    otherwise: (ctx) => ctx.reply(ctx.t("select-model"), { reply_markup: modelKeyboard }),
  });
  await modelCtx.answerCallbackQuery();
  const model = modelCtx.match;
  // prompt
  // similar code to get prompt
  const prompt = ...;
  let task: ImageTask = await conversation.external(async () => await prisma.imageTask.create({
    data: {
      model: model.toString(),
      prompt: prompt.toString(),
    },
  }));
  console.log("task created: " + task.id);
  await ctx.reply(ctx.t("task-created"));
  let secs = 0;
  while (task.status !== 'SUCCESS') {
    const statusMessage = await ctx.reply("waiting " + secs + "s");
    await conversation.sleep(1000);
    secs += 1; // keep the displayed wait time in sync with the loop
    task = await conversation.external(async () => await prisma.imageTask.findUniqueOrThrow({where: { id: task.id }}));
  }
  await ctx.reply("success");
}

composer.use(createConversation(generateImage));
composer.command("generate-image", (ctx) => ctx.conversation.enter("generateImage"));

However, this turned out to be an incorrect usage, as the program reported a webhook timeout error. How can this code be rewritten using the new features?

swim2sun commented 1 month ago

@KnorpelSenf Hello again, after reviewing the source code, I've noticed that both enterConversation and resumeConversation are currently called within middleware. This means these methods are only triggered when an update arrives.

  1. Let's say I implement a custom middleware that queries a database to determine whether to resume a conversation. Is it correct to assume that this code would only execute when an update arrives? If we query only once, it would happen only when an update comes in. On the other hand, if we check in a loop, we might run into webhook timeout issues. Is my understanding correct?

  2. Alternatively, if I set up an external scheduled task to check the database, I face a new challenge: How should I construct the Context when calling resumeConversation?

KnorpelSenf commented 1 month ago

I've got a bit much to do right now. Let me get back to you this weekend, sorry for the delay.

KnorpelSenf commented 1 month ago

Maybe as a very short (and probably too short) hint, you'll have to provide a storage adapter and then query that storage yourself in order to obtain conversation state. This state can in turn be passed to resumeConversation. I'll follow up with a better example in a few days.
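
To make the hint a bit more concrete, here is a rough sketch of the idea, assuming an adapter that exposes read/write/delete keyed by a string (the shape grammY storage adapters generally follow). MemoryAdapter, StoredState, and loadConversationState are hypothetical names for illustration only, and the real plugin may wrap or version what it stores, so treat StoredState as a placeholder.

```typescript
// Assumed adapter shape: read/write/delete keyed by a string.
interface Adapter<T> {
  read(key: string): Promise<T | undefined>;
  write(key: string, value: T): Promise<void>;
  delete(key: string): Promise<void>;
}

// Hypothetical in-memory adapter, for illustration only.
class MemoryAdapter<T> implements Adapter<T> {
  private readonly data = new Map<string, T>();
  async read(key: string): Promise<T | undefined> {
    return this.data.get(key);
  }
  async write(key: string, value: T): Promise<void> {
    this.data.set(key, value);
  }
  async delete(key: string): Promise<void> {
    this.data.delete(key);
  }
}

// Placeholder for whatever is persisted per chat:
// conversation name -> list of stored states.
type StoredState = Record<string, unknown[]>;

// Look up the first stored state of a named conversation for a chat.
// Resolves to undefined when the chat or the conversation is unknown.
async function loadConversationState(
  adapter: Adapter<StoredState>,
  chatId: number,
  name: string,
): Promise<unknown> {
  const stored = await adapter.read(chatId.toString());
  return stored?.[name]?.[0];
}
```

Because you own the adapter instance, the same object can be handed to the plugin and queried from code that runs outside of any update handler.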

swim2sun commented 1 month ago

@KnorpelSenf There's no rush at all, please take your time with your other tasks. If possible, a concrete example would be very helpful when you have a chance to provide one, thanks.

KnorpelSenf commented 1 month ago

I have pushed a few commits that make this case easier. You'll need to pull a new version.

This is how you can manually load state and use resumeConversation to resume a conversation.

type MyContext = ConversationFlavor<Context>;
type MyConversationContext = Context;

type MyConversation = Conversation<MyConversationContext>;

const bot = new Bot<MyContext>("redacted");

const version = 0;
const fileAdapter = new FileAdapter<VersionedState<ConversationData>>({
    dirName: "/tmp",
});

bot.use(
    conversations({
        storage: {
            type: "key",
            version,
            getStorageKey: (ctx) => ctx.chatId?.toString(),
            adapter: fileAdapter,
        },
    }),
);
bot.command(
    "active",
    (ctx) => ctx.reply(JSON.stringify(ctx.conversation.active(), null, 2)),
);

interface MyEvent {
    type: "event";

    foo: string;
    bar: number;
}
async function convo(conversation: MyConversation, ctx: MyConversationContext) {
    const { update } = await conversation.waitUntil((ctx) =>
        // only wait for external events
        "type" in ctx.update && ctx.update.type === "event"
    );
    const event = update as unknown as MyEvent; // cast back from update until plugin is improved
    await ctx.reply(`Received ${event.foo}`);
}
bot.use(createConversation(convo));
bot.command("start", (ctx) => ctx.conversation.enter("convo"));

const { versionify, unpack } = pinVersion(version);
async function supplyExternalEvent(chat_id: number, event: MyEvent) {
    // fetch data
    const key = chat_id.toString();
    const data = await fileAdapter.read(key);
    const state = unpack(data);
    if (state === undefined) return; // bad or missing data for chat_id
    const convoState = state.convo?.[0];
    if (convoState === undefined) return; // convo not entered
    const baseData = {
        update: event as unknown as Update, // cast to update until plugin is improved
        api: bot.api,
        me: bot.botInfo,
    };
    // run conversation
    const res = await resumeConversation(convo, baseData, convoState);
    // handle result
    switch (res.status) {
        case "skipped":
            return;
        case "complete":
        case "error":
            await fileAdapter.delete(key);
            return;
        case "handled": {
            const newState: ConversationState = {
                args: convoState.args,
                interrupts: res.interrupts,
                replay: res.replay,
            };
            state.convo[0] = newState;
            await fileAdapter.write(key, versionify(state));
            return;
        }
    }
}

bot.start();
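
For the database polling scenario described earlier in this thread, the polling can live outside the conversation and call supplyExternalEvent once the task is done. The following is only a sketch under assumptions: Task and pollUntilDone are hypothetical names, and the interval and attempt limit are arbitrary defaults.

```typescript
// Hypothetical task record, standing in for a database row.
interface Task {
  id: string;
  chatId: number;
  status: "PENDING" | "SUCCESS";
}

// Poll fetchTask until the task succeeds, then hand it to onDone once.
// Resolves to false if maxAttempts is exhausted before the task succeeds.
async function pollUntilDone(
  fetchTask: () => Promise<Task>,
  onDone: (task: Task) => Promise<void>,
  intervalMs = 1000,
  maxAttempts = 60,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const task = await fetchTask();
    if (task.status === "SUCCESS") {
      await onDone(task);
      return true;
    }
    // Wait before the next attempt.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  return false;
}
```

A caller would pass a function that reads the task from the database as fetchTask, and an onDone callback that invokes supplyExternalEvent with the chat identifier and a MyEvent built from the task. Since this loop runs outside the update handler, it does not keep a webhook request open while waiting.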

Here are a few interesting things to note about the supplyExternalEvent code above:

swim2sun commented 1 month ago

@KnorpelSenf Thank you very much, your code is very clear, detailed, and helpful!