Operations fail with "connection refused" if uri scheme is missing from config #540

@Daenyth

Description

I'm reading my URI configuration from the standard gcloud `PUBSUB_EMULATOR_HOST` environment variable, set like this:

```
$ gcloud beta emulators pubsub env-init
export PUBSUB_EMULATOR_HOST=localhost:8493
```

However, setting both publishers and subscribers to use `.uri(uri"localhost:8493")` causes all operations to fail:

```
fs2.pubsub.exceptions.PubSubRequestError$$anon$2: Request to PubSub failed.

URI: localhost:8493/v1/projects/test-project/topics/test-topic:publish
Method: POST

Failure: Failed to publish records to PubSub.
	at fs2.pubsub.exceptions.PubSubRequestError$.apply(PubSubRequestError.scala:73)
	at fs2.pubsub.PubSubClient$$anon$2$$anonfun$publish$4.applyOrElse(PubSubClient.scala:213)
	at fs2.pubsub.PubSubClient$$anon$2$$anonfun$publish$4.applyOrElse(PubSubClient.scala:211)
	at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:288)
	at cats.ApplicativeError.$anonfun$recoverWith$1(ApplicativeError.scala:172)
	at modify @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:83)
	at flatten$extension @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:71)
	at flatModify @ fs2.concurrent.SignallingRef$$anon$4.modify(Signal.scala:302)
	at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
	at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at modify @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:83)
	at flatten$extension @ org.http4s.ember.server.internal.Shutdown$$anon$1.<init>(Shutdown.scala:71)
	at flatModify @ fs2.concurrent.SignallingRef$$anon$4.modify(Signal.scala:302)
	at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at >>$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
	at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at getAndSet @ org.typelevel.keypool.KeyPool$.destroy(KeyPool.scala:120)
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.checkConnect(Native Method)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishConnect(UnixAsynchronousSocketChannelImpl.java:256)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:202)
	at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:217)
	at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
	at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
	at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$8(SocketGroupPlatform.scala:65)
	at as @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at async @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$7(SocketGroupPlatform.scala:62)
	at println @ com.monovore.decline.effect.CommandIOApp$.addVersionFlag(CommandIOApp.scala:67)
	at map @ com.comcast.ip4s.SocketAddress.resolve(SocketAddress.scala:33)
	at flatMap @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.connect$1(SocketGroupPlatform.scala:61)
	at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.$anonfun$client$4(SocketGroupPlatform.scala:58)
	at onError$extension @ org.typelevel.keypool.KeyPool$Builder.keepRunning$1(KeyPool.scala:371)
	at delay @ fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup.setup$1(SocketGroupPlatform.scala:55)
	at main$ @ com.monovore.decline.effect.CommandIOApp.main(CommandIOApp.scala:12)
	at main$ @ com.monovore.decline.effect.CommandIOApp.main(CommandIOApp.scala:12)
```
	
When I manually set the scheme on the `uri` in my configuration, it works again:

```scala
import org.http4s.Uri

case class MyConfig(stuff: Stuff, emulatorHost: Option[Uri])
def readConfig(): MyConfig = ??? // read from the PUBSUB_EMULATOR_HOST environment variable

val rawUri = readConfig().emulatorHost
// Fall back to http when the configured URI has no scheme
val uri = rawUri.map(u => u.copy(scheme = u.scheme.orElse(Some(Uri.Scheme.http))))
```

Then operations work as normal.

I'd like the library to do that scheme detection internally, so that it works out of the box with the recommended environment variable setup.
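
To make the request concrete, here is a minimal sketch of what such scheme defaulting could look like. It works on the raw string rather than a parsed `Uri`, and the `EmulatorUri` object and `withDefaultScheme` name are hypothetical, not part of fs2-pubsub; treating "contains `://`" as "has a scheme" is a simplifying assumption.

```scala
// Hypothetical helper, not part of fs2-pubsub.
object EmulatorUri {
  // Prepend a default scheme when the endpoint is a bare host:port.
  // Assumption: a value counts as having a scheme only if it contains "://".
  def withDefaultScheme(raw: String, default: String = "http"): String = {
    val trimmed = raw.trim
    if (trimmed.contains("://")) trimmed else s"$default://$trimmed"
  }
}
```

The normalized string could then be parsed (for example with http4s's `Uri.fromString`) before being passed to `.uri(...)`.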

A bonus would be if fs2-pubsub read the environment variable on its own, since the name is standard.
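
As a rough illustration of that bonus, reading the variable needs nothing beyond the standard library. The `EmulatorEnv` object below is a hypothetical sketch, not an existing fs2-pubsub API; the environment is passed in as a parameter only so the lookup can be exercised without touching the real process environment.

```scala
// Hypothetical sketch, not an existing fs2-pubsub API.
object EmulatorEnv {
  // Variable name exported by `gcloud beta emulators pubsub env-init`
  val VarName = "PUBSUB_EMULATOR_HOST"

  // Returns the configured emulator host, ignoring unset or blank values.
  def emulatorHost(env: Map[String, String] = sys.env): Option[String] =
    env.get(VarName).map(_.trim).filter(_.nonEmpty)
}
```

The value returned here is still the bare `host:port` form, so it would need a scheme added before being used as a request URI.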
