Closed (wscalf closed 1 month ago)
Yes, adaptation to stream middleware is rudimentary because the current interface model does not have a good abstraction
Would there be interest in proposals around this? We haven't looked super closely at it, but it seems like there would be a few options:
@wscalf OK
But we need to think about how to do it better
FWIW, I did some poking around at what it would take to have a unified middleware interface for unary and streaming endpoints. It does look possible, and there's some precedent for doing so: go-grpc-middleware defines a Reportable interface and interceptor wrappers that allow the same implementations to be used with both kinds of endpoints. It's only used for logging there, though it seems like something similar could apply generally.
I think it would be a breaking change to the middleware interface to be able to observe and react to multiple incoming or outgoing messages. However, existing middleware could still be called from the UnaryServerInterceptor and accessed by the HTTP transport as they are today, alongside the unified middleware. A piece-by-piece migration should therefore be possible without breaking current functionality or end-user authored middleware.
Maybe something like:
type Handler func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error
which would allow implementations to keep more or less the natural flow they have now while being compatible with streaming.
This would allow for implementations like:
func Validator() middleware.Middleware {
    return func(handler middleware.Handler) middleware.Handler {
        return func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error {
            // Wrap receiveMsg so each incoming message is validated once it has been read.
            recv := func(req any) error {
                if err := receiveMsg(req); err != nil {
                    return err
                }
                if v, ok := req.(validator); ok {
                    if err := v.Validate(); err != nil {
                        return errors.BadRequest("VALIDATOR", err.Error()).WithCause(err)
                    }
                }
                return nil
            }
            // Outgoing messages are not validated, so sendMsg passes through unchanged.
            return handler(ctx, recv, sendMsg, info)
        }
    }
}
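To get a feel for how this shape behaves on a stream, here is a rough, self-contained sketch that drives a validator through an in-memory stream instead of gRPC. Everything in it is a stand-in invented for illustration (CallInfo's fields, HelloRequest, echo, runStream); none of it is existing Kratos API, and it uses a plain error rather than errors.BadRequest to stay stdlib-only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// CallInfo is a hypothetical stand-in for the type named in the proposal.
type CallInfo struct {
	IsClientStream bool
	IsServerStream bool
}

// Handler mirrors the proposed signature.
type Handler func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error

// Middleware wraps one Handler in another.
type Middleware func(Handler) Handler

// HelloRequest is an illustrative message with a Validate method.
type HelloRequest struct{ Name string }

func (r *HelloRequest) Validate() error {
	if r.Name == "" {
		return errors.New("name must not be empty")
	}
	return nil
}

// Validator checks every message right after it has been received.
func Validator() Middleware {
	return func(next Handler) Handler {
		return func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error {
			recv := func(msg any) error {
				if err := receiveMsg(msg); err != nil {
					return err
				}
				if v, ok := msg.(interface{ Validate() error }); ok {
					return v.Validate()
				}
				return nil
			}
			return next(ctx, recv, sendMsg, info)
		}
	}
}

// echo is a toy streaming endpoint: it replies to each request until EOF.
func echo(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error {
	for {
		req := new(HelloRequest)
		if err := receiveMsg(req); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		if err := sendMsg("Hello " + req.Name); err != nil {
			return err
		}
	}
}

// runStream feeds names through the validated endpoint and returns the replies.
func runStream(names []string) ([]string, error) {
	var sent []string
	i := 0
	receive := func(msg any) error {
		if i >= len(names) {
			return io.EOF
		}
		msg.(*HelloRequest).Name = names[i]
		i++
		return nil
	}
	send := func(msg any) error {
		sent = append(sent, msg.(string))
		return nil
	}
	info := CallInfo{IsClientStream: true, IsServerStream: true}
	err := Validator()(echo)(context.Background(), receive, send, info)
	return sent, err
}

func main() {
	fmt.Println(runStream([]string{"Kratos", "gRPC"}))
	fmt.Println(runStream([]string{"Kratos", ""}))
}
```

In this sketch the second run stops at the first invalid message, so earlier replies on the same stream have already been sent by the time the error surfaces. Whether that is the desired behavior for a real transport is an open design question.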
Or if a middleware wanted the flexibility to treat streaming endpoints differently (e.g., if logging middleware didn't want to log streamed inputs), that would still be possible by inspecting properties on CallInfo. At the moment, CallInfo is intended to indicate whether the requests and/or responses are streaming: it would pass through the values from grpc.StreamServerInfo for streaming gRPC requests, and be false/false for unary and HTTP requests.
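As a rough illustration of branching on CallInfo, the sketch below logs each request for non-streaming input but only a message count for client streams. The CallInfo field names are assumptions borrowed from grpc.StreamServerInfo (FullMethod, IsClientStream, IsServerStream); Logging, HelloRequest, and logCall are hypothetical helpers for this example, not Kratos APIs.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// CallInfo is a hypothetical type; field names mirror grpc.StreamServerInfo.
type CallInfo struct {
	FullMethod     string
	IsClientStream bool
	IsServerStream bool
}

// Handler mirrors the proposed signature.
type Handler func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error

// Middleware wraps one Handler in another.
type Middleware func(Handler) Handler

// HelloRequest is an illustrative message type.
type HelloRequest struct{ Name string }

// Logging logs each request when input is not streamed, and only a
// message count when the client is streaming.
func Logging(logf func(format string, args ...any)) Middleware {
	return func(next Handler) Handler {
		return func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error {
			received := 0
			recv := func(msg any) error {
				err := receiveMsg(msg)
				if err == nil {
					received++
					if !info.IsClientStream {
						logf("%s request: %+v", info.FullMethod, msg)
					}
				}
				return err
			}
			err := next(ctx, recv, sendMsg, info)
			if info.IsClientStream {
				logf("%s received %d streamed messages", info.FullMethod, received)
			}
			return err
		}
	}
}

// logCall runs a toy endpoint over the given names and returns the log lines.
func logCall(info CallInfo, names []string) []string {
	var lines []string
	logf := func(format string, args ...any) {
		lines = append(lines, fmt.Sprintf(format, args...))
	}
	i := 0
	receive := func(msg any) error {
		if i >= len(names) {
			return io.EOF
		}
		msg.(*HelloRequest).Name = names[i]
		i++
		return nil
	}
	send := func(any) error { return nil }
	endpoint := func(ctx context.Context, receiveMsg, sendMsg func(msg any) error, info CallInfo) error {
		for {
			req := new(HelloRequest)
			if err := receiveMsg(req); err != nil {
				return nil // any receive error, including io.EOF, ends this toy endpoint
			}
		}
	}
	_ = Logging(logf)(endpoint)(context.Background(), receive, send, info)
	return lines
}

func main() {
	unary := CallInfo{FullMethod: "/helloworld.v1.Greeter/SayHello"}
	stream := CallInfo{FullMethod: "/helloworld.v1.Greeter/KeepSayingHello", IsClientStream: true}
	fmt.Println(logCall(unary, []string{"Kratos"}))
	fmt.Println(logCall(stream, []string{"Kratos", "gRPC"}))
}
```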
Thoughts/feedback? I've been mostly exploring so far but could put a draft PR together to provide something more concrete if there's interest, or pivot if this seems totally off the mark.
@go-kratos/contributor @shenqidebaozi
I've been quite busy lately, let me take a look now
What happened:
We've started using Kratos middleware (recovery, logging, validate) in our project, and noticed that, on the gRPC side, they only seem to apply to unary operations. Our streaming operations don't get validated or logged, and a panic takes down the server, whereas on a unary endpoint requests are validated and logged as expected and panics are handled.
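For context on the recovery part, what we would expect around a streaming handler is roughly the recover-and-convert pattern below. This is only a minimal sketch: StreamHandler, ErrRecovered, and WithRecovery are local stand-ins made up for illustration, not the gRPC or Kratos types.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamHandler is a local stand-in for a streaming endpoint; it is not the
// gRPC or Kratos handler type.
type StreamHandler func(stream any) error

// ErrRecovered marks errors that were produced from a recovered panic.
var ErrRecovered = errors.New("recovered from panic")

// WithRecovery converts a panic raised by next into an ordinary error,
// so the caller can report it to the client instead of the process exiting.
func WithRecovery(next StreamHandler) StreamHandler {
	return func(stream any) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("%w: %v", ErrRecovered, r)
			}
		}()
		return next(stream)
	}
}

func main() {
	panicky := func(stream any) error { panic("client closed") }
	fmt.Println(WithRecovery(panicky)(nil))
}
```

Note that recover only catches panics on the goroutine that runs the deferred function, so a handler that panics in a goroutine it spawned itself would still bring the process down.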
What you expected to happen:
We expected to either have the middleware apply seamlessly to streaming requests or to have separate streaming middleware we can register (which, to be fair, may exist, but we weren't able to find it.)
How to reproduce it (as minimally and precisely as possible):
Repro project: https://github.com/wscalf/streaming-repro
grpcurl -plaintext -d '{"name":""}' localhost:9000 helloworld.v1.Greeter.SayHello
: gets correctly rejected with a validation error.
grpcurl -plaintext -d '{"name":""}' localhost:9000 helloworld.v1.Greeter.KeepSayingHello
: should get rejected (it's using the same message) but does not. It is also coded to panic when the client closes, which should trigger the recovery middleware, but it doesn't; it just exits.
grpcurl -plaintext -d '{"name":"Kratos"}' localhost:9000 helloworld.v1.Greeter.SayHello
: is valid, and is coded to have a 50/50 chance of panicking, which does correctly trigger the recovery middleware, returning an error to the client and not exiting.
Anything else we need to know?:
Environment:
Kratos version (kratos -v): kratos version v2.7.3
Go version (go version): go version go1.22.2 linux/amd64
OS (cat /etc/os-release):