robert-min / handson-go

Go-lang hands on guide
0 stars 0 forks source link

Chapter9. gRPC develop - server_streaming, bidirectional streaming #26

Open robert-min opened 1 year ago

robert-min commented 1 year ago

gRPC 스트리밍 커뮤니케이션

서버 사이드 스트리밍

robert-min commented 1 year ago

서버 사이드 스트리밍 Service

// ./service/users.proto
syntax = "proto3";

option go_package = "github.com/handson-go/chap9/server_streaming/service";

service Users {
    rpc GetUser (UserGetRequest) returns (UserGetReply) {}
}

message UserGetRequest {
    string email = 1;
    string id = 2;
}

message User {
    string id = 1;
    string first_name = 2;
    string last_name = 3;
    int32 age = 4;
}

message UserGetReply {
    User user = 1;
}
// ./service/repositories.proto
syntax = "proto3";

import "users.proto";

option go_package = "github.com/handson-go/chap9/server_streaming/service";

service Repo {
    rpc GetRepos (RepoGetRequest) returns (stream RepoGetReply) {}
}

message RepoGetRequest {
    string create_id = 1;
    string id = 2;
}

message Repository {
    string id = 1;
    string name = 2;
    string url = 3;
    User owner = 4;
}

message RepoGetReply {
    Repository repo = 1;
}
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative users.proto repositories.proto
robert-min commented 1 year ago

서버 사이드 스트리밍 server


type userService struct {
    svc.UnimplementedUsersServer
}

type repoService struct {
    svc.UnimplementedRepoServer
}

func (s *userService) GetUser(ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {
    log.Printf("Received request for user with Email: %s Id: %s\n", in.Email, in.Id)
    components := strings.Split(in.Email, "@")
    if len(components) != 2 {
        return nil, errors.New("invalid email address")
    }

    u := svc.User{
        Id:        in.Id,
        FirstName: components[0],
        LastName:  components[1],
        Age:       36,
    }
    return &svc.UserGetReply{User: &u}, nil
}

// streaming 처리
func (s *repoService) GetRepos(in *svc.RepoGetRequest, stream svc.Repo_GetReposServer) error {
    log.Printf(
        "Received request for repo with CreateId: %s Id: %s\n",
        in.CreateId,
        in.Id,
    )
    repo := svc.Repository{
        Owner: &svc.User{Id: in.CreateId, FirstName: "Kim"},
    }
    cnt := 1
    for {
        repo.Name = fmt.Sprintf("repo-%d", cnt)
        repo.Url = fmt.Sprintf("https://git.example.com/test/%s", repo.Name)
        r := svc.RepoGetReply{
            Repo: &repo,
        }
        if err := stream.Send(&r); err != nil {
            return err
        }
        if cnt >= 5 {
            break
        }
        cnt++
    }
    return nil
}

func registerServices(s *grpc.Server) {
    svc.RegisterUsersServer(s, &userService{})
    svc.RegisterRepoServer(s, &repoService{})
}

func startServer(s *grpc.Server, l net.Listener) error {
    return s.Serve(l)
}

func main() {
    listenAddr := os.Getenv("LISTEN_ADDR")
    if len(listenAddr) == 0 {
        listenAddr = ":50051"
    }
    lis, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }

    s := grpc.NewServer()
    registerServices(s)
    log.Fatal(startServer(s, lis))
}
robert-min commented 1 year ago

서버 사이드 스트리밍 Server test


func startTestGRPCServer() *bufconn.Listener {
    l := bufconn.Listen(10)
    s := grpc.NewServer()
    registerServices(s)
    go func() {
        log.Fatal(startServer(s, l))
    }()
    return l
}

func TestUserService(t *testing.T) {
    l := startTestGRPCServer()
    bufconnDialer := func(ctx context.Context, addr string) (net.Conn, error) {
        return l.Dial()
    }

    client, err := grpc.DialContext(
        context.Background(),
        "",
        grpc.WithInsecure(),
        grpc.WithContextDialer(bufconnDialer),
    )
    if err != nil {
        t.Fatal(err)
    }

    usersClient := svc.NewUsersClient(client)
    resp, err := usersClient.GetUser(
        context.Background(),
        &svc.UserGetRequest{Id: "foo-bar", Email: "kim@min.com"},
    )
    if err != nil {
        t.Fatal(err)
    }

    if resp.User.FirstName != "kim" {
        t.Errorf(
            "Expected FirstName to be: jane, Got: %s",
            resp.User.FirstName,
        )
    }
}

func TestRepoService(t *testing.T) {
    l := startTestGRPCServer()
    bufconnDialer := func(ctx context.Context, addr string) (net.Conn, error) {
        return l.Dial()
    }

    client, err := grpc.DialContext(
        context.Background(),
        "",
        grpc.WithInsecure(),
        grpc.WithContextDialer(bufconnDialer),
    )
    if err != nil {
        t.Fatal(err)
    }

    repoClient := svc.NewRepoClient(client)
    stream, err := repoClient.GetRepos(
        context.Background(),
        &svc.RepoGetRequest{CreateId: "user-123", Id: "repo-123"},
    )
    if err != nil {
        t.Fatal(err)
    }

    var repos []*svc.Repository
    for {
        repo, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        repos = append(repos, repo.Repo)
    }
    if len(repos) != 5 {
        t.Fatalf("Expected to get back 5 repos, got back: %d repos", len(repos))
    }

    for idx, repo := range repos {
        gotRepoName := repo.Name
        expetedRepoName := fmt.Sprintf("repo-%d", idx+1)

        if gotRepoName != expetedRepoName {
            t.Errorf(
                "Expected Repo Name to be: %s, Got: %s",
                expetedRepoName,
                gotRepoName,
            )
        }
    }
}
robert-min commented 1 year ago

양방향 스트리밍

robert-min commented 1 year ago

양방향 스트리밍 Service

syntax = "proto3";

option go_package = "github.com/handson-go/chap9/bidi_streaming/service";

service Users {
    rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}

message User {
    string id = 1;
}

message UserHelpRequest {
    User user = 1;
    string request = 2;
}

message UserHelpReply {
    string response = 1;
}
robert-min commented 1 year ago

양방향 스트리밍 Server


type userService struct {
    svc.UnimplementedUsersServer
}

func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {
    log.Println("Client conneted")
    for {
        request, err := stream.Recv() // 클라이언트로부터 요청 수신
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        fmt.Printf("Request received: %s\n", request.Request)
        response := svc.UserHelpReply{
            Response: request.Request,
        }
        err = stream.Send(&response) // 클라이언트에게 UserHelpReply 메시지 응답을 전송
        if err != nil {
            return err
        }
    }
    log.Println("Client disconnected")
    return nil
}

func registerServices(s *grpc.Server) {
    svc.RegisterUsersServer(s, &userService{})
}

func startServer(s *grpc.Server, l net.Listener) error {
    return s.Serve(l)
}

func main() {
    listenAddr := os.Getenv("LISTEN_ADDR")
    if len(listenAddr) == 0 {
        listenAddr = ":50051"
    }
    lis, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatal(err)
    }

    s := grpc.NewServer()
    registerServices(s)
    log.Fatal(startServer(s, lis))
}
robert-min commented 1 year ago

양방향 스트리밍 Client


func setupGrpcConn(addr string) (*grpc.ClientConn, error) {
    return grpc.DialContext(
        context.Background(),
        addr,
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
}

func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {
    return svc.NewUsersClient(conn)
}

func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {
    stream, err := c.GetHelp(context.Background())
    if err != nil {
        return err
    }

    for {
        scanner := bufio.NewScanner(r)
        prompt := "Request: "
        fmt.Fprintf(w, prompt)

        scanner.Scan()
        if err := scanner.Err(); err != nil {
            return err
        }

        msg := scanner.Text()
        if msg == "quit" {
            break
        }

        request := svc.UserHelpRequest{
            Request: msg,
        }
        err := stream.Send(&request)
        if err != nil {
            return err
        }

        resp, err := stream.Recv()
        if err != nil {
            return err
        }
        fmt.Printf("Response: %s\n", resp.Response)
    }
    return stream.CloseSend()
}

func main() {
    if len(os.Args) != 2 {
        log.Fatal("Must specify a gRPC server address")
    }
    conn, err := setupGrpcConn(os.Args[1])
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    c := getUserServiceClient(conn)
    err = setupChat(os.Stdin, os.Stdout, c)
    if err != nil {
        log.Fatal(err)
    }
}