akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/
Other
1.26k stars 645 forks source link

DynamoSettings and Proxy #1334

Open aesteve opened 6 years ago

aesteve commented 6 years ago

Hi everyone and thank you for your work.

I'm trying to use Scanamo, which in its documentation favors the use of Alpakka-Dynamo for targeting DynamoDB.

My project works fine on many machines, but not the one behind corporate proxies.

Using DynamoDbAsyncClient I was able to specify "client transport" options, hence the proxy settings (relying on environment variables).

But Alpakka's DynamoSettings looks very poor, I must be missing something, because i cannot find the configuration for the underlying http client.

Hope you can give me any pointer, I'm really struggling with this.

Thank you.


EDIT : I don't think there is a way, actually : https://github.com/akka/alpakka/blob/master/dynamodb/src/main/scala/akka/stream/alpakka/dynamodb/impl/DynamoClientImpl.scala#L41

2m commented 6 years ago

Have you tried adjusting the configuration in your application.conf file?

The Akka-Http setting would be:

akka.http.client {
  proxy {
    https {
      host = ""
      port = 443
    }
  }
}
aesteve commented 6 years ago

Yes I tried and unfortunately it did not work :(

I ended-up subclassing DynamoClient with one where the ClientTransport is configurable. Ugly but works.

ennru commented 6 years ago

Would you be able to suggest an appropriate change to fix this?

aesteve commented 5 years ago

Let the ability to configure transport and that should be good for me.

kovacshuni commented 4 years ago

Any news on this? Can you give me a link to the source how you avoided passing host and port and how you used ClientTransport? I can't see the integration point.

kovacshuni commented 4 years ago

@aesteve ^

aesteve commented 4 years ago

Hi, it's a private repository, so I can't share it unfortunately.

But I think I created a package akka.stream.alpakka.dynamodb in my application and added those two files

DynamoClientWithTransport.scala

package akka.stream.alpakka.dynamodb

import akka.actor.ActorSystem
import akka.http.scaladsl.{ClientTransport, Http}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Materializer
import akka.stream.alpakka.dynamodb.impl.{DynamoClientImpl, DynamoSettings}
import akka.stream.alpakka.dynamodb.impl.AwsClient.{AwsConnect, AwsRequestMetadata}
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits

class DynamoClientWithTransport(override val settings: DynamoSettings, transport: ClientTransport)
                               (implicit override protected val system: ActorSystem, override implicit protected val materializer: Materializer)
  extends DynamoClientImpl(settings, DynamoImplicits.errorResponseHandler) {

  override protected val connection: AwsConnect = {
    val poolSettings = ConnectionPoolSettings(system)
      .withMaxConnections(settings.parallelism)
      .withMaxOpenRequests(settings.parallelism)
      .withTransport(transport)
    if (settings.port == 443) {
      Http()
        .cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host, settings = poolSettings)
    } else {
      Http()
        .cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port, settings = poolSettings)
    }
  }
}

and

 DynamoClientFix.scala


package akka.stream.alpakka.dynamodb

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.ClientTransport
import akka.stream.ActorMaterializer
import akka.stream.alpakka.dynamodb.impl.{DynamoSettings, Paginator}
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

/**
  * README
  * Unfortunately, Alpakka's Dynamo driver doesn't provide a way to configure transport (in our case, proxies).
  * We had to fix it in order to provide some custom "transport" config
  * Hopefully, once this issue: https://github.com/akka/alpakka/issues/1334 has been addressed,
  * every piece of code in this package can be removed
  */
object DynamoClientFix {
  def apply(settings: DynamoSettings, transport: ClientTransport)(implicit system: ActorSystem, materializer: ActorMaterializer): DynamoClientFix =
    new DynamoClientFix(settings, transport)
}

final class DynamoClientFix(settings: DynamoSettings, transport: ClientTransport)(implicit system: ActorSystem, materializer: ActorMaterializer) extends DynamoClient {

  private val client = new DynamoClientWithTransport(settings, transport)
  def flow[Op <: AwsOp]: Flow[Op, Op#B, NotUsed] = client.flow[Op]

  def source(op: AwsPagedOp): Source[op.B, NotUsed] =
    Paginator.source(client.flow, op)

  def source(op: AwsOp): Source[op.B, NotUsed] =
    Source.single(op).via(client.flow).map(_.asInstanceOf[op.B])

  def single(op: AwsOp): Future[op.B] =
    Source.single(op).via(client.flow).map(_.asInstanceOf[op.B]).runWith(Sink.head[op.B])

}

It's certainly very ugly and not the way to go, but I didn't find any better implementation at this time.

Unfortunately since it was more than one year ago and I moved on since, I won't be able to provide any other help.

Hopefully it'll be helpful anyway.


Also: I can't remember why I did the trick with port 443, maybe in our case the proxy was only set up for https or something like that.

kovacshuni commented 4 years ago

Thanks for the snippets.