nlkitai / nlux

The 𝗣𝗼𝘄𝗲𝗿𝗳𝘂𝗹 Conversational AI JavaScript Library 💬 — UI for any LLM, supporting LangChain / HuggingFace / Vercel AI, and more 🧡 React, Next.js, and plain JavaScript ⭐️
https://docs.nlkit.com/nlux
Other
1.08k stars 63 forks source link

StreamingAdapterObserver seems to get automatically completed #143

Open tb opened 1 week ago

tb commented 1 week ago

I am connecting AiChat to AI agent via LangServe with:

 const chatAdapter = useAsStreamAdapter(streamText);
 <AiChat adapter={chatAdapter} />

and default streamText code:

import {StreamingAdapterObserver} from '@nlux/react';

export const streamText = async (prompt: string, observer: StreamingAdapterObserver) => {
    const response = await fetch('http://localhost:8000/stream/', {
        method: 'POST',
        body: JSON.stringify({input: {messages: [prompt]}}),
        headers: {'Content-Type': 'application/json'},
    });

    if (response.status !== 200) {
        console.log("--- response.status !== 200");
        observer.error(new Error('Failed to connect to the server'));
        return;
    }

    if (!response.body) {
        console.log("--- !response.body !!!!!!!!!!!!!!!!!!!!!!!!!");
        return;
    }

    const reader = response.body.getReader();
    const textDecoder = new TextDecoder();

    while (true) {
        const { value, done } = await reader.read();
        const {type, messages} = parse(textDecoder.decode(value));
        const content = messages[0] ? messages[0]['content'] : '';

        console.log(JSON.stringify({type: type, content: content}, null, 2));

        if (type == "tools") {
            observer.next(`Running (${type})\n`);
        }

        if (type == "agent" && content) {
            observer.next(content);
        }

        if (done) {
            console.log('--- await reader.read() done')
            break;
        }
    }

    console.log('--- complete')
    observer.complete();
};

This code works well for queries that take up to about 8-10s, but if AI agent uses more tools the code crashes with error: stream.tsx:40 [nlux] Stream is already complete. No more chunks can be added

Here is what I see in the console log for short task that works correctly:

{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "[(93,)]"
}
{
  "type": "agent",
  "content": "We have a total of 93 clients."
}
{}
{}
--- await reader.read() done
--- complete

and here is how it looks for prompt that uses more tools and more time on the backend:

{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "...."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
[nlux] Stream is already complete. No more chunks can be added**
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
**[nlux] Stream is already complete. No more chunks can be added**
{
  "type": "agent",
  "content": "Here are some European clients' names from the database:\n\n1. Alfreds Futterkiste\n2. Berglunds snabbköp\n3. Blauer See Delikatessen\n4. Blondesddsl père et fils\n5. Bólido Comidas preparadas\n6. Bon app'\n7. Chop-suey Chinese\n8. Drachenblut Delikatessen\n9. Du monde entier\n10. Ernst Handel\n11. FISSA Fabrica Inter. Salchichas S.A.\n12. Folies gourmandes\n13. Folk och fä HB\n14. Frankenversand\n15. France restauration\n16. Franchi S.p.A.\n17. Furia Bacalhau e Frutos do Mar\n18. Galería del gastrónomo\n19. Godos Cocina Típica\n20. Hungry Owl All-Night Grocers\n21. Königlich Essen\n22. La corne d'abondance\n23. La maison d'Asie\n24. Lehmanns Marktstand\n25. Magazzini Alimentari Riuniti\n26. Maison Dewey\n27. Morgenstern Gesundkost\n28. Ottilies Käseladen\n29. Paris spécialités\n30. Piccolo und mehr\n31. Princesa Isabel Vinhos\n32. QUICK-Stop\n33. Reggiani Caseifici\n34. Richter Supermarkt\n35. Romero y tomillo\n36. Santé Gourmet\n37. Simons bistro\n38. Spécialités du monde\n39. Suprêmes délices\n40. Toms Spezialitäten\n41. Vaffeljernet\n42. Victuailles en stock\n43. Vins et alcools Chevalier\n44. Die Wandernde Kuh\n45. Wartian Herkku\n46. Wilman Kala\n47. Wolski Zajazd\n\nIf you need more information or specific details, feel free to ask!"
}
{}
{}
--- await reader.read() done
--- complete

It looks like that my streamText code is not completed because of steanText code... but rather somerhere else in codebase which is hard for me to debug.

Is there some kind of timeout in AiChat that could be responsible for this?

tb commented 1 week ago

I have noticed now that if I skip:

        if (type == "tools") {
            observer.next(`Running (${type})\n`);
        }

even short AI requests fail with the error: [nlux] Stream is already complete. No more chunks can be added