permutive-engineering / fs2-pubsub

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.
Apache License 2.0
48 stars 16 forks source link

Operations fail with "connection refused" if uri scheme is missing from config #540

Open Daenyth opened 3 months ago

Daenyth commented 3 months ago

I'm reading my uri configuration from the standard gcloud PUBSUB_EMULATOR_HOST environment variable, set like this:

$ gcloud beta emulators pubsub env-init

export PUBSUB_EMULATOR_HOST=localhost:8493

However, setting both publishers and subscribers to use .uri(uri"localhost:8493") causes all operations to fail:

fs2.pubsub.exceptions.PubSubRequestError$$anon$2: Request to PubSub failed.

URI: localhost:8493/v1/projects/test-project/topics/test-topic:publish
Method: POST

Failure: Failed to publish records to PubSub.
    at fs2.pubsub.exceptions.PubSubRequestError$.apply(PubSubRequestError.scala:73)
    at fs2.pubsub.PubSubClient$$anon$2$$anonfun$publish$4.applyOrElse(PubSubClient.scala:213)
    at fs2.pubsub.PubSubClient$$anon$2$$anonfun$publish$4.applyOrElse(PubSubClient.scala:211)
    at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:288)
    at cats.ApplicativeError.$anonfun$recoverWith$1(ApplicativeError.scala:172)
    at modify @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:83)
    at flatten$extension @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:71)
    at flatModify @ fs2.concurrent.SignallingRef$$anon$4.modify(Signal.scala:302)
    at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
    at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at modify @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:83)
    at flatten$extension @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:71)
    at flatModify @ fs2.concurrent.SignallingRef$$anon$4.modify(Signal.scala:302)
    at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
    at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at getAndSet @ org.typelevel.keypool.KeyPool$.destroy(KeyPool.scala:120)
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:256)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:202)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217)
    at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
    at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$8(SocketGroupPlatform.scala:65)
    at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at async @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$7(SocketGroupPlatform.scala:62)
    at println @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
    at map @ com.comcast.ip4s.SocketAddress.resolve(SocketAddress.scala:33)
    at flatMap @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.connect$1(SocketGroupPlatform.scala:61)
    at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$4(SocketGroupPlatform.scala:58)
    at onError$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
    at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.setup$1(SocketGroupPlatform.scala:55)
    at main$ @ com.monovore.decline.effect.CommandIOApp.main(CommandIOApp.scala:12)
    at main$ @ com.monovore.decline.effect.CommandIOApp.main(CommandIOApp.scala:12)

When I manually set the uri in my configuration, it works again.

case class MyConfig(stuff: Stuff, emulatorHost: Option[Uri])
def readConfig(): MyConfig = ??? // read from PUBSUB_EMULATOR_HOST environment variable

val rawUri = readConfig().emulatorHost
val uri = rawUri.map(uri => uri.copy(scheme = Some(uri.scheme.getOrElse(Uri.Scheme.http))))

Then operations work as normal

I'd like the library to do that scheme detection internally, so that it can more easily work out of the box with the recommend environment variable setup

A bonus would be if fs2-pubsub read the environment variable on its own, since the name is standard