LangStream / langstream

LangStream. Event-Driven Developer Platform for Building and Running LLM AI Apps. Powered by Kubernetes and Kafka.
https://langstream.ai
Apache License 2.0
386 stars 28 forks source link

RESOURCE_EXHAUSTED in python-source agent #744

Closed Dobosz closed 9 months ago

Dobosz commented 9 months ago

Processing large data with python-source results in RESOURCE_EXHAUSTED error for gRPC:

Critical failure: gRPC server sent error: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 11646340. Shutting down the runtime...

Perhaps some kind of streaming between python runtime and agent service is in order.

eolivelli commented 9 months ago

How much large is your payload?

eolivelli commented 9 months ago

Also, what's the next agent in your pipeline? Another python agent or a builtin agent?

Dobosz commented 9 months ago

Looks like it's 11646340 (~11MB). Next one is a build-in agent:

  - name: "Extract text"
    id: "extract-text"
    type: "text-extractor"
Dobosz commented 9 months ago

Adding stackstrace:

java.lang.RuntimeException: gRPC server sent error: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 11646340
    at ai.langstream.agents.grpc.GrpcAgentSource$1.onError(GrpcAgentSource.java:136)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 11646340
    at io.grpc.Status.asRuntimeException(Status.java:539)
    ... 10 common frames omitted
Dobosz commented 9 months ago

Making it run as a standalone agent by providing input/output unfortunately, doesn't solve the issue.

eolivelli commented 9 months ago

@cbornet is it possible to change the max size of the GRPC payload ?

Dobosz commented 9 months ago

I'm quite sure it's possible to change max inbound size using ManagedChannelBuilder in PythonGrpcServer.java:90. Possible also in insecure_channel in python server. This however won't solve the root problem. Agent may generate payload much bigger than 11 MB.

This is a design choice. Should payload size be bound to some arbitrary max size, or is there some way of splitting the data on transport?

cbornet commented 9 months ago

We can increase the GRPC max message length (up to 2GB it seems). GRPC itself should be fine. We're using a local socket so we don't go through HTTP proxies that could feel bad about it. But with very large documents the contraints will mostly be on the pod memory. Also note that it's important to have a composable agent afterwards to chunk the data before sending it to the streaming engine as neither Kafka nor Pulsar will be happy with a big payload.