dapr / java-sdk

Dapr SDK for Java
Apache License 2.0

Bi-direction subscription #1124

Closed: artursouza closed this 1 month ago

artursouza commented 2 months ago

Description

Bi-directional subscription.

Issue reference

We strive to have every PR opened based on an issue, where the problem or feature has been discussed prior to implementation.

Please reference the issue this PR will close: https://github.com/dapr/java-sdk/issues/1072

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

artur-ciocanu commented 1 month ago

@artursouza, @salaboy and @cicoyle: my Dapr-fu is not that great yet, but after enabling detailed logs for the streaming subscription, I got this stack trace:

time="2024-09-27T17:17:38.233931+03:00" level=info msg="Subscribing to pubsub 'messagebus' topic 'stream-topic'" app_id=pubsubstreamit instance=ciocanu.corp.adobe.com scope=dapr.runtime.pubsub.streamer type=log ver=1.14.2
time="2024-09-27T17:17:38.237124+03:00" level=debug msg="Processing Redis message 1726305169401-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237173+03:00" level=debug msg="Processing Redis message 1726305169393-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237235+03:00" level=debug msg="Processing Redis message 1726305169396-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237322+03:00" level=debug msg="Processing Redis message 1726305169418-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237199+03:00" level=debug msg="Processing Redis message 1726305169412-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237446+03:00" level=debug msg="Processing Redis message 1726305169399-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237241+03:00" level=debug msg="Processing Redis message 1726305169415-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237151+03:00" level=debug msg="Processing Redis message 1726305169406-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237142+03:00" level=debug msg="Processing Redis message 1726305169409-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
time="2024-09-27T17:17:38.237216+03:00" level=debug msg="Processing Redis message 1726305169381-0" app_id=pubsubstreamit component="messagebus (pubsub.redis/v1)" instance=ciocanu.corp.adobe.com scope=dapr.contrib type=log ver=1.14.2
17:17:38.236 [grpc-default-executor-1] INFO  foo - Received: 
time="2024-09-27T17:17:38.250127+03:00" level=info msg="Unsubscribed from pubsub 'messagebus' topic 'stream-topic'" app_id=pubsubstreamit instance=ciocanu.corp.adobe.com scope=dapr.runtime.pubsub.streamer type=log ver=1.14.2
17:17:38.249 [grpc-default-executor-1] ERROR foo - Received error:
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
    at io.grpc.Status.asRuntimeException(Status.java:525)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:240)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:134)
    at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
    ... 5 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8.
    at com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:149)
    at com.google.protobuf.Utf8$UnsafeProcessor.decodeUtf8(Utf8.java:1365)
    at com.google.protobuf.Utf8.decodeUtf8(Utf8.java:318)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:788)
    at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest.<init>(DaprAppCallbackProtos.java:2278)
    at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest$1.parsePartialFrom(DaprAppCallbackProtos.java:4315)
    at io.dapr.v1.DaprAppCallbackProtos$TopicEventRequest$1.parsePartialFrom(DaprAppCallbackProtos.java:4309)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:63)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:25)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:245)
    at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:237)
    ... 9 common frames omitted
Stopping dapr application ...

NOTE: The output above interleaves Dapr sidecar logs and PubSubStreamIT test logs. I am not entirely sure why the ArrayDecoder fails, but I think it is a start, and it makes clear why the subscription is cancelled immediately.

salaboy commented 1 month ago

Hmm, interesting: "Protocol message had invalid UTF-8". So it expects UTF-8 but isn't getting it?

Also, 1.14.4 is out by now; it might be worth upgrading to it.

artur-ciocanu commented 1 month ago

@artursouza I have finally figured out what the issue is. The Java SDK currently uses the proto files from https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto, while the Dapr 1.14.+ runtime uses https://raw.githubusercontent.com/dapr/dapr/v1.14.{patch-version}/dapr/proto. Once I switched to the protos from the latest Dapr release, I was able to consume messages in a streaming fashion with the PubSubStreamIT integration test.

Here are the differences:

For v1.14.0-rc.2 we have:

rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream TopicEventRequest) {}

For v1.14.+ we have:

rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream SubscribeTopicEventsResponseAlpha1) {}

As we can see, the released version streams a completely different response type. This explains the deserialization exception I ran into: Dapr was returning SubscribeTopicEventsResponseAlpha1, but the Java side was trying to deserialize it as TopicEventRequest.
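A minimal stdlib-only sketch of one plausible way this mismatch surfaces as "invalid UTF-8". It assumes the field numbers of the 1.14 release protos: SubscribeTopicEventsResponseAlpha1 carries the event as `event_message` (field 2, length-delimited), and field 2 of TopicEventRequest is the string `source`. A reader built from the old protos would then try to decode the entire nested event as one UTF-8 string. The class and method names are hypothetical; the code only hand-encodes the protobuf wire format and is not how the generated classes work internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class WireMismatchDemo {
  // Writes a protobuf base-128 varint.
  public static void writeVarint(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Writes a length-delimited field (wire type 2): tag, length, payload.
  public static void writeBytesField(ByteArrayOutputStream out, int fieldNumber, byte[] payload) {
    writeVarint(out, (fieldNumber << 3) | 2);
    writeVarint(out, payload.length);
    out.write(payload, 0, payload.length);
  }

  // Builds an outer message whose field 2 wraps an event-like inner message.
  public static byte[] buildEnvelope(int dataSize) {
    ByteArrayOutputStream inner = new ByteArrayOutputStream();
    writeBytesField(inner, 1, "1726305169401-0".getBytes(StandardCharsets.UTF_8)); // id
    writeBytesField(inner, 7, "x".repeat(dataSize).getBytes(StandardCharsets.UTF_8)); // data
    ByteArrayOutputStream outer = new ByteArrayOutputStream();
    writeBytesField(outer, 2, inner.toByteArray()); // assumed event_message = 2
    return outer.toByteArray();
  }

  // Returns the payload of the first field, assuming a one-byte tag and wire type 2.
  public static byte[] firstFieldPayload(byte[] message) {
    int pos = 1; // skip the one-byte tag
    int length = 0;
    int shift = 0;
    int b;
    do {
      b = message[pos++] & 0xFF;
      length |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    byte[] payload = new byte[length];
    System.arraycopy(message, pos, payload, 0, length);
    return payload;
  }

  // Strict UTF-8 validation, as proto3 requires when reading string fields.
  public static boolean isValidUtf8(byte[] bytes) {
    try {
      StandardCharsets.UTF_8.newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
          .onUnmappableCharacter(CodingErrorAction.REPORT)
          .decode(ByteBuffer.wrap(bytes));
      return true;
    } catch (CharacterCodingException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // An old-proto reader would treat these payloads as the string field `source`.
    System.out.println("small event valid UTF-8: " + isValidUtf8(firstFieldPayload(buildEnvelope(10))));
    System.out.println("large event valid UTF-8: " + isValidUtf8(firstFieldPayload(buildEnvelope(200))));
  }
}
```

With a short payload every byte stays below 0x80, so the nested bytes happen to be valid UTF-8. Once a nested length exceeds 127, its varint encoding (200 becomes 0xC8 0x01) is not valid UTF-8, which would be consistent with the `readStringRequireUtf8` frame in the stack trace.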

Given all of the above, the question is what value we should use for <dapr.proto.baseurl> in the parent POM: the latest Dapr release, or something else?

CC: @cicoyle @salaboy

artursouza commented 1 month ago

Correct. We should issue a patch release of Java SDK 1.12. Let me cut a PR now.

artur-ciocanu commented 1 month ago

@artursouza I was reviewing your PR, and as far as I can see, you adopted a callback approach via SubscriptionListener. I am wondering whether a Flux-based API would be more suitable:

<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);

We already use Project Reactor, and most DaprClient methods return Mono<T>. I think returning Flux<T> for asynchronous streams is more idiomatic and consistent with Project Reactor. If needed, the client can apply publishOn to the Flux to process messages on a separate thread pool (subscribeOn alone would not move processing, since the gRPC callbacks emit on gRPC's own executor threads).

The implementation I am proposing would look something like this:

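import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import reactor.core.publisher.Flux;

// Method of DaprClientImpl; uses its asyncStub and objectSerializer fields.
public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
  return Flux.create(emitter -> {
    // The released 1.14 protos stream SubscribeTopicEventsResponseAlpha1, not TopicEventRequest.
    StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1> responseObserver =
        new StreamObserver<>() {
          @Override
          public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
            // The initial response only confirms the subscription; skip anything that is not an event.
            if (!response.hasEventMessage()) {
              return;
            }
            try {
              byte[] data = response.getEventMessage().getData().toByteArray();
              emitter.next(objectSerializer.deserialize(data, type));
            } catch (IOException e) {
              emitter.error(e);
            }
            // Acknowledging each event (SUCCESS/RETRY/DROP) is not handled here.
          }

          @Override
          public void onError(Throwable throwable) {
            emitter.error(throwable);
          }

          @Override
          public void onCompleted() {
            emitter.complete();
          }
        };

    DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
        DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
            .setTopic(topic)
            .setPubsubName(pubsubName)
            .build();
    DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
        DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
            .setInitialRequest(initialRequest)
            .build();

    var requestObserver = this.asyncStub.subscribeTopicEventsAlpha1(responseObserver);
    // Half-close the request stream if the subscriber cancels the Flux.
    emitter.onCancel(requestObserver::onCompleted);
    // Send the local request (the original referenced a nonexistent this.request field).
    requestObserver.onNext(request);
  });
}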

Please take a look and let me know your thoughts.

CC: @cicoyle @salaboy
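Project Reactor is not part of the JDK, so here is a minimal stdlib-only sketch of the same bridging idea using java.util.concurrent.Flow and SubmissionPublisher in place of Flux.create: a callback observer (a hypothetical stand-in for gRPC's StreamObserver) pushes each message into a publisher, and the subscriber consumes it on a separate thread pool, roughly the role publishOn plays in Reactor. ObserverToPublisherSketch and Observer are illustrative names only, not SDK or gRPC types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ObserverToPublisherSketch {
  // Hypothetical stand-in for io.grpc.stub.StreamObserver.
  public interface Observer<T> {
    void onNext(T value);
    void onError(Throwable throwable);
    void onCompleted();
  }

  // Adapts observer callbacks to a Flow.Publisher, as Flux.create adapts them to a FluxSink.
  public static <T> Observer<T> bridge(SubmissionPublisher<T> publisher) {
    return new Observer<T>() {
      @Override public void onNext(T value) { publisher.submit(value); } // blocks only if the buffer is full
      @Override public void onError(Throwable throwable) { publisher.closeExceptionally(throwable); }
      @Override public void onCompleted() { publisher.close(); }
    };
  }

  // Feeds messages through the bridge and returns what the subscriber received, in order.
  public static List<String> collect(List<String> messages) {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(2); // the "separate thread pool"
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(pool, Flow.defaultBufferSize());
    publisher.subscribe(new Flow.Subscriber<String>() {
      private volatile Flow.Subscription subscription;

      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // pull one item at a time (backpressure)
      }

      @Override public void onNext(String item) {
        received.add(item);
        subscription.request(1);
      }

      @Override public void onError(Throwable throwable) { done.countDown(); }

      @Override public void onComplete() { done.countDown(); }
    });

    Observer<String> observer = bridge(publisher);
    messages.forEach(observer::onNext); // what the transport would call per message
    observer.onCompleted();
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(collect(List.of("event-1", "event-2", "event-3")));
  }
}
```

The design point carries over to the Flux version: the transport thread only hands items off, while consumption and backpressure are controlled by the subscriber side.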

artursouza commented 1 month ago

I agree 100%. Although I am not a fan of Project Reactor, I agree that consistency matters here. Let me give this a shot.

artursouza commented 1 month ago

var requestObserver = this.asyncStub.subscribeTopicEventsAlpha1(responseObserver);
requestObserver.onNext(request);

I am trying this. The challenge is how to handle the ack/retry/drop response coming from the client code. Any suggestions?

artursouza commented 1 month ago

I made a small change so that the subscription receives the status from the listener as a Mono, which lets the client side produce it asynchronously. I am happy to try another API; remember that this is in the Preview interface, so we can still change it.
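A minimal stdlib-only sketch of that idea, with CompletableFuture standing in for Reactor's Mono: the listener returns its status asynchronously, and once it completes, an acknowledgement carrying the event id and status is written back on the request stream. The Status values mirror TopicEventResponseStatus (SUCCESS, RETRY, DROP) in the Dapr protos; Event, Ack, RequestSender and AckLoopSketch are hypothetical stand-ins rather than SDK types, and mapping listener failures to RETRY is an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

public class AckLoopSketch {
  // Mirrors TopicEventResponse.TopicEventResponseStatus in the Dapr protos.
  public enum Status { SUCCESS, RETRY, DROP }

  // Hypothetical stand-in for an event pushed by the sidecar.
  public record Event(String id, String data) {}

  // Hypothetical stand-in for the "event processed" message sent back to the sidecar.
  public record Ack(String id, Status status) {}

  // Hypothetical stand-in for the gRPC request observer (requestObserver.onNext(...)).
  public interface RequestSender {
    void send(Ack ack);
  }

  private final Function<Event, CompletableFuture<Status>> listener;
  private final RequestSender sender;

  public AckLoopSketch(Function<Event, CompletableFuture<Status>> listener, RequestSender sender) {
    this.listener = listener;
    this.sender = sender;
  }

  // Called for each event taken from the response stream.
  public CompletableFuture<Void> onEvent(Event event) {
    CompletableFuture<Status> status;
    try {
      status = listener.apply(event);
    } catch (RuntimeException e) {
      status = CompletableFuture.failedFuture(e); // treat synchronous failures like async ones
    }
    return status
        .exceptionally(error -> Status.RETRY) // assumption: failures ask for redelivery
        .thenAccept(s -> sender.send(new Ack(event.id(), s)));
  }

  public static void main(String[] args) {
    List<Ack> sent = new CopyOnWriteArrayList<>();
    AckLoopSketch loop = new AckLoopSketch(
        event -> event.data().isEmpty()
            ? CompletableFuture.completedFuture(Status.DROP)
            : CompletableFuture.supplyAsync(() -> Status.SUCCESS),
        sent::add);
    loop.onEvent(new Event("1", "hello")).join();
    loop.onEvent(new Event("2", "")).join();
    System.out.println(sent);
  }
}
```

Because the ack is only sent when the future completes, the user code is free to do asynchronous work before deciding between SUCCESS, RETRY and DROP.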