LangStream / langstream

LangStream. Event-Driven Developer Platform for Building and Running LLM AI Apps. Powered by Kubernetes and Kafka.
https://langstream.ai

java.lang.RuntimeException: Missing some input keys: {'query'} for source record KafkaRecord.KafkaConsumerRecord #710

Closed: devinbost closed this issue 7 months ago

devinbost commented 7 months ago

When I try to test my Python processor by entering a question in the LangStream UI, I get this exception.

10:36:14.447 [grpc-default-executor-1] INFO  a.l.r.agent.StandardErrorsHandler -- Handling error java.lang.RuntimeException: Missing some input keys: {'query'} for source record KafkaRecord.KafkaConsumerRecord(record=ConsumerRecord(topic = questions-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1699526174128, serialized key size = -1, serialized value size = 38, headers = RecordHeaders(headers = [RecordHeader(key = langstream-client-session-id, value = [116, 56, 110, 73, 117, 97, 88, 52, 49, 74, 106, 97, 66, 106, 52, 54])], isReadOnly = false), key = null, value = What's a key part of the specified strategy?)), errors count 1 (max retries 0)
10:36:14.452 [grpc-default-executor-1] ERROR a.l.runtime.agent.AgentRunner -- Unrecoverable error while processing the records, skipping
java.lang.RuntimeException: Missing some input keys: {'query'}
        at ai.langstream.agents.grpc.GrpcAgentProcessor.fromGrpc(GrpcAgentProcessor.java:125)
        at ai.langstream.agents.grpc.GrpcAgentProcessor$1.lambda$onNext$0(GrpcAgentProcessor.java:158)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
        at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
        at ai.langstream.agents.grpc.GrpcAgentProcessor$1.onNext(GrpcAgentProcessor.java:144)
        at ai.langstream.agents.grpc.GrpcAgentProcessor$1.onNext(GrpcAgentProcessor.java:134)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:675)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:660)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
10:36:15.437 [grpc-default-executor-1] INFO  a.l.k.runner.KafkaConsumerWrapper -- Current position on partition questions-topic-0 is nullquery

I looked at how the UI sends the message, and index.html sends it like this: handler.producer.send(JSON.stringify({value: message}))
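
Here's a tiny sketch of what that implies on the wire (hedged: this assumes the produce gateway maps the payload's value field straight onto the Kafka record value, which matches the ConsumerRecord contents in the log above):

import json

# Hedged sketch: what the UI appears to send, and what the processor then sees.
message = "What's a key part of the specified strategy?"
payload = json.dumps({"value": message})  # what index.html produces

# The record value is just the plain question string; there is no "query" key anywhere.
record_value = json.loads(payload)["value"]
print(record_value)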

It's not clear to me what the schema of the message object is, but it doesn't appear to have a query property. In my pipeline for the chatbot, I tried something like this:

pipeline:
  - name: "Transform Query"
    type: "compute"
    input: "questions-topic"
    configuration:
      fields:
        - name: "value.query"
          expression: "value.value"
          type: STRING

but it's not even getting that far before giving the same gRPC exception.

If I remove that snippet, here is the pipeline:

topics:
  - name: "questions-topic"
    creation-mode: create-if-not-exists
  - name: "answers-topic"
    creation-mode: create-if-not-exists
  - name: "log-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "Implemented a chat-bot using OpenAI and LangChain"
    id: "chatbot"
    type: "python-processor"
    output: "answers-topic"
    errors:
      on-failure: "skip"
    configuration:
      className: "langchain_chat.LangChainChat"
      astra-db-keyspace: "{{ secrets.astra-langchain.keyspace }}"
      astra-db-table: "{{ secrets.astra-langchain.table }}"
      resources: # This mapping will be replaced by the configuration.yaml in the next release.
        openai:
          access-key: "{{secrets.open-ai.access-key}}"
          provider: "azure"
          deployment: "gpt-35-turbo-16k"
        database:
          token: "{{secrets.astra-langchain.token}}"
          database-id: "{{secrets.astra-langchain.database-id}}"
      environment:
        - key: "OPENAI_API_TYPE" # Required to use OpenAIEmbeddings with Azure endpoint
          value: "azure"
        - key: "OPENAI_API_VERSION" # Required to use OpenAIEmbeddings with Azure endpoint
          value: "2023-05-15"
        - key: "OPENAI_API_KEY" # Required to use OpenAIEmbeddings with Azure endpoint
          value: "{{secrets.open-ai.access-key}}"
        - key: "OPENAI_API_BASE" # Required to use OpenAIEmbeddings with Azure endpoint
          value: "{{secrets.open-ai.url}}"
        - key: "AZURE_OPENAI_API_ENDPOINT" # Required to use AzureChatOpenAI
          value: "{{secrets.open-ai.url}}"
        - key: "AZURE_OPENAI_API_KEY" # Required to use AzureChatOpenAI
          value: "{{secrets.open-ai.access-key}}"

Here is the Python processor:

import os
from langstream import Processor, SimpleRecord
from operator import itemgetter
from typing import Dict, List, Optional, Sequence, Any
from langchain.vectorstores.cassandra import Cassandra
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
import cassio
import logging

class LangChainChat(Processor):

    def init(self, config):
        logging.info("Initializing LangChain Chat with config %s", config)
        # the values are configured in the resources section in configuration.yaml
        self.openai_key = config["resources"]["openai"]["access-key"]
        self.astra_db_token = config["resources"]["database"]["token"]
        self.astra_db_id = config["resources"]["database"]["database-id"]
        self.astra_keyspace = config["astra-db-keyspace"]
        self.astra_table_name = config["astra-db-table"]
        self.deployment = config["resources"]["openai"]["deployment"]

        cassio.init(token=self.astra_db_token, database_id=self.astra_db_id)

        self.embeddings_model = OpenAIEmbeddings(openai_api_key=self.openai_key)

        self.astra_vector_store = Cassandra(
            embedding=self.embeddings_model,
            session=None,
            keyspace=self.astra_keyspace,
            table_name=self.astra_table_name,
        )

        self.llm = AzureChatOpenAI(
            deployment_name=self.deployment,
            streaming=True,
            temperature=0,
        )
        # ^^ key obtained from env var: AZURE_OPENAI_API_KEY
        # version obtained from env var: OPENAI_API_VERSION

        self.retriever = self.astra_vector_store.as_retriever()

        self.answer_chain = RetrievalQA.from_chain_type(
            llm=self.llm, chain_type="stuff", retriever=self.retriever
        )

    def process(self, record):
        question = record.value()
        response = self.answer_chain.invoke({
            "question": question,
            "chat_history": []
        })
        return [SimpleRecord(response, headers=record.headers())]

Here is the gateway:

gateways:
  - id: "user-input"
    type: produce
    topic: "questions-topic"
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: langstream-client-session-id
          valueFromParameters: sessionId

  - id: "bot-output"
    type: consume
    topic: "answers-topic"
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: langstream-client-session-id
            valueFromParameters: sessionId
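
For what it's worth, these gateways can also be exercised without the UI. A minimal WebSocket client sketch (hedged: the URL shape, the param: query prefix, and the message shapes follow my reading of the LangStream gateway docs; the tenant/application names and port are placeholders for a local run):

import asyncio
import json
import websockets  # pip install websockets

BASE = "ws://localhost:8091/v1"  # default local api-gateway port (assumption)
APP = "default/my-app"           # tenant/application-id (placeholders)
SESSION = "my-session"

async def ask(question: str) -> str:
    consume_url = f"{BASE}/consume/{APP}/bot-output?param:sessionId={SESSION}"
    produce_url = f"{BASE}/produce/{APP}/user-input?param:sessionId={SESSION}"
    # Open the consumer first so the answer can't slip past us.
    async with websockets.connect(consume_url) as consumer:
        async with websockets.connect(produce_url) as producer:
            await producer.send(json.dumps({"value": question}))
        msg = json.loads(await consumer.recv())
        return msg["record"]["value"]  # consume messages wrap the record (assumption)

print(asyncio.run(ask("What's a key part of the specified strategy?")))
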
eolivelli commented 7 months ago

Do you have more logs? It seems to be an error in the Python runtime.

devinbost commented 7 months ago

After upgrading to LangStream 0.4.3, I get slightly different behavior: now it just seems to do nothing when I send a message through the UI. Here are the logs:

langstream_rag_logs.log

The code to reproduce it is here: https://github.com/devinbost/langstream/tree/azure-chatbot/examples/applications/azure-document-ingestion (The readme is outdated.)

cbornet commented 7 months ago

"Missing some input keys" is a LangChain error. The logs could be better at explaining where the error comes from; we lose some error context during the translation from Python to Java (e.g. the Python stack trace). Do you have unit tests? They would help you debug locally.
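
A local test doesn't need Astra or OpenAI at all if the chain is stubbed out. A minimal sketch (the StubChain and the SimpleNamespace record here are stand-ins made up for illustration; the stub mimics LangChain's input validation):

from types import SimpleNamespace
from langchain_chat import LangChainChat

class StubChain:
    # Mimics LangChain's Chain input validation, which raises
    # "Missing some input keys: {...}" when RetrievalQA's "query" key is absent.
    def invoke(self, inputs):
        missing = {"query"} - inputs.keys()
        if missing:
            raise ValueError(f"Missing some input keys: {missing}")
        return {"result": "stub answer"}

def test_process_sends_the_expected_input_key():
    processor = LangChainChat()
    processor.answer_chain = StubChain()  # bypass init() and all external services
    record = SimpleNamespace(value=lambda: "What's the strategy?",
                             headers=lambda: [])
    processor.process(record)  # raises ValueError with the process() shown above

Run with pytest: against the process() method above this fails with the missing-key error and a full Python traceback, which is exactly the context the Java logs drop.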

cbornet commented 7 months ago

I think this comes from the prompt template. Since you didn't specify one, it's probably picking a default, but I don't know which one. You don't put a prompt template in the chain, so the first step is the retriever, which takes a query parameter as input.

cbornet commented 7 months ago

See https://python.langchain.com/docs/use_cases/question_answering/vector_db_qa. The parameter is indeed named query for the retriever.
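
This is easy to confirm in isolation, without any real LLM or vector store. A sketch that reproduces the error (FakeListLLM and the fixed retriever are test stand-ins, assuming a recent 0.0.x LangChain):

from langchain.chains import RetrievalQA
from langchain.llms.fake import FakeListLLM
from langchain.schema import BaseRetriever, Document

class FixedRetriever(BaseRetriever):
    # Trivial retriever that always returns the same document.
    def _get_relevant_documents(self, query, *, run_manager=None):
        return [Document(page_content="Caching is a key part of the strategy.")]

chain = RetrievalQA.from_chain_type(
    llm=FakeListLLM(responses=["Caching."]),
    chain_type="stuff",
    retriever=FixedRetriever(),
)

# chain.invoke({"question": "...", "chat_history": []})
#   -> ValueError: Missing some input keys: {'query'}
print(chain.invoke({"query": "What's a key part of the strategy?"}))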

devinbost commented 7 months ago

@cbornet You're absolutely right; that was a LangChain issue. I remember now that I ran into this with LangChain once before, when I tried adding conversational memory to a Q&A chain. It didn't cross my mind here because the stack trace threw me off. I applied a change that fixed the issue in a previous project. However, due to the issue below, I can't determine whether the change fixed it here.

After I upgraded to LangStream 0.4.3, I'm not getting any logs when I send a message through the UI. What should I check next?

devinbost commented 7 months ago

Here's a thread dump:

langstream_jstack.log

devinbost commented 7 months ago

I found the issue. This was a clue:

17:39:16.394 [ws-consume-1] WARN o.a.k.c.c.i.ConsumerCoordinator -- [Consumer clientId=consumer--1, groupId=] Offset commit failed on partition answers-topic-0 at offset 0: This is not the correct coordinator.

I checked the pipeline.yaml input and output topics and found that I had inadvertently deleted the input line from the pipeline config. After setting it back to "questions-topic", it started working as desired.
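
For clarity, the restored step looks like this (only the input line changed relative to the pipeline above):

pipeline:
  - name: "Implemented a chat-bot using OpenAI and LangChain"
    id: "chatbot"
    type: "python-processor"
    input: "questions-topic"   # this line had been deleted by mistake
    output: "answers-topic"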

For anyone else reading this, the fix was to replace:

response = self.answer_chain.invoke({
    "question": question,
    "chat_history": []
})

with:

response = self.answer_chain.run(question)

To implement conversational history, a conversational chain should be used instead, such as ConversationalRetrievalChain.
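
Something like this, reusing the llm and retriever built in init() (a sketch, not tested here; how the history is stored and passed per session can vary):

from langchain.chains import ConversationalRetrievalChain

def build_conversational_chain(llm, retriever):
    # Condenses the chat history plus the new question into a standalone
    # query before it reaches the retriever, so "question" and "chat_history"
    # are the expected input keys here (unlike RetrievalQA's "query").
    return ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever)

# Inside process(), roughly:
#   result = chain({"question": question, "chat_history": history})
#   answer = result["answer"]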