ServiceWeaver / weaver

Programming framework for writing and deploying cloud applications.
https://serviceweaver.dev
Apache License 2.0

Stream Listener <-> Storage #505

Open · kedric opened this issue 1 year ago

kedric commented 1 year ago

Hello,

I have a question: I don't see how to implement the following pattern in the framework.

In many cases I use gRPC on the server side, and I usually stream the results instead of returning a repeated field.

That's why the service often looks like this:

protobuf:

service Books {
    rpc List (BookListRequest) returns (stream Book) {}
}

Repository


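type Responder interface {
    Send(*model.Book) error
}

func (r *Repository) List(ctx context.Context, query ..., responder Responder) error {
    rows, err := sql.QueryContext(ctx, ...)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        book := &model.Book{}
        if err := rows.Scan(&book.X, &book.Y); err != nil {
            return err
        }
        ...
        // Push each row to the caller as soon as it is read.
        if err := responder.Send(book); err != nil {
            return err
        }
    }
    return rows.Err()
}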

server:

type BookStreamSender struct {
    Serv pb.Books_ListServer
}

func (ss *BookStreamSender) Send(book *model.Book) error {
    ... mapper
    // Forward to the underlying gRPC stream (calling ss.Send here would recurse).
    return ss.Serv.Send(...)
}

func (impl *ImplementationServer) List(req *pb.BookListRequest, serv pb.Books_ListServer) error {
    ctx := serv.Context()
    ...

    return repositoryBook.List(ctx, ..., &BookStreamSender{Serv: serv})
}

I've skipped all the logic between the two components, but the idea is there. The principle is simple: the server creates a type that sends responses to the client and implements an interface defined in the business layer.

I also use the same approach in the other direction, for bulk inserts.
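The reverse direction can be sketched the same way, assuming the source mirrors the `Recv` method of a gRPC client-streaming server stream, which returns `io.EOF` once the client has finished sending. `BulkCreateFrom`, `insertBatch`, and `sliceReceiver` are hypothetical names; `insertBatch` stands in for a real bulk INSERT.

```go
package main

import (
	"errors"
	"io"
)

type Book struct {
	ID    int
	Title string
}

// Receiver mirrors Recv on a gRPC client-streaming server stream:
// it returns io.EOF when the client is done.
type Receiver interface {
	Recv() (*Book, error)
}

// BulkCreateFrom drains src and passes books to insertBatch in chunks of at
// most batchSize. insertBatch must not keep a reference to the slice, since
// the buffer is reused. It returns how many books were inserted.
func BulkCreateFrom(src Receiver, batchSize int, insertBatch func([]Book) error) (int, error) {
	if batchSize <= 0 {
		return 0, errors.New("batchSize must be positive")
	}
	batch := make([]Book, 0, batchSize)
	total := 0
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := insertBatch(batch); err != nil {
			return err
		}
		total += len(batch)
		batch = batch[:0]
		return nil
	}
	for {
		b, err := src.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return total, err
		}
		batch = append(batch, *b)
		if len(batch) == batchSize {
			if err := flush(); err != nil {
				return total, err
			}
		}
	}
	// Insert whatever is left over after the stream ends.
	if err := flush(); err != nil {
		return total, err
	}
	return total, nil
}

// sliceReceiver is a test double that replays a fixed list of books.
type sliceReceiver struct {
	books []Book
	next  int
}

func (s *sliceReceiver) Recv() (*Book, error) {
	if s.next >= len(s.books) {
		return nil, io.EOF
	}
	b := s.books[s.next]
	s.next++
	return &b, nil
}
```

Batching keeps memory bounded for large uploads while still using bulk statements; the batch size is a tuning knob, not something the thread specifies.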

I wonder whether interfaces (interface{}) could offer some kind of remote pointer that would let two services communicate directly.

spetrovic77 commented 1 year ago

Hi @kedric,

It sounds to me like there are two issues at play here. First, you would like the server to be able to stream results back to the client. Currently, Service Weaver doesn't support this. It is (reasonably) feasible for us to add streaming support, as our underlying runtime establishes bi-directional channels between two participants under the hood.

The second issue is the relationship between the Repository and BookStreamSender. I'm envisioning that those two live in the same process, and that the Repository currently uses the underlying gRPC connection (through BookStreamSender) to Send() data. Is that right? If so, you would likely retain a similar design in Service Weaver, possibly passing a channel of type chan *model.Book into the Repository.

We will likely need to add streaming support to Service Weaver for this to work. One option is for the Books service to support streaming methods. Here is how you may write it in Service Weaver if we supported this functionality:

type BookListRequest struct {...}
type Book struct {...}

type BooksService interface {
  List(context.Context, *BookListRequest) (<-chan *Book, error)
}

Your Service Weaver implementation would possibly look like this:

type BooksServiceImpl struct {
  weaver.Implements[BooksService]
}

func (b *BooksServiceImpl) List(ctx context.Context, req *BookListRequest) (<-chan *Book, error) {
  ch := make(chan *Book)
  // The sender writes books to ch and closes it when the stream ends.
  sender, err := newBookStreamSender(ctx, req, ch)
  if err != nil {
    return nil, err
  }
  go sender.Run()
  return ch, nil
}

Let me know if something like this would satisfy your requirements, and we can look into adding streaming support (possibly using Go channels) to Service Weaver.

kedric commented 1 year ago

Yes, I think this would work for a data stream.

But it makes me think of another case (sorry, I'm really enjoying the framework):

When I know I'm going to apply a lot of processing to a structure, I load all the related data from my database into memory so I can modify it easily, then apply all the changes in a single, fast transaction using bulk update / bulk insert / bulk delete.

Example:

type Ingredient struct {}
type Process struct {}
type Recipe struct {}

type RecipeExpend struct {
    Recipe *Recipe
    Ingredient []*Ingredient
    Process []*Process
}

type WriterRepository[T any] interface {
    BulkCreate(context.Context, []T) error
    BulkUpdate(context.Context, []T) error
    BulkDelete(context.Context, []T) error
}

type Writer[T any] struct {
    Repository WriterRepository[T]
    Current []T
    ToCreate []T
    ToUpdate []T
    ToDelete []T
} 

func (w *Writer[T]) Save(ctx context.Context) error {
    if err := w.Repository.BulkCreate(ctx, w.ToCreate); err != nil {
        return err
    }
    if err := w.Repository.BulkUpdate(ctx, w.ToUpdate); err != nil {
        return err
    }
    if err := w.Repository.BulkDelete(ctx, w.ToDelete); err != nil {
        return err
    }
    return nil
}

type RecipeWriter interface {
    AddIngredient(*Ingredient)
    RemoveIngredient(id uuid.UUID)

    AddProcess(*Process)
    RemoveProcess(id uuid.UUID)

    Save(ctx context.Context) error
}

// implementation of RecipeWriter 
type RecipeWriterImpl struct {
    Recipe *Recipe
    Ingredient Writer[*Ingredient]
    Process Writer[*Process]
}

/*
...
*/

func (rwi *RecipeWriterImpl) Save(ctx context.Context) error {
    // begin tx
    if err := rwi.Ingredient.Save(ctx); err != nil {
        // rollback
        return err
    }
    if err := rwi.Process.Save(ctx); err != nil {
        // rollback
        return err
    }
    // commit
    return nil
}

type RecipeService interface {
    BuildRecipeWriter(ctx context.Context, id uuid.UUID) (RecipeWriter, error)
}
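The `ToCreate`/`ToDelete` bookkeeping behind `Writer[T]` can be sketched as a small generic change tracker. This is a hypothetical helper, not code from the thread: it uses `string` keys in place of `uuid.UUID` and covers only creates and deletes. Its useful property is that adding and then removing the same item in memory produces no database work at all.

```go
package main

// Tracker records in-memory edits so they can later be flushed with one bulk
// create and one bulk delete. K is the entity ID type, T the entity type.
type Tracker[K comparable, T any] struct {
	persisted map[K]bool // IDs that already exist in the database
	toCreate  map[K]T
	toDelete  map[K]bool
}

// NewTracker starts from the IDs loaded from the database.
func NewTracker[K comparable, T any](existing []K) *Tracker[K, T] {
	p := make(map[K]bool, len(existing))
	for _, id := range existing {
		p[id] = true
	}
	return &Tracker[K, T]{persisted: p, toCreate: map[K]T{}, toDelete: map[K]bool{}}
}

// Add schedules a create. It assumes id is new; re-adding a persisted or
// removed ID (which would really be an update) is not handled here.
func (t *Tracker[K, T]) Add(id K, v T) {
	t.toCreate[id] = v
}

// Remove cancels a pending create, or schedules a delete for a persisted row.
func (t *Tracker[K, T]) Remove(id K) {
	if _, ok := t.toCreate[id]; ok {
		delete(t.toCreate, id) // never reached the database, nothing to delete
		return
	}
	if t.persisted[id] {
		t.toDelete[id] = true
	}
}

// Counts reports how many rows the bulk create and bulk delete would touch.
func (t *Tracker[K, T]) Counts() (creates, deletes int) {
	return len(t.toCreate), len(t.toDelete)
}
```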

Well, I can see that in practice I'll bring the data as close as possible to where it's processed, and build a helper at the point where the processing happens. But I don't think it's crazy to want to make RPC calls on a remote interface.

spetrovic77 commented 1 year ago

Thank you @kedric.

I'm not 100% sure where streaming comes into the picture here, but I've noticed that you are transactionally calling impl.Ingredient.Save() and impl.Process.Save().

Service Weaver doesn't provide support for distributed transactions. So you will have to keep Ingredient and Process in the same OS process.

I didn't understand your point fully in sharing this example, maybe you can clarify it for me?

kedric commented 1 year ago

Thank you @spetrovic77

On reflection, my second example isn't viable on a distributed stack.

I think I need to take another look at the repository part while playing with the framework to find what suits me.