akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

how to set bucketKey dynamically and avoid redundant materializing #1109

Open evbo opened 6 years ago

evbo commented 6 years ago

Hi,

I have a web service (using akka-http) that receives messages and produces a file that is then uploaded to an S3 bucket, with an S3 key determined uniquely for each message at runtime.

I see that the provided multipartUpload sink only accepts a ByteString as input. However, I'm wondering if it's possible to somehow also provide a dynamic bucketKey as input?

Here is my source and sink, where the queue in this example is offered a string each time a request is received (e.g. queue offer "some message"):

val queue = Source
    .queue[String](bufferSize, OverflowStrategy.backpressure)
    .map{
      s =>
        // heavy processing occurs here - that needs to be throttled, etc..
        ByteString(s + "processing stage...")
    }
    .to(s3Client.multipartUpload(bucket, "key that I'd like to be determined in flow"))
    .run()

I have two issues:

  1. The upload never occurs until I perform: queue.complete()
  2. I'd like to avoid materializing the sink/source upon each request, by dynamically providing the bucketKey as input to the multipartUpload (which I can see isn't currently supported).

For my first problem, I think it has something to do with the fact that I'm not calling runWith(), but I'm still trying to wrap my head around akka streams. Is what I'm trying to accomplish clear and feasible? Or is this just not how the API is supposed to be used?

For my second problem, here is how I'd like to pass the bucketKey. Does this seem reasonable? Or does it violate any principles of your API? Or is there a more idiomatic way that already exists that I'm not aware of?:

val queue = Source
    .queue[String](bufferSize, OverflowStrategy.backpressure)
    .map{
      s =>
        // heavy processing occurs here - that needs to be throttled, etc..
        val bucketKey = "...some bucketKey that gets derived"
        (bucketKey, ByteString(s + "processing stage..."))
    }
    .to(s3Client.multipartUpload(bucket))
    .run()
evbo commented 6 years ago

I think the issue behind my first problem is that multipartUpload appears to use a completion sink, which I imagine waits for the stream to complete before uploading?

For instance, this will never upload:

Source.repeat(ByteString(""))
    .runWith(s3Client.multipartUpload(bucket, "testFileUploadRepeat"))

Whereas this will after it completes:

Source(1 to 3)
    .map(ByteString(_))
    .runWith(s3Client.multipartUpload(bucket, "testFileUploadRange"))
evbo commented 6 years ago

So I got a workaround that feels kinda hacky, but basically I am doing all the "heavy processing" in one stream post-materialization. Then I feed the results into a second stream that materializes a new multipartUpload for each request:

val processGraph: RunnableGraph[(SourceQueueWithComplete[String], Publisher[(String, String)])] = Source
    .queue[String](bufferSize, OverflowStrategy.backpressure)
    .map { data =>
      // heavy processing here, deriving dynamicBucketKey for this element
      (dynamicBucketKey, data)
    }
    .toMat(Sink.asPublisher(true))(Keep.both)

val (queue, publisher) = processGraph.run()
val readyForUpload = Source.fromPublisher(publisher)

readyForUpload
    .map { case (bucketKey: String, data: String) =>
      Source.single(ByteString(data)).runWith(s3Client.multipartUpload(bucket, bucketKey))
    }
    .runWith(Sink.ignore)

If there's a cleaner way to do this, I'd love to hear feedback. Otherwise I'm tempted to close this issue since this appears to be how the API is intended to be used. Maybe I'll have more concerns after load testing, but in my naive example everything appears to work ok...

2m commented 6 years ago

Hi. Sorry for the late response.

Instead of going through the Sink.asPublisher and Source.fromPublisher dance, you can use flatMapConcat, which lets you map every element of a stream to a Source that is then run and concatenated into the original stream.
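For example, something along these lines (untested sketch, assuming an implicit materializer is in scope and reusing bucket, bufferSize and s3Client from the snippets above; deriveBucketKey is just a placeholder for however the key is computed per element):

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

val queue = Source
    .queue[String](bufferSize, OverflowStrategy.backpressure)
    .flatMapConcat { s =>
      val bucketKey = deriveBucketKey(s) // placeholder: derive the key for this element
      // run a small upload stream per element and emit its MultipartUploadResult downstream
      Source.fromFuture( // Source.future in newer Akka versions
        Source.single(ByteString(s)).runWith(s3Client.multipartUpload(bucket, bucketKey))
      )
    }
    .to(Sink.ignore)
    .run()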

Also take a look at the Lightbend discussion forums, where questions about Akka Streams API usage will be seen by more eyes.

anoo-muthuswaami commented 6 years ago

+1 I'm facing a similar issue with passing in a dynamic target bucket key to multipartUpload.

Background: the source bucket has several files, and every file has a list of payloads to be sent to another REST API. After processing a file, I must upload two files, .complete and .failed, into the target bucket, with the bucketKey derived from the source bucket.

Below is the sink definition:

val targetBucket = "xxxx"
val sink: Sink[Response, Future[MultipartUploadResult]] = {
    Flow[Response]
      .map { resp =>
        val targetBucketKey = s"${resp.bucketKey}.complete" // how to pass this variable to multipartUpload?
        ByteString(resp.payload)
      }
      .toMat(s3Client.multipartUpload(targetBucket, "targetBucketKey"))(Keep.right)
}

I tried the solution posted by evbo. Having multipartUpload within map overwrites any previously written value. Can someone please share their thoughts?

2m commented 6 years ago

Hi @anoo-muthuswaami,

usually when some data in the stream is needed for Sink creation, Sink.lazyInit can be handy:

(below is pseudocode, and I have not tried to compile it)

...
  Flow[Response]
    .alsoTo(Sink.lazyInit { resp =>
      val targetBucketKey = s"${resp.bucketKey}.complete"
      val sink = s3Client.multipartUpload(targetBucket, targetBucketKey)
      Future.successful(Flow[Response].map(r => ByteString(r.payload)).to(sink))
    })
    .alsoTo(Sink.lazyInit { resp =>
      val targetBucketKey = s"${resp.bucketKey}.failed"
      val sink = s3Client.multipartUpload(targetBucket, targetBucketKey)
      Future.successful(Flow[Response].map(r => ByteString(r.payload)).to(sink))
    })
...
anoo-muthuswaami commented 6 years ago

@2m Thank you for the response. lazyInit seems to create the sink upon receiving the first element, so all the data ends up under one bucketKey. But I require the sink to take the bucketKey for every element it processes. To give more background, there is a source bucket with these contents:

folder1/2018/07/10/file1.json
folder1/2018/07/10/file2.json

As I process the contents of each file, I need to persist the result into the target bucket under the same file path, with .complete or .failed appended, like:

folder1/2018/07/10/file1.json.complete
folder1/2018/07/10/file1.json.failed
folder1/2018/07/10/file2.json.complete
folder1/2018/07/10/file2.json.failed

Is it possible to have a dynamic sink? Please suggest.
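To make the requirement concrete, here is a rough, untested sketch of the kind of per-element, dynamically keyed upload I mean (reusing Response, targetBucket and s3Client from above, assuming an implicit materializer; completePayload and failedPayload are placeholder fields for however the results are split, and the per-file aggregation is left out):

import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

Flow[Response]
    .mapConcat { resp =>
      List(
        (s"${resp.bucketKey}.complete", resp.completePayload), // placeholder field
        (s"${resp.bucketKey}.failed", resp.failedPayload)      // placeholder field
      )
    }
    .mapAsync(parallelism = 2) { case (targetBucketKey, payload) =>
      // one small upload stream per element, keyed by that element
      Source.single(ByteString(payload)).runWith(s3Client.multipartUpload(targetBucket, targetBucketKey))
    }
    .to(Sink.ignore)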

paualarco commented 4 years ago

As far as I can see, the example that @2m provided would solve the problem that @anoo-muthuswaami had.

However, IMO it does not make much sense to have a sink that always needs to be nested into a Flow, a Source.single(byteString) or a Sink.lazyInit, as in this case we would only be able to use this sink once, since the S3 upload request does not allow overwriting the S3 object or appending to it.

Therefore, the solution could be simpler and nicer if we provided an API that allowed passing the path of the object to upload dynamically.

What I am thinking of is something simple, such as adding another multipartUpload definition that does not accept bucket and key as parameters, but instead returns a sink that receives an S3Object as input (see its definition below), rather than just a ByteString.

case class S3Object(content: ByteString, s3Location: S3Location)
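
A purely illustrative sketch of the shape such an API could take (this is not an existing Alpakka signature; the materialized value is just one possibility):

import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Illustrative only: the sink reads the target location from each S3Object
// element instead of taking bucket and key as parameters up front.
def multipartUpload(): Sink[S3Object, Future[Done]] = ???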

an-tex commented 4 years ago

case class S3Object(content: ByteString, s3Location: S3Location)

This only works as long as the content is small, though, as truly streaming it to the bucket wouldn't be supported?