robert-min / handson-go

Go-lang hands on guide
0 stars 0 forks source link

Chapter9. gRPC develop - interception #28

Open robert-min opened 1 year ago

robert-min commented 1 year ago

gRPC middleware 구현

robert-min commented 1 year ago

gRPC middleware 구현 Service

syntax = "proto3";

option go_package = "github.com/handson-go/chap9/interceptors/service";

// Users declares the RPCs offered by the user service.
service Users {
    // GetUser is a unary RPC: one UserGetRequest in, one UserGetReply out.
    rpc GetUser (UserGetRequest) returns (UserGetReply) {}
    // GetHelp is a bidirectional streaming RPC: both the request side and
    // the reply side are declared as streams.
    rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}

// UserGetRequest is the input of Users.GetUser. It carries two string
// fields, email and id, identifying the requested user.
message UserGetRequest {
    string email = 1;
    string id = 2;
}

// User describes a single user: an id, first and last name, and an age.
message User {
    string id = 1;
    string first_name = 2;
    string last_name = 3;
    int32 age = 4;
}

// UserGetReply is the output of Users.GetUser and wraps a single User.
message UserGetReply {
    User user = 1;
}

// UserHelpRequest is one message sent by the client on the GetHelp stream:
// the User it concerns plus a free-form request string.
message UserHelpRequest {
    User user = 1;
    string request = 2;
}

// UserHelpReply is one message sent by the server on the GetHelp stream;
// it carries a single response string.
message UserHelpReply {
    string response = 1;
}
robert-min commented 1 year ago

gRPC middleware 구현 Server

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "net"
    "os"
    "strings"
    "time"

    svc "github.com/handson-go/chap9/interceptors/service"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// userService is the server-side implementation of the Users gRPC service.
// It embeds the generated svc.UnimplementedUsersServer.
// NOTE(review): the embedded type presumably supplies stub methods for any
// RPC not overridden here — confirm against the generated code.
type userService struct {
    svc.UnimplementedUsersServer
}

// logMessage writes one log line describing a finished RPC: the method name,
// its duration, the error it returned (if any) and the Request-Id found in
// the incoming metadata of ctx. When ctx carries no incoming metadata,
// "No metadata" is logged first and the Request-Id is left empty.
func logMessage(
    ctx context.Context,
    method string,
    latency time.Duration,
    err error,
) {
    reqID := ""
    if md, found := metadata.FromIncomingContext(ctx); found {
        // Take the first Request-Id value when the client sent one.
        if ids := md.Get("Request-Id"); len(ids) > 0 {
            reqID = ids[0]
        }
    } else {
        log.Print("No metadata")
    }

    log.Printf(
        "Method:%s, Duration:%s, Error:%v, Request-Id:%s",
        method, latency, err, reqID,
    )
}

// loggingUnaryInterceptor is a unary server interceptor that times the
// wrapped handler, reports the call through logMessage, and then hands the
// handler's response and error back unchanged.
func loggingUnaryInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    began := time.Now()
    reply, callErr := handler(ctx, req)
    elapsed := time.Since(began)

    logMessage(ctx, info.FullMethod, elapsed, callErr)
    return reply, callErr
}

// loggingStreamInterceptor is a streaming server interceptor. It runs the
// wrapped handler to completion, logs the call through logMessage using the
// stream's context, and returns the handler's error unchanged.
func loggingStreamInterceptor(
    srv interface{},
    stream grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    began := time.Now()
    callErr := handler(srv, stream)

    // The stream's context is the one that carries the incoming metadata.
    logMessage(stream.Context(), info.FullMethod, time.Since(began), callErr)
    return callErr
}

// GetUser handles the unary GetUser RPC. It logs the requested email and id,
// rejects any email that does not contain exactly one "@" with an
// "invalid email address" error, and otherwise replies with a User whose
// FirstName is the part before the "@", whose LastName is the part after it,
// whose Id echoes the request, and whose Age is fixed at 36.
func (s *userService) GetUser(ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {
    log.Printf(
        "Received request for user with Email: %s Id: %s\n",
        in.Email,
        in.Id,
    )

    // Exactly one separator means the address has exactly two components.
    if strings.Count(in.Email, "@") != 1 {
        return nil, errors.New("invalid email address")
    }
    at := strings.Index(in.Email, "@")

    reply := &svc.UserGetReply{
        User: &svc.User{
            Id:        in.Id,
            FirstName: in.Email[:at],
            LastName:  in.Email[at+1:],
            Age:       36,
        },
    }
    return reply, nil
}

// GetHelp handles the bidirectional GetHelp stream as an echo service: every
// request received is printed and sent back verbatim as the response. The
// call ends with nil when the client closes its side (io.EOF) and with the
// underlying error when receiving or sending fails.
func (*userService) GetHelp(stream svc.Users_GetHelpServer) error {
    log.Println("Client connected")
    for {
        msg, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            // The client finished sending; end the RPC normally.
            log.Println("Client disconnected")
            return nil
        case recvErr != nil:
            return recvErr
        }

        fmt.Printf("Request received: %s\n", msg.Request)
        echo := svc.UserHelpReply{Response: msg.Request}
        if sendErr := stream.Send(&echo); sendErr != nil {
            return sendErr
        }
    }
}

// registerServices attaches a new userService to s as the implementation of
// the Users service.
func registerServices(s *grpc.Server) {
    users := new(userService)
    svc.RegisterUsersServer(s, users)
}

// startServer serves gRPC requests arriving on l using s and returns
// whatever error Serve reports.
func startServer(s *grpc.Server, l net.Listener) error {
    serveErr := s.Serve(l)
    return serveErr
}

// main starts the gRPC server. It listens on the TCP address given by the
// LISTEN_ADDR environment variable (":50051" when unset or empty), installs
// the logging interceptors, registers the services, and serves until Serve
// returns, at which point the process exits via log.Fatal.
func main() {
    addr := os.Getenv("LISTEN_ADDR")
    if addr == "" {
        addr = ":50051"
    }

    listener, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }

    interceptors := []grpc.ServerOption{
        grpc.UnaryInterceptor(loggingUnaryInterceptor),   // intercepts unary RPC method calls
        grpc.StreamInterceptor(loggingStreamInterceptor), // intercepts streaming RPC method calls
    }
    server := grpc.NewServer(interceptors...)
    registerServices(server)
    log.Fatal(startServer(server, listener))
}
robert-min commented 1 year ago

gRPC middleware 구현 Client


// metadataUnaryInterceptor is a unary client interceptor that appends a
// fixed "Request-Id" metadata entry to the outgoing context before invoking
// the RPC, and returns the invoker's error unchanged.
func metadataUnaryInterceptor(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    outgoing := metadata.AppendToOutgoingContext(ctx, "Request-Id", "request-123")
    callErr := invoker(outgoing, method, req, reply, cc, opts...)
    return callErr
}

// metadataStreamInterceptor is a streaming client interceptor that appends a
// fixed "Request-Id" metadata entry to the outgoing context before opening
// the stream, and returns the streamer's result unchanged.
func metadataStreamInterceptor(
    ctx context.Context,
    desc *grpc.StreamDesc,
    cc *grpc.ClientConn,
    method string,
    streamer grpc.Streamer,
    opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
    outgoing := metadata.AppendToOutgoingContext(ctx, "Request-Id", "request-123")
    return streamer(outgoing, desc, cc, method, opts...)
}

// setupGrpcConn dials the gRPC server at addr with the metadata interceptors
// installed and returns the resulting client connection.
func setupGrpcConn(addr string) (*grpc.ClientConn, error) {
    dialOpts := []grpc.DialOption{
        // Communicate with the server without transport-layer security.
        grpc.WithInsecure(),
        // Establish the connection before the dial call returns, so the
        // returned connection object is already connected.
        grpc.WithBlock(),
        // Unary client interceptor.
        grpc.WithUnaryInterceptor(metadataUnaryInterceptor),
        // Stream client interceptor.
        grpc.WithStreamInterceptor(metadataStreamInterceptor),
    }

    // An empty background context: no deadline or cancellation is applied.
    return grpc.DialContext(context.Background(), addr, dialOpts...)
}

// getUserServiceClient builds a Users service client on top of conn.
func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {
    client := svc.NewUsersClient(conn)
    return client
}

// getUser issues the unary GetUser RPC for u through client, using a
// background context, and returns the reply and error as received.
func getUser(
    client svc.UsersClient,
    u *svc.UserGetRequest,
) (*svc.UserGetReply, error) {
    ctx := context.Background()
    return client.GetUser(ctx, u)
}

// setupChat runs an interactive session over the GetHelp streaming RPC.
//
// It opens the stream on c, then repeatedly writes a "Request: " prompt to
// w, reads one line from r, sends it to the server and writes the server's
// response to w. The session ends when the line "quit" is read or when r
// reaches end of input; the send side of the stream is then closed.
//
// It returns the first error encountered while opening the stream, reading
// from r, sending, receiving, or closing the send side.
func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {
    stream, err := c.GetHelp(context.Background())
    if err != nil {
        return err
    }

    // One scanner for the whole session: a scanner reads ahead into its own
    // buffer, so creating a new one per iteration would drop any input the
    // previous one had already consumed from r.
    scanner := bufio.NewScanner(r)
    for {
        fmt.Fprint(w, "Request: ")

        if !scanner.Scan() {
            // Scan reports false on a read error or at end of input.
            if err := scanner.Err(); err != nil {
                return err
            }
            // End of input: finish the session like an explicit "quit"
            // instead of sending empty requests forever.
            break
        }

        msg := scanner.Text()
        if msg == "quit" {
            break
        }

        if err := stream.Send(&svc.UserHelpRequest{Request: msg}); err != nil {
            return err
        }

        resp, err := stream.Recv()
        if err != nil {
            return err
        }
        // Write to w (not directly to stdout) so output follows the prompt.
        fmt.Fprintf(w, "Response: %s\n", resp.Response)
    }
    return stream.CloseSend()
}

// main is the client entry point. It expects exactly two command-line
// arguments: the gRPC server address and the name of the method to call
// ("GetUser" or "GetHelp"). It connects to the server, runs the selected
// method, and exits via log.Fatal on any error or unknown method name.
func main() {
    if len(os.Args) != 3 {
        // Both arguments are required, so the message names both.
        log.Fatal(
            "Must specify a gRPC server address and a method name (GetUser or GetHelp)",
        )
    }
    serverAddr := os.Args[1]
    methodName := os.Args[2]

    conn, err := setupGrpcConn(serverAddr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    c := getUserServiceClient(conn)

    switch methodName {
    case "GetUser":
        // Unary call with a fixed sample email address.
        result, err := getUser(
            c,
            &svc.UserGetRequest{Email: "kim@naver.com"},
        )
        if err != nil {
            log.Fatal(err)
        }

        fmt.Fprintf(
            os.Stdout, "User: %s %s \n",
            result.User.FirstName, result.User.LastName,
        )
    case "GetHelp":
        // Interactive streaming session driven by stdin/stdout.
        err = setupChat(os.Stdin, os.Stdout, c)
        if err != nil {
            log.Fatal(err)
        }
    default:
        log.Fatal("Unrecognized method name")
    }

}