grpc / grpc-dart

The Dart language implementation of gRPC.
https://pub.dev/packages/grpc
Apache License 2.0
860 stars 271 forks source link

Streaming issue on web only #717

Open divan opened 5 months ago

divan commented 5 months ago

Hi, I'm having an issue with streaming only when compiled to the web platform. It's the simple file upload chunking code that works fine on native platforms:

upload.dart

import 'package:<....>/pb/upload.pb.dart' as grpc;

upload(...) {
    final metadata =
        grpcu.UploadRequest_Metadata(size: bytes.length, fileName: fileName);
    var stream = uploadChunks(bytes, metadata: metadata);
    return client.uploadPhoto(stream).then((r) => r.url);
}

Stream<grpc.UploadRequest> uploadChunks(List<int> bytes,
    {required grpc.UploadRequest_Metadata metadata}) async* {
    yield grpc.UploadRequest(metadata: metadata);

    <...chunking logic...>
    for chunk in chunks {
        yield grpc.UploadRequest(chunk: chunk);
    }
  }
}

proto file:

message UploadRequest{
  message Metadata {
      int32 size = 1;
      string file_name = 2; 
      optional string ext = 3;
      optional int32 width = 4;
      optional int32 height = 5;
  }
  oneof data {
    bytes chunk = 1;
    Metadata metadata = 2;
  }
}

message UploadResponse{
  string url = 1;
}

But on web it returns this for every chunk:

DOMException: Failed to execute 'send' on 'XMLHttpRequest': The object's state must be OPENED.
packages/grpc/src/client/transport/xhr_transport.dart 54:36         <fn>
dart-sdk/lib/async/zone.dart 1594:9                                 runUnaryGuarded
dart-sdk/lib/async/stream_impl.dart 365:5                           [_sendData]
dart-sdk/lib/async/stream_impl.dart 297:7                           [_add]
dart-sdk/lib/async/stream_pipe.dart 123:11                          [_add]
dart-sdk/lib/async/stream_pipe.dart 218:9                           [_handleData]
dart-sdk/lib/async/stream_pipe.dart 153:5                           [_handleData]
dart-sdk/lib/async/zone.dart 1594:9                                 runUnaryGuarded
dart-sdk/lib/async/stream_impl.dart 365:5                           [_sendData]
dart-sdk/lib/async/stream_impl.dart 541:13                          perform
dart-sdk/lib/async/stream_impl.dart 646:10                          handleNext
dart-sdk/lib/async/stream_impl.dart 617:7                           callback
dart-sdk/lib/async/schedule_microtask.dart 40:11                    _microtaskLoop
dart-sdk/lib/async/schedule_microtask.dart 49:5                     _startMicrotaskLoop
dart-sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart 181:7  <fn>

Seems like after the initial metadata, the sending connection is closed/cancelled, and subsequent chunks sends are resulting in this error. I can't, however, see any errors neither on server nor on client (using logging with interceptors).

Meanwhile, as I understood, grpc-dart is using grpc-web under the hood, and on the grpc-web README page, there is a paragraph:

Server-side Streaming RPCs (example) (NOTE: Only when grpcwebtext mode is used.) Client-side and Bi-directional streaming is not currently supported (see streaming roadmap).

It leads to this document: https://github.com/grpc/grpc-web/blob/master/doc/streaming-roadmap.md, which is basically saying, "forget about streaming support from the client".

So, I have two questions:

  1. Is it correct that grpc-dart client-side streaming is not working for apps built for web platform?
  2. If so, any solutions/workarounds?
mosuem commented 5 months ago

It's the simple file upload chunking code that works fine on native platforms

There are two different protocols, grpc and grpc-web, which are only related by name.

  1. Is it correct that grpc-dart client-side streaming is not working for apps built for web platform?

Yes, grpc-web does not support client-side streaming at the moment.

  1. If so, any solutions/workarounds?

I am no expert, but I would probably try using multiple unary requests.

OppositeDragon commented 3 days ago

I am facing exactly the same problem. I was not sure if it was my code, as it works fine on native platforms, except on the web. Did you ever find a solution/workaround?

mraleph commented 3 days ago

@OppositeDragon streaming server to client should sorta work, with some caveats (e.g. it buffers the whole stream in memory - so I would not recommend using it for any sort of long running streams - we should really migrate our implementation to fetch to make it more robust). Streaming from client to server does not work.

divan commented 19 hours ago

Did you ever find a solution/workaround?

@OppositeDragon I implemented it as a a fallback with unary connection. For my use case it's okay, as I expect most users to use native apps and use web as a fallback.

import 'package:<my package>/pb/upload.pb.dart' as grpcu;

const chunkSize = 4096;

 Future<String> uploadFile(Uint8List data, String fileName) {
    if (kIsWeb) {
      return uploadFileFuckingWebFallback(data, fileName);
    }

    final metadata =
        grpcu.UploadRequest_Metadata(size: data.length, fileName: fileName);
    var stream = uploadChunks(data, metadata: metadata);
    return client.uploadFile(stream).then((r) => r.url);
 }

  Future<String> uploadFileFuckingWebFallback(
      Uint8List bytes, String fileName) {
    final request = grpcu.UploadSingleRequest(
      data: bytes,
      size: bytes.length,
      fileName: fileName,
    );
    return client
        .uploadFileFuckingWebFallback(request)
        .then((r) => r.url);
  }

Stream<grpcu.UploadRequest> uploadChunks(List<int> bytes,
    {required grpcu.UploadRequest_Metadata metadata}) async* {
  yield grpcu.UploadRequest(metadata: metadata);

  if (bytes.isEmpty) {
    return;
  }

  for (var i = 0; i < bytes.length; i += chunkSize) {
    int potentialEnd = i + chunkSize;

    int end;
    if (potentialEnd < bytes.length) {
      end = potentialEnd;
    } else {
      end = bytes.length;
    }

    final chunk = bytes.sublist(i, end);

    yield grpcu.UploadRequest(chunk: chunk);
  }
}

proto:

syntax = "proto3";

option go_package = "server/pb";

package main;

service MyService {
    rpc UploadFile(stream UploadRequest) returns (UploadResponse) {}
    rpc UploadFileFuckingWebFallback(UploadSingleRequest) returns (UploadResponse) {}
}

message UploadRequest{
  message Metadata {
    int32 size = 1;
    string file_name = 2; 
    optional string ext = 3;
    optional int32 width = 4;
    optional int32 height = 5;
  }
  oneof data {
     bytes chunk = 1;
     Metadata metadata = 2;
  }
}

// used for fucking web fallback unary request
message UploadSingleRequest{
   bytes data = 1;
   int32 size = 2;
   string file_name = 3; 
   optional string ext = 4;
   optional int32 width = 5;
   optional int32 height = 6;
}

message UploadResponse{
  string url = 1;
}

and then on the server side, obviously, two different endpoints:

// UploadFile handles streamed file upload from normal frontend apps.
func (s *MyService) UploadFile(stream pb.MyService_UploadFileServer) error {
    recv := func() (*pb.UploadRequest, error) {
        return stream.Recv()
    }

    url, err := handleUploadStream(s.DB, recv, stream.Context())
    if err != nil {
        return er("upload file", err)
    }

    return stream.SendAndClose(&pb.UploadResponse{
        Url: url,
    })
}

// UploadFileFuckingWebFallback uploads file as a single request (as a fallback for frontends that use typesetting
// engine from 80s and call it modern UI framework or, as it widely known, a fucking web)
func (s *MyService) UploadFileFuckingWebFallback(ctx context.Context, req *pb.UploadSingleRequest) (*pb.UploadResponse, error) {
    url, err := s.DB.SaveFile(ctx, req.Data, req.FileName, req.Size, nil)
    if err != nil {
        return nil, fmt.Errorf("saving file: %w", err)
    }

    return &pb.UploadResponse{
        Url: url,
    }, nil
}