bluelabsio / s3-stream

Akka Streaming Client for S3 and Supporting Libraries
Other
16 stars 6 forks source link

Question :) #3

Closed Ralph-Zitz closed 8 years ago

Ralph-Zitz commented 8 years ago

Is this working yet, or still under development? I seem to have problems getting uploading of files to work.

Cheers.

joearasin commented 8 years ago

We're running it, and it's working for us, but there are enough moving parts that there's a very real chance you ran into something that we haven't yet -- any more detail you can give would be awesome.

Ralph-Zitz commented 8 years ago

I have something along the lines of the following:


val file = new File("somefile")
 val source: Source[ByteString, Future[IOResult]] = FileIO.fromFile(file)
 val sink: Sink[ByteString, Future[CompleteMultipartUploadResult]] = stream.multipartUpload(S3Location("testbucket", "test"))
  val result: Future[CompleteMultipartUploadResult] = source.runWith(sink)

  result onSuccess {
    case res => Try(res)
  }

  result onFailure {
    case ex: Exception => ex.printStackTrace()
  }

  result onComplete {
    case res => Try(res)
  }

Nothing seems to happen though - any hints?

joearasin commented 8 years ago

By "nothing happens" -- what do you get if you print out the value of the materialized future?

joearasin commented 8 years ago

The other thing that can help is turning on akka-logging. I'd like to add some more logging to the innards of this code, but for now, it should at least log the HTTP responses when the logging level is debug.

Ralph-Zitz commented 8 years ago

you mean the value of 'val result'?

joearasin commented 8 years ago

Your onSuccess and onComplete callbacks look like they do nothing.

try doing something like:

result onComplete {
  case res => println(res)
}

which might let us see what's up.

Ralph-Zitz commented 8 years ago

I never reach any of the states. Essentially this service would would act like an HTTP endpoint, accepting a post request (implemented with Akka-Http) so right now it is still just listening on localhost i.e.

import java.io.{File, FileInputStream, InputStream}
import java.security.{KeyStore, SecureRandom}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}

import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.javadsl.StreamConverters
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Sink, Source}
import akka.util.ByteString
import com.bluelabs.akkaaws.AWSCredentials
import com.bluelabs.s3stream.{CompleteMultipartUploadResult, S3Location, S3Stream}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.util.Try

object ApplicationMain extends App {
  implicit val system = ActorSystem("MyActorSystem")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec = mat.executionContext

  private val config = ConfigFactory.load()
  private val sslContext: SSLContext = {
    val keyStoreResource = new FileInputStream(config.getString("ssl.jksKeystore"))
    val password = config.getString("ssl.password")
    val keyStore = KeyStore.getInstance("jks")
    keyStore.load(keyStoreResource, password.toCharArray)

    val keyManagerFactory = KeyManagerFactory.getInstance("PKIX")
    keyManagerFactory.init(keyStore, password.toCharArray)

    val trustManagerFactory = TrustManagerFactory.getInstance("PKIX")
    trustManagerFactory.init(keyStore)

    val context = SSLContext.getInstance("TLS")
    context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
    context
  }
  private val key = config.getString("aws.key")
  private val secret = config.getString("aws.secret")
  private val creds = AWSCredentials(key, secret)
  private val stream: S3Stream = new S3Stream(creds)

  private val httpsContext: ConnectionContext =
    ConnectionContext.https(sslContext = sslContext,
      enabledCipherSuites = Some(List("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256")),
      enabledProtocols = Some(List("TLSv1.2")),
      clientAuth = None,
      sslParameters = None
    )
 val file = new File("some file")
  val source: Source[ByteString, Future[IOResult]] = FileIO.fromFile(file)
  val sink: Sink[ByteString, Future[CompleteMultipartUploadResult]] = stream.multipartUpload(S3Location("blah", "temp"))
  val result: Future[CompleteMultipartUploadResult] = source.runWith(sink)

  result onSuccess {
    case res => println(res)
  }

  result onFailure {
    case ex: Exception => ex.printStackTrace()
  }

  result onComplete {
    case res => println(res)
  }

  val route: Route = {
    path ("upload") {
      post {
        complete { "to come"
        }
      }
    } ~
    path ("ping") {
      get {
        complete("PONG!")
      }
    } ~
    complete {
      HttpResponse(status = StatusCodes.NotFound, entity = "Not Found!")
    }
  }
  val serverSource = Http().bindAndHandle(handler = route,
                                          interface = "localhost",
                                          port = 8080,
                                          connectionContext = httpsContext)
  serverSource onSuccess {
    case binding:ServerBinding => println(s"Bound to $binding")
  }
  serverSource onFailure {
    case ex: Exception => println("Failed to bind to $interface:$port")
      serverSource.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }
  sys.addShutdownHook(system.terminate())
}

I would expect it to upload the local file while the waiting for incoming connections. This is the log output:

[DEBUG] [04/18/2016 20:07:05.118] [run-main-0] [EventStream(akka://MyActorSystem)] logger log1-Logging$DefaultLogger started
[DEBUG] [04/18/2016 20:07:05.118] [run-main-0] [EventStream(akka://MyActorSystem)] Default Loggers started
[DEBUG] [04/18/2016 20:07:05.250] [run-main-0] [AkkaSSLConfig(akka://MyActorSystem)] Initializing AkkaSSLConfig extension...
[DEBUG] [04/18/2016 20:07:05.269] [run-main-0] [AkkaSSLConfig(akka://MyActorSystem)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@11aff1f3
[DEBUG] [04/18/2016 20:07:05.774] [MyActorSystem-akka.actor.default-dispatcher-12] [akka://MyActorSystem/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:8080
Bound to ServerBinding(/127.0.0.1:8080)
[DEBUG] [04/18/2016 20:07:05.785] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/PoolInterfaceActor-0] (Re-)starting host connection pool to blah.s3.amazonaws.com:443
[DEBUG] [04/18/2016 20:07:06.328] [MyActorSystem-akka.actor.default-dispatcher-6] [akka://MyActorSystem/user/SlotProcessor-1] become unconnected, from subscriber pending
[DEBUG] [04/18/2016 20:07:06.328] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/SlotProcessor-2] become unconnected, from subscriber pending
[DEBUG] [04/18/2016 20:07:06.328] [MyActorSystem-akka.actor.default-dispatcher-12] [akka://MyActorSystem/user/SlotProcessor-3] become unconnected, from subscriber pending
[DEBUG] [04/18/2016 20:07:06.328] [MyActorSystem-akka.actor.default-dispatcher-9] [akka://MyActorSystem/user/SlotProcessor-0] become unconnected, from subscriber pending
[DEBUG] [04/18/2016 20:07:06.361] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/system/IO-TCP/selectors/$a/1] Attempting connection to [blah.s3.amazonaws.com/54.231.131.122:443]
[DEBUG] [04/18/2016 20:07:06.414] [MyActorSystem-akka.actor.default-dispatcher-9] [akka://MyActorSystem/system/IO-TCP/selectors/$a/1] Connection established to [blah.s3.amazonaws.com/54.231.131.122:443]
[DEBUG] [04/18/2016 20:07:06.542] [MyActorSystem-akka.actor.default-dispatcher-7] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = blah.s3.amazonaws.com, sessionId (base64) = 9CNqXfEOGGWTZPrMbT385PwibLN9XhP55illxZbHa+g=
[DEBUG] [04/18/2016 20:07:06.542] [MyActorSystem-akka.actor.default-dispatcher-7] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [04/18/2016 20:07:06.615] [MyActorSystem-akka.actor.default-dispatcher-7] [akka://MyActorSystem/user/SlotProcessor-0] Slot 0 disconnected after regular connection close
[DEBUG] [04/18/2016 20:07:36.648] [MyActorSystem-akka.actor.default-dispatcher-12] [akka://MyActorSystem/user/PoolInterfaceActor-0] Shutting down host connection pool to blah.s3.amazonaws.com:443
[DEBUG] [04/18/2016 20:07:36.650] [MyActorSystem-akka.actor.default-dispatcher-11] [akka://MyActorSystem/user/PoolInterfaceActor-0] Host connection pool to blah.s3.amazonaws.com:443 has completed orderly shutdown
Ralph-Zitz commented 8 years ago

Sorry about the messy paste

joearasin commented 8 years ago

Cleaned it up. FWIW, triple backticks on the line before and after copypasta help a bunch.

joearasin commented 8 years ago

Hmm. and clearly you have a system and materializer in implicit-range

Ralph-Zitz commented 8 years ago

Should have just put the whole file there to begin with, I have added the missing pieces now :)

JDK: java version "1.8.0_77" Java(TM) SE Runtime Environment (build 1.8.0_77-b03) Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode)

Akka: 2.4.3

joearasin commented 8 years ago

This is going to sound a bit silly, but can you replace the uploader with a Sink.forEach(println) and determine that it runs? I'd absolutely believe that this thing is swallowing an error that it shouldn't be.

Ralph-Zitz commented 8 years ago

Hmm the types are giving me a little trouble

Ralph-Zitz commented 8 years ago

Epic fail on my part :( I managed to get some more debug information, I completely missed setting my region differently (I do not have an account in us-east)

joearasin commented 8 years ago

Is there something that wasn't triggering failure that should have been? On Mon, Apr 18, 2016 at 3:09 PM Ralph Zitz notifications@github.com wrote:

Epic fail on my part :( I managed to get some more debug information, I completely missed setting my region differently (I do not have an account in us-east)

— You are receiving this because you commented.

Reply to this email directly or view it on GitHub https://github.com/bluelabsio/s3-stream/issues/3#issuecomment-211534735

Ralph-Zitz commented 8 years ago

No idea why I am getting the appropriate errors all of a sudden, now the logs are showing:

Failure(java.lang.Exception: <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>AuthorizationHeaderMalformed</Code><Message>The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'eu-west-1'</Message><Region>eu-west-1</Region><RequestId>5DBCA314CFDFCB7C</RequestId><HostId>h/ZRYhxMS2Egb/bxRCXm9Pd/Ca0DEirDP09EptI2QjdlM0+9tYAdsRWdR89i6vGSYFy+AWMaMR4=</HostId></Error>)

Everything is working now, thanks for all your help! :)

joearasin commented 8 years ago

Awesome. Closing this, but some of the exploration here might be relevant in #4