permutive-engineering / fs2-pubsub

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.
Apache License 2.0
45 stars 16 forks source link

Subscriber using grpc client fails on decode #541

Open martintupy opened 2 weeks ago

martintupy commented 2 weeks ago

When a Subscriber is constructed using grcp client

PubSubSubscriber
  .grpc[IO]
  .projectId(ProjectId("..."))
  .subscription(Subscription("..."))
  .defaultUri
  .httpClient(client)
  .defaultRetry
  .noErrorHandling
  .withDefaults
  .decodeTo[String]
  .subscribe
  .evalMap { message => IO.print(message.value) *> message.ack }
  .compile
  .drain

It's then going to fail on following error. Same constructor with http, is fine.

java.lang.IllegalArgumentException: Illegal base64 character 7b
        at java.base/java.util.Base64$Decoder.decode0(Base64.java:852)
        at java.base/java.util.Base64$Decoder.decode(Base64.java:570)
        at fs2.pubsub.grpc.GrpcConstructors$.fs2$pubsub$grpc$GrpcConstructors$Client$$anon$1$$_$$anonfun$2$$anonfun$2(GrpcConstructors.scala:143)
        at scala.Option.map(Option.scala:242)
        at fs2.pubsub.grpc.GrpcConstructors$Client$$anon$1.$anonfun$2(GrpcConstructors.scala:143)
        at scala.collection.immutable.Vector2.map(Vector.scala:2140)
        at scala.collection.immutable.Vector2.map(Vector.scala:443)
        at fs2.pubsub.grpc.GrpcConstructors$.fs2$pubsub$grpc$GrpcConstructors$Client$$anon$1$$_$read$$anonfun$1(GrpcConstructors.scala:155)
        at modify @ fs2.internal.Scope.close(Scope.scala:262)
        at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
        at uncancelable @ fs2.Compiler$Target.uncancelable(Compiler.scala:165)
        at run$ @ Main$.run(Subscribe.scala:10)
        at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:370)
        at run$ @ Main$.run(Subscribe.scala:10)
        at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:227)
        at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:370)
        at run$ @ Main$.run(Subscribe.scala:10)
        at void @ org.typelevel.keypool.KeyPool$.reap(KeyPool.scala:187)
        at main$ @ Main$.main(Subscribe.scala:10)

I suspect that PubSubMessage response in rpc protocol isn't base64 encoded, as it's not stated in rpc api docs https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage. Unlike REST response, which is base64 encoded https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

Following line probably shouldn't contain base64 decoding https://github.com/permutive-engineering/fs2-pubsub/blob/main/modules/fs2-pubsub/src/main/scala-2.13%2B/fs2/pubsub/grpc/GrpcConstructors.scala#L143

alejandrohdezma commented 1 week ago

Hey @martintupy, great catch! It seems we didn't catch this on testing since both subscriber & publisher were using the base 64 encoding/decoding. This would have failed if data would have been published from another source.