semlanik / qtprotobuf

Protobuf generator and bindings for Qt framework
https://semlanik.github.io/qtprotobuf
MIT License

How to use bidirectional flow? #215

Open tangyoha opened 3 years ago

tangyoha commented 3 years ago

Question: How to use bidirectional flow? The client calls subscribeUserEventLoopUpdates many times, but the server stream fails to receive the messages.

proto file:


message UserEvent {
    enum UserEventType {
        AddUserRequest = 0;
        AddUserResponse = 1;
        DelFriendRequest = 3;
        FindUserRequest = 4;
        FindUserResponse = 5;
        ChangeUserRequest = 6;
        ChangeUserResponse = 7;
    }
    UserEventType type = 1;
    string from = 2;
    string eventContent = 3; // json

}

service HttpApi {
    rpc UserEventLoop(stream UserEvent) returns (stream UserEvent) {}
}

cpp code:

void httpapi::HttpApiEngine::addFriend(QString friendUuid)
{
    httpapi::UserEvent req;
    req.setType(httpapi::UserEvent::AddUserRequest);
    QJsonObject jsonObject
    {
        {"friendUuid", friendUuid}
    };
    QJsonDocument content(jsonObject);
    req.setEventContent(content.toJson());
    QEventLoop waiter;

    auto sub = m_client->subscribeUserEventLoopUpdates(req);
    qDebug() << sub.get();//output : QtProtobuf::QGrpcSubscription(0x26bbbd543f0)

    QObject::connect(sub.get(), &QtProtobuf::QGrpcSubscription::updated, this, [sub, this]() {
        auto response = sub->read<httpapi::UserEvent>();
        qDebug() << response.eventContent();
        emit UserEvent(response);
    });
}

go code:

func (s *ltalkChatServer) UserEventLoop(stream pb.HttpApi_UserEventLoopServer) error {
    sessionId, err := auth.ExtractHeader(stream.Context(), "uuid")

    if err != nil {
        fmt.Println("v", err)
        return err
    }

    sess := globals.sessionStore.GetBySessionId(sessionId)
    sess.userEventLoop = stream

    defer func() {
        sess.closeGrpc()
        sess.cleanUp(false)
    }()

    for {
        in, err := stream.Recv()
        // if err == io.EOF {
        //  fmt.Println("finished")
        //  return nil
        // }
        if err != nil {
            fmt.Println("grpc: recv", sess.sid, err)
            time.Sleep(1 * time.Second)
            //return err
        }

        if err != io.EOF {
            handleUserEvent(sess.uid, in)
        }
        sess.lock.Lock()
        if sess.userEventLoop == nil {
            fmt.Println("userEventLoop == nil")
            sess.lock.Unlock()
            break
        }
        sess.lock.Unlock()
    }

    return nil
}

Additional context image

tangyoha commented 3 years ago

After checking the source code, I found the reason. The overloaded operator== in QGrpcSubscription treats two subscriptions as the same QGrpcSubscription as long as the method and the arguments are the same.

// file: qtprotobuf\src\grpc\qgrpcsubscription.h
...
bool operator ==(const QGrpcSubscription &other) const {
        return other.method() == this->method() &&
                other.arg() == this->arg();
    }

...
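
To illustrate the effect of this comparison, here is a minimal, self-contained sketch. `Subscription` and `Client` are hypothetical stand-ins, not QtProtobuf classes, and the reuse logic in `subscribe` is an assumption about how a client might use the operator. It only shows that a repeated call with an equal method and argument yields the already active subscription, while a different argument opens a new stream.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for QGrpcSubscription: only the two fields that
// take part in the comparison are modelled.
struct Subscription {
    std::string method;
    std::string arg; // serialized request message

    bool operator==(const Subscription &other) const {
        return other.method == method && other.arg == arg;
    }
};

// Hypothetical client that reuses an active subscription when an "equal"
// one already exists (an assumption made for this sketch).
class Client {
public:
    std::shared_ptr<Subscription> subscribe(const std::string &method,
                                            const std::string &arg) {
        Subscription candidate{method, arg};
        for (const auto &active : m_active) {
            if (*active == candidate) {
                return active; // reused: no new stream is opened
            }
        }
        auto created = std::make_shared<Subscription>(candidate);
        m_active.push_back(created);
        ++m_streamsOpened; // a new stream means a new handler call on the server
        return created;
    }

    int streamsOpened() const { return m_streamsOpened; }

private:
    std::vector<std::shared_ptr<Subscription>> m_active;
    int m_streamsOpened = 0;
};
```

In this model, two calls with the same request return the same object, and a call with a different `friendUuid` opens a second stream, which would be consistent with the server entering its handler again.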

semlanik commented 3 years ago

Hm, probably arguments are superfluous in this condition. Let me check why it's implemented this way.

semlanik commented 3 years ago

I think this is an issue in the design of the bidirectional subscription. From what I understand of the current implementation, bidirectional subscriptions should also support changing the argument without re-instantiating the subscription. So let's make this a bug instead.

tangyoha commented 3 years ago

When I called subscribeUserEventLoopUpdates for the second time, the Golang-based server re-entered the UserEventLoop method. Why didn't the server treat it as the same stream?

tangyoha commented 3 years ago

Is it because of a problem with the POST parameters that the Golang server cannot recognize the two POSTs as the same stream?

semlanik commented 3 years ago

Is it because of a problem with the POST parameters that the Golang server cannot recognize the two POSTs as the same stream?

It's an issue in the bidirectional stream design. I didn't take into account that changing the argument is possible without recreating a stream.

So here is how it currently works:

This scheme works fine with the unidirectional server stream. But for the bidirectional stream it should work another way:

lubagov commented 3 years ago

Oops... I have really run into trouble with this now :-( I planned to use it in many places.

As far as I understand, a bidirectional stream does not necessarily take any data up front. It can follow this scheme:

client reads <- server writes
the client analyzes the received data
client writes -> server reads

Two things:

  1. I may not have any data for the argument right away;
  2. An outgoing streaming gRPC call could accept a list of values at once, to send them in one TCP/IP packet.

service MyService {
  rpc bidirectional (stream SomeMesssage) returns (stream SomeMesssage2) {}
}

service MyService {
  rpc outgoing (stream SomeMesssage) returns (google.protobuf.Empty) {}
}

In general, I expected the generated client code for a bidirectional stream to provide:

  1. A subscribe method without arguments, to subscribe to the incoming stream;
  2. A method to get access to the outgoing stream (returning something like a stream object with a write method), into which I can write either a single record or a list of objects. If there are many objects going to the same stream, they should be sent at once, in one TCP/IP packet. For example, what if I want to write 500 identical objects to the outgoing stream?
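
The two expectations above could look roughly like the following sketch. Every name here (`BidirectionalStream`, `subscribe`, `write`, `deliver`, `SomeMessage`) is hypothetical and chosen for illustration; none of it is the QtProtobuf API. The transport is modelled as a plain callback so the example stays self-contained.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message type standing in for a generated protobuf class.
struct SomeMessage {
    std::string payload;
};

// Hypothetical stream handle; not part of QtProtobuf.
class BidirectionalStream {
public:
    using Handler = std::function<void(const SomeMessage &)>;
    using Transport = std::function<void(const std::vector<SomeMessage> &)>;

    explicit BidirectionalStream(Transport transport)
        : m_transport(std::move(transport)) {}

    // 1. Subscribe to the incoming stream; no request argument is needed.
    void subscribe(Handler handler) { m_handler = std::move(handler); }

    // 2a. Write a single message to the outgoing stream.
    void write(const SomeMessage &message) {
        write(std::vector<SomeMessage>{message});
    }

    // 2b. Write a batch, so the transport can send it in one go.
    void write(const std::vector<SomeMessage> &messages) {
        if (!messages.empty() && m_transport) {
            m_transport(messages);
        }
    }

    // Called by the transport when the server sends a message.
    void deliver(const SomeMessage &message) {
        if (m_handler) {
            m_handler(message);
        }
    }

private:
    Transport m_transport;
    Handler m_handler;
};
```

Keeping the single-message `write` as a thin wrapper over the batch overload means the transport sees one code path, and a batch of 500 objects reaches it as a single call.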

semlanik commented 3 years ago

@lubagov yeah, and the issue requires changes in Qt, as per my investigation. I'm checking whether it's possible to extend the existing networking API to support an outgoing stream, but Qt Network was designed for good old HTTP/1.1, where the "one request, one reply" paradigm worked perfectly. What you could try is the native gRPC channel that was introduced some time ago by @rapgenic and uses the reference gRPC implementation as the backend. I haven't played with it much, but perhaps it requires some changes in the QtGrpc API as well.

lubagov commented 3 years ago

You mean the QT_PROTOBUF_NATIVE_GRPC_CHANNEL option? Or what exactly? In principle, it is obvious that we will need to make changes to the project code. At a minimum, the current API only has a "read" method and no "write" method.

lubagov commented 3 years ago

I have not recompiled the sources yet and have not tried the native channel, but if I understand https://github.com/semlanik/qtprotobuf/blob/c6d7977a544f004f26215f01d207aae599fb6139/src/grpc/qgrpcchannel.cpp#L90 correctly (the code is hard for me to follow), it seems to create a separate thread for each subscription and use blocking calls. And I don't like that :-)

semlanik commented 3 years ago

Yeah, perhaps it should be improved. Anyway, I'm digging to find a solution using Qt Network.

lubagov commented 3 years ago

I tried playing with it. The sync method works fine if I use grpc::ByteBuffer as the argument/response.

But the async method does not:

::grpc::ClientAsyncReader<grpc::ByteBuffer>* reader=grpc::internal::ClientAsyncReaderFactory<grpc::ByteBuffer>::Create(ch.get(),&queue,method,&clientContext, request, true, tag);

Read does not fill the buffer at all. Something like this:

::grpc::ClientAsyncReader<google::protobuf::Message>* reader=grpc::internal::ClientAsyncReaderFactory<google::protobuf::Message>::Create(
                ch.get(),&queue,method,&clientContext, request, true, tag);

works, but google::protobuf::Message is an abstract class, and I would have to provide GetMetadata, New and GetCachedSize. The metadata depends on the context and on the concrete object:

const char descriptor_table_protodef_proto_2fAuthorization_2eproto[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) =
  "\n\031proto/Authorization.proto\"\'\n\017RegisterR"
  "equest\022\024\n\014magic_string\030\001 \001(\t\"m\n\020Register"
  "Response\022\025\n\rrefresh_token\030\001 \001(\t\022\024\n\014acces"
  "s_token\030\002 \001(\t\022\024\n\014magic_string\030\003 \001(\t\022\026\n\016e"
  "xpirationTime\030\004 \001(\003\"%\n\014RenewRequest\022\025\n\rr"
  "efresh_token\030\001 \001(\t\"\354\001\n\rRenewResponse\022\025\n\r"
  "refresh_token\030\001 \001(\t\022\024\n\014access_token\030\002 \001("
  "\t\022*\n\006status\030\003 \001(\0162\032.RenewResponse.TokenS"
  "tatus\022\025\n\rerror_message\030\004 \001(\t\"k\n\013TokenSta"
  "tus\022\006\n\002OK\020\000\022\023\n\017REFRESH_EXPIRED\020\001\022\025\n\021REFR"
  "ESH_NOT_EXIST\020\002\022\027\n\023REFRESH_MALL_FORMED\020\003"
  "\022\017\n\013OTHER_ERROR\020\0042y\n\rAuthorization\0229\n\016Re"
  "gisterDevice\022\020.RegisterRequest\032\021.Registe"
  "rResponse\"\0000\001\022-\n\nRenewToken\022\r.RenewReque"
  "st\032\016.RenewResponse\"\000B\026\n\024com.lurity.grpc_"
  "authb\006proto3"
  ;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_proto_2fAuthorization_2eproto_deps[1] = {
};
static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_proto_2fAuthorization_2eproto_sccs[4] = {
  &scc_info_RegisterRequest_proto_2fAuthorization_2eproto.base,
  &scc_info_RegisterResponse_proto_2fAuthorization_2eproto.base,
  &scc_info_RenewRequest_proto_2fAuthorization_2eproto.base,
  &scc_info_RenewResponse_proto_2fAuthorization_2eproto.base,
};
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_proto_2fAuthorization_2eproto_once;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_proto_2fAuthorization_2eproto = {
  false, false, descriptor_table_protodef_proto_2fAuthorization_2eproto, "proto/Authorization.proto", 612,
  &descriptor_table_proto_2fAuthorization_2eproto_once, descriptor_table_proto_2fAuthorization_2eproto_sccs, descriptor_table_proto_2fAuthorization_2eproto_deps, 4, 0,
  schemas, file_default_instances, TableStruct_proto_2fAuthorization_2eproto::offsets,
  file_level_metadata_proto_2fAuthorization_2eproto, 4, file_level_enum_descriptors_proto_2fAuthorization_2eproto, file_level_service_descriptors_proto_2fAuthorization_2eproto,
};

So at this point there is simply no way to fill it in...

lubagov commented 3 years ago

My mistake: the async method works fine with grpc::ByteBuffer. I just didn't take into account that the buffer is filled not in the Read method, but in the Next method at the following step.
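
The Read/Next split can be shown with a toy model. This is not gRPC code: `ToyQueue` and `ToyReader` are hypothetical classes that only imitate the pattern described above, where requesting a read leaves the buffer untouched and the data appears once the queue reports the matching tag.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Toy stand-in for a completion queue: operations are queued by the caller
// and only performed when next() is called.
class ToyQueue {
public:
    void post(void *tag, std::function<void()> operation) {
        m_pending.push_back({tag, std::move(operation)});
    }

    // Completes one pending operation and reports its tag.
    // Returns false when nothing is pending.
    bool next(void **tag) {
        if (m_pending.empty()) {
            return false;
        }
        auto entry = std::move(m_pending.front());
        m_pending.pop_front();
        entry.second(); // the output buffer is filled here, not in read()
        *tag = entry.first;
        return true;
    }

private:
    std::deque<std::pair<void *, std::function<void()>>> m_pending;
};

// Toy stand-in for an async reader bound to a queue.
class ToyReader {
public:
    ToyReader(ToyQueue *queue, std::string incoming)
        : m_queue(queue), m_incoming(std::move(incoming)) {}

    // Only requests a read; `out` stays untouched until the queue
    // reports `tag` from next().
    void read(std::string *out, void *tag) {
        m_queue->post(tag, [this, out]() { *out = m_incoming; });
    }

private:
    ToyQueue *m_queue;
    std::string m_incoming;
};
```

After `read`, the output string is still empty; it holds the data only after `next` has returned the tag that was passed to `read`.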

semlanik commented 3 years ago

Ok, I checked the whole path of QNetworkRequest/QNetworkReply and can only state that it's impossible to implement client-side streaming without changes in QtNetwork. I will check if I could hack networking using some private modules. But for now, it looks impracticable.

lubagov commented 3 years ago

If QtNetwork is not suitable for this, we can use either the native implementation or other HTTP/2 libraries. I would be afraid to use hacks with private classes because of possible incompatibilities, if they are not supported on the Qt Project side.

But if I use the native implementation, it cannot stay in its current form; it needs to be improved. For now, I want to try how it works as is, then rework the channel to use asynchronous calls. If that goes well, I will refine it further with bidirectional streams.

semlanik commented 3 years ago

Agree, I will prepare the patch with the necessary API changes anyway, since it currently lacks a way to send a new "argument" to the server, but I will leave it unimplemented.

lubagov commented 3 years ago

https://github.com/lubagov/qtprotobuf/commit/b4ade2efb6814c004466fa05c4f3dccc8017a5a3 I seem to have done something terrible, but the methods are asynchronous now. I very much doubt its correctness and thread safety, though. (I really think it would be better to call the methods from the thread loop through a queue, with QMetaObject::invokeMethod. I also think that Finish may not be called in some cases, or may be called from the thread loop after the object has already been destroyed, if the destroyed signal was emitted.)
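
The idea of running calls through a queue on the owning thread can be sketched without Qt. `CallQueue` below is a hypothetical helper, not code from the commit; in a Qt application, QMetaObject::invokeMethod with Qt::QueuedConnection plays a similar role by posting the call to the receiver's event loop.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue of callbacks owned by one thread.
class CallQueue {
public:
    // May be called from any thread: only stores the call.
    void post(std::function<void()> call) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_calls.push(std::move(call));
    }

    // Must be called from the owning thread. Runs everything queued so far
    // and returns the number of calls executed.
    int drain() {
        std::queue<std::function<void()>> calls;
        {
            // Take the pending calls out under the lock, then run them
            // without holding it, so a call may safely post() again.
            std::lock_guard<std::mutex> lock(m_mutex);
            std::swap(calls, m_calls);
        }
        int executed = 0;
        while (!calls.empty()) {
            calls.front()();
            calls.pop();
            ++executed;
        }
        return executed;
    }

private:
    std::mutex m_mutex;
    std::queue<std::function<void()>> m_calls;
};
```

A worker thread would call `post` when a gRPC event arrives, and the thread that owns the subscription objects would call `drain` from its loop, so the objects are only touched from one thread.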

semlanik commented 3 years ago

@lubagov at first glance the patch looks good to me. Meanwhile, could you please align the naming and spacing with the project's common coding style, and perhaps submit a pull request if you feel it's ready :) Thanks for your contribution!

lubagov commented 3 years ago

I am currently trying to add bidirectional subscriptions. Just a question: why is there no emit in front of many signals in the code?

lubagov commented 3 years ago

https://github.com/lubagov/qtprotobuf/commit/bb780d4a9b7c678eb28471bc1c70be9dd77d0680 I don't know how good this is, but I tried adding bidirectional calls. It looks like a copy-paste of existing code in many places.

semlanik commented 3 years ago

Pretty close to the design I had in mind. Sorry that I haven't provided any high-level API yet; I'm out of energy these days.